
evalwire.uploader

DatasetUploader reads a CSV testset and uploads it to Arize Phoenix as one named dataset per unique tag value. It handles three conflict modes (skip, overwrite, append) and supports multi-tag rows via a configurable delimiter.

Basic usage

from phoenix.client import Client
from evalwire.uploader import DatasetUploader

client = Client()

uploader = DatasetUploader(
    csv_path="data/testset.csv",
    phoenix_client=client,
)
datasets = uploader.upload(on_exist="skip")
print(datasets)  # {"es_search": <Dataset>, "source_router": <Dataset>}

CSV format

The CSV must contain at least a tag column (tags by default), one input column (user_query by default), and one expected-output column (expected_output by default):

user_query,expected_output,tags
"find cycling paths","url-a | url-b","es_search | source_router"
"find parks","url-c","es_search"

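In any string column, a cell that contains the delimiter (| by default) is split into a list, and each item is stripped of surrounding whitespace. Cells without the delimiter are left as plain strings. A row with tags = "es_search | source_router" is added to both datasets.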
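
The splitting and fan-out described above can be sketched with the standard library alone. This is an illustration, not evalwire's implementation (which uses pandas); the helper names split_cell and group_rows_by_tag are hypothetical.

```python
import csv
import io
from typing import Dict, List, Union

CSV_TEXT = '''user_query,expected_output,tags
"find cycling paths","url-a | url-b","es_search | source_router"
"find parks","url-c","es_search"
'''


def split_cell(value: str, delimiter: str = "|") -> Union[str, List[str]]:
    """Split a cell on the delimiter and trim each item; leave other cells unchanged."""
    if delimiter in value:
        return [part.strip() for part in value.split(delimiter)]
    return value


def group_rows_by_tag(
    text: str, tag_column: str = "tags", delimiter: str = "|"
) -> Dict[str, List[dict]]:
    """Return one list of rows per tag; a multi-tag row appears under every tag."""
    groups: Dict[str, List[dict]] = {}
    for row in csv.DictReader(io.StringIO(text)):
        parsed = {key: split_cell(val, delimiter) for key, val in row.items()}
        tags = parsed[tag_column]
        if not isinstance(tags, list):
            tags = [tags]
        for tag in tags:
            tag = tag.strip()
            if tag:  # ignore empty tags
                groups.setdefault(tag, []).append(parsed)
    return groups


groups = group_rows_by_tag(CSV_TEXT)
for tag, rows in groups.items():
    print(tag, [row["user_query"] for row in rows])
```

With the sample CSV, the first row lands in both es_search and source_router, and its expected_output becomes the list ["url-a", "url-b"], while "url-c" stays a plain string.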

Conflict modes

on_exist      Behaviour
"skip"        Leave the existing dataset untouched and return it. If not found, create it.
"overwrite"   Delete the existing dataset and re-create it. If not found, create it.
"append"      Call add_examples_to_dataset on the existing dataset. If not found, create it.

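To make the three modes concrete, here is a minimal sketch against an in-memory stand-in. FakeDatasets and upload_one are hypothetical names invented for this illustration; they are not part of evalwire or the Phoenix client, and the real uploader calls add_examples_to_dataset directly for "append" rather than fetching the dataset first.

```python
from typing import Dict, List


class FakeDatasets:
    """Hypothetical in-memory dataset store (an assumption, not the Phoenix client)."""

    def __init__(self) -> None:
        self._store: Dict[str, List[dict]] = {}

    def get(self, name: str) -> List[dict]:
        # Mirrors the documented behaviour: a missing dataset raises ValueError.
        if name not in self._store:
            raise ValueError(f"dataset {name!r} not found")
        return self._store[name]

    def create(self, name: str, examples: List[dict]) -> List[dict]:
        # Mirrors the documented 409 Conflict on duplicate names.
        if name in self._store:
            raise RuntimeError("409 Conflict: dataset already exists")
        self._store[name] = list(examples)
        return self._store[name]

    def delete(self, name: str) -> None:
        del self._store[name]


def upload_one(
    store: FakeDatasets, name: str, examples: List[dict], on_exist: str = "skip"
) -> List[dict]:
    """Apply one conflict mode; every mode creates the dataset when it is missing."""
    try:
        existing = store.get(name)
    except ValueError:
        return store.create(name, examples)
    if on_exist == "skip":
        return existing
    if on_exist == "overwrite":
        store.delete(name)
        return store.create(name, examples)
    if on_exist == "append":
        existing.extend(examples)
        return existing
    raise ValueError(f"unknown on_exist mode: {on_exist!r}")


store = FakeDatasets()
upload_one(store, "es_search", [{"q": "a"}])
after_skip = list(upload_one(store, "es_search", [{"q": "b"}], on_exist="skip"))
after_append = list(upload_one(store, "es_search", [{"q": "c"}], on_exist="append"))
after_overwrite = list(upload_one(store, "es_search", [{"q": "d"}], on_exist="overwrite"))
print(after_skip, after_append, after_overwrite)
```

In this sketch "skip" discards the new examples, "append" grows the existing dataset, and "overwrite" leaves only the newly uploaded examples.
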
Pitfalls

  • Phoenix raises ValueError (not a Phoenix-specific exception) when get_dataset is called for a non-existent dataset. evalwire catches this and creates the dataset instead.
  • There is no official delete method in the Phoenix Python client. evalwire calls the REST endpoint DELETE /v1/datasets/{id} directly for the overwrite mode.
  • Creating a dataset with a name that already exists returns a 409 Conflict error, not a new version. Use on_exist="overwrite" to replace it.
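
As a small illustration of the overwrite pitfall, the request path for DELETE /v1/datasets/{id} can be built with urllib.parse.quote, as evalwire's _delete_dataset does. The helper name dataset_delete_path and the sample IDs below are hypothetical; real Phoenix dataset IDs may look different.

```python
from urllib.parse import quote


def dataset_delete_path(dataset_id: str) -> str:
    """Build the relative path for DELETE /v1/datasets/{id}, percent-encoding the ID."""
    return f"v1/datasets/{quote(dataset_id)}"


# Hypothetical IDs, used only to show how quote() treats different characters.
plain_path = dataset_delete_path("RGF0YXNldDox")
padded_path = dataset_delete_path("RGF0YXNldDox==")
spaced_path = dataset_delete_path("my id")
print(plain_path)
print(padded_path)
print(spaced_path)
```

Note that quote() leaves "/" unescaped by default; if an ID could ever contain a slash, passing safe="" would encode it as %2F.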

evalwire.uploader

DatasetUploader — uploads a CSV testset to Arize Phoenix as named datasets.

DatasetUploader

Upload a human-curated CSV testset to Arize Phoenix.

Each unique value found in tag_column becomes a separate Phoenix dataset. A row tagged with multiple pipe-delimited values is added to each corresponding dataset.

Parameters

csv_path: Path to the CSV file.
phoenix_client: An initialised phoenix.client.Client instance.
input_keys: Column names that form the input of each dataset example.
output_keys: Column names that form the output of each dataset example.
tag_column: Column used to split rows into separate datasets.
delimiter: Delimiter used to split multi-value cells (tags and output columns).

Source code in src/evalwire/uploader.py
class DatasetUploader:
    """Upload a human-curated CSV testset to Arize Phoenix.

    Each unique value found in ``tag_column`` becomes a separate Phoenix
    dataset. A row tagged with multiple pipe-delimited values is added to
    each corresponding dataset.

    Parameters
    ----------
    csv_path:
        Path to the CSV file.
    phoenix_client:
        An initialised ``phoenix.client.Client`` instance.
    input_keys:
        Column names that form the ``input`` of each dataset example.
    output_keys:
        Column names that form the ``output`` of each dataset example.
    tag_column:
        Column used to split rows into separate datasets.
    delimiter:
        Delimiter used to split multi-value cells (tags and output columns).
    """

    def __init__(
        self,
        csv_path: Path | str,
        phoenix_client: Client,
        input_keys: list[str] | None = None,
        output_keys: list[str] | None = None,
        tag_column: str = "tags",
        delimiter: str = "|",
    ) -> None:
        self.csv_path = Path(csv_path)
        self.client = phoenix_client
        self.input_keys = list(input_keys) if input_keys is not None else ["user_query"]
        self.output_keys = (
            list(output_keys) if output_keys is not None else ["expected_output"]
        )
        self.tag_column = tag_column
        self.delimiter = delimiter

    def upload(
        self,
        on_exist: Literal["skip", "overwrite", "append"] = "skip",
    ) -> dict[str, Any]:
        """Upload one Phoenix dataset per unique tag value found in the CSV.

        Parameters
        ----------
        on_exist:
            How to handle a dataset that already exists in Phoenix:
            - ``"skip"``      — leave the existing dataset untouched (default).
            - ``"overwrite"`` — delete and re-create.
            - ``"append"``    — add new examples to the existing dataset.

        Returns
        -------
        dict[str, Any]
            Mapping of tag name → created/updated dataset object.
        """
        df = self._load_csv()
        groups = self._group_by_tag(df)
        results: dict[str, Any] = {}

        for tag, group_df in groups.items():
            logger.info("Uploading dataset %r (%d rows)…", tag, len(group_df))
            dataset = self._upload_one(tag, group_df, on_exist)
            results[tag] = dataset

        return results

    def _load_csv(self) -> pd.DataFrame:
        df = pd.read_csv(self.csv_path)
        # Split any column whose values contain the delimiter character.
        # Use pd.api.types.is_string_dtype to support both object (pandas <3)
        # and the new StringDtype (pandas >=3).
        delimiter = self.delimiter
        for col in df.columns:
            if pd.api.types.is_string_dtype(df[col]):
                as_str = df[col].astype(str)
                mask = as_str.str.contains(delimiter, regex=False, na=False)
                if mask.any() or col == self.tag_column:
                    df[col] = as_str.apply(
                        lambda v, d=delimiter: (
                            [s.strip() for s in v.split(d)] if d in v else v
                        )
                    )
        return df

    def _group_by_tag(self, df: pd.DataFrame) -> dict[str, pd.DataFrame]:
        """Return one DataFrame per unique tag value."""
        groups: dict[str, list[int]] = {}
        for idx, row in df.iterrows():
            tags_cell = row[self.tag_column]
            tags: list[str] = (
                tags_cell if isinstance(tags_cell, list) else [str(tags_cell)]
            )
            for tag in tags:
                tag = tag.strip()
                if tag:
                    groups.setdefault(tag, []).append(cast(int, idx))

        return {
            tag: df.loc[indices].reset_index(drop=True)
            for tag, indices in groups.items()
        }

    def _upload_one(
        self,
        name: str,
        df: pd.DataFrame,
        on_exist: Literal["skip", "overwrite", "append"],
    ) -> Any:
        if on_exist == "skip":
            try:
                existing = self.client.datasets.get_dataset(dataset=name)
                logger.info("Dataset %r already exists, skipping.", name)
                return existing
            except ValueError:
                logger.debug(
                    "Dataset %r not found; creating it.",
                    name,
                )
            return self.client.datasets.create_dataset(
                dataframe=df,
                name=name,
                input_keys=self.input_keys,
                output_keys=self.output_keys,
            )

        elif on_exist == "overwrite":
            try:
                existing = self.client.datasets.get_dataset(dataset=name)
                self._delete_dataset(existing.id)
                logger.info("Deleted existing dataset %r; re-creating.", name)
            except ValueError:
                logger.debug("Dataset %r not found; creating it.", name)
            return self.client.datasets.create_dataset(
                dataframe=df,
                name=name,
                input_keys=self.input_keys,
                output_keys=self.output_keys,
            )

        else:  # "append"
            try:
                dataset = self.client.datasets.add_examples_to_dataset(
                    dataset=name,
                    dataframe=df,
                    input_keys=self.input_keys,
                    output_keys=self.output_keys,
                )
                logger.debug("Appended %d examples to dataset %r.", len(df), name)
                return dataset
            except ValueError:
                logger.debug(
                    "Dataset %r not found; creating it.",
                    name,
                )
            return self.client.datasets.create_dataset(
                dataframe=df,
                name=name,
                input_keys=self.input_keys,
                output_keys=self.output_keys,
            )

    def _delete_dataset(self, dataset_id: str) -> None:
        """Delete a dataset via the Phoenix REST API.

        The Phoenix Python client does not expose a delete method, but the
        server supports ``DELETE /v1/datasets/{id}``.
        """
        response = self.client.datasets._client.delete(
            url=f"v1/datasets/{quote(dataset_id)}",
            headers={"accept": "application/json"},
        )
        response.raise_for_status()

upload(on_exist='skip')

Upload one Phoenix dataset per unique tag value found in the CSV.

Parameters

on_exist: How to handle a dataset that already exists in Phoenix:
  - "skip": leave the existing dataset untouched (default).
  - "overwrite": delete and re-create.
  - "append": add new examples to the existing dataset.

Returns

dict[str, Any]: Mapping of tag name → created/updated dataset object.
