
evalwire.runner

ExperimentRunner auto-discovers experiment subdirectories, fetches the matching Phoenix dataset for each one, and runs the task against every example. Results are stored in Phoenix and returned as a list.

Basic usage

from phoenix.client import Client
from evalwire.runner import ExperimentRunner

client = Client()

runner = ExperimentRunner(
    experiments_dir="experiments",
    phoenix_client=client,
    concurrency=2,
)
results = runner.run()

Directory layout

Each subdirectory of experiments_dir that contains a task.py is treated as one experiment. The directory name must exactly match a Phoenix dataset name:

experiments/
  es_search/           <- matches dataset named "es_search"
    task.py            <- must define task(example), usually async
    top_k.py           <- evaluator: must define top_k = ...
    exact_match.py     <- evaluator: must define exact_match = ...
  source_router/       <- matches dataset named "source_router"
    task.py
    is_in.py

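Subdirectories without task.py are skipped with only a DEBUG-level log message, and subdirectories whose task.py does not define task are skipped with a warning. Files (non-directories) at the top level of experiments_dir are ignored.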
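The discovery rules above can be sketched with pathlib alone. This is a simplified illustration, not evalwire's code: the hypothetical discover helper only lists names, whereas the real _discover also imports each file and checks that task and every evaluator attribute are actually defined.

```python
import tempfile
from pathlib import Path


def discover(experiments_dir: Path) -> dict[str, list[str]]:
    """Map each experiment name to its evaluator names, following the layout rules."""
    found: dict[str, list[str]] = {}
    for subdir in sorted(experiments_dir.iterdir()):
        # Top-level files and directories without task.py are not experiments.
        if not subdir.is_dir() or not (subdir / "task.py").exists():
            continue
        # Every other *.py file is an evaluator named after its file stem.
        found[subdir.name] = [
            f.stem
            for f in sorted(subdir.glob("*.py"))
            if f.stem not in ("task", "__init__")
        ]
    return found


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for rel in [
        "es_search/task.py", "es_search/top_k.py", "es_search/exact_match.py",
        "source_router/task.py", "source_router/is_in.py",
        "scratch/notes.py",  # no task.py, so skipped
    ]:
        path = root / rel
        path.parent.mkdir(parents=True, exist_ok=True)
        path.touch()
    (root / "README.md").touch()  # top-level file, ignored
    layout = discover(root)

print(layout)  # {'es_search': ['exact_match', 'top_k'], 'source_router': ['is_in']}
```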

Experiment naming

Each run in Phoenix is named {prefix}_{dataset_name}_{timestamp}, where the timestamp is the UTC start time in the form YYYY-MM-DDTHH:MM:SS. The default prefix is "eval". Override it with experiment_name_prefix:

results = runner.run(experiment_name_prefix="nightly")
# produces e.g. "nightly_es_search_2025-01-15T09:30:00"
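The name format can be reproduced with datetime.strftime. The helper below is an illustrative restatement (experiment_name is not part of evalwire), using a fixed timestamp so the output is predictable:

```python
from __future__ import annotations

from datetime import datetime, timezone


def experiment_name(prefix: str, dataset: str, now: datetime | None = None) -> str:
    """Build a run name in the {prefix}_{dataset_name}_{timestamp} format."""
    if now is None:
        now = datetime.now(tz=timezone.utc)
    # Second precision and no UTC offset suffix, matching the pattern in runner.py.
    return f"{prefix}_{dataset}_{now.strftime('%Y-%m-%dT%H:%M:%S')}"


fixed = datetime(2025, 1, 15, 9, 30, tzinfo=timezone.utc)
print(experiment_name("nightly", "es_search", fixed))  # nightly_es_search_2025-01-15T09:30:00
```

Because the timestamp has one-second resolution, two runs of the same experiment with the same prefix started within the same second would get identical names.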

Running a subset

Pass names to run only specific experiments:

results = runner.run(names=["es_search"])
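run() filters with a plain membership test, so a name with no matching directory (for example a typo) is silently ignored, and if nothing matches, run() logs a warning and returns [] without raising SystemExit. A pre-flight check such as the hypothetical missing_experiments below (not part of evalwire) can catch that:

```python
import tempfile
from pathlib import Path


def missing_experiments(experiments_dir: Path, names: list[str]) -> list[str]:
    """Return requested names that have no <experiments_dir>/<name>/task.py."""
    return [
        name for name in names
        if not (experiments_dir / name / "task.py").is_file()
    ]


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "es_search").mkdir()
    (root / "es_search" / "task.py").touch()
    missing = missing_experiments(root, ["es_search", "es_serach"])

print(missing)  # ['es_serach']
```

A CI script could exit early when this list is non-empty, before calling runner.run(names=...).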

Dry run

Set dry_run=True to run a single example per experiment, or dry_run=N to run N examples, without uploading results to Phoenix. This is useful for smoke-testing task code:

runner = ExperimentRunner(
    experiments_dir="experiments",
    phoenix_client=client,
    dry_run=3,
)
runner.run()
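If you drive evalwire from your own command-line script, argparse can map a single flag onto all three dry_run forms. The script and the --dry-run flag below are assumptions for illustration, not an evalwire CLI:

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Run evalwire experiments.")
    # --dry-run alone -> True (one example per experiment)
    # --dry-run N     -> N examples per experiment
    # flag omitted    -> False (full run, results uploaded)
    parser.add_argument(
        "--dry-run", nargs="?", const=True, default=False, type=int, metavar="N"
    )
    return parser


args = build_parser().parse_args(["--dry-run", "3"])
print(args.dry_run)  # 3
# runner = ExperimentRunner("experiments", client, dry_run=args.dry_run)
```

argparse applies type=int only to values given on the command line, so a bare --dry-run keeps the boolean const True and an omitted flag keeps the default False.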

Async tasks

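Tasks are usually async functions. Phoenix's runner calls tasks synchronously, so evalwire wraps each coroutine function in a small synchronous bridge that runs it on a per-thread event loop; plain synchronous tasks are passed through unchanged. The loop stays open between calls (unlike asyncio.run(), which closes its loop on return), so async I/O libraries such as httpx or anyio that reuse connections across calls do not fail with "RuntimeError: Event loop is closed".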
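The helper evalwire uses for this, _get_thread_event_loop, is not shown on this page. The sketch below shows one plausible way to build such a bridge with threading.local; thread_event_loop and as_sync are illustrative names, not evalwire's API:

```python
import asyncio
import inspect
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable

_local = threading.local()


def thread_event_loop() -> asyncio.AbstractEventLoop:
    """Return this thread's loop, creating it on first use and keeping it open."""
    loop = getattr(_local, "loop", None)
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        _local.loop = loop
    return loop


def as_sync(task: Callable[..., Any]) -> Callable[[Any], Any]:
    """Wrap an async task for a synchronous caller; sync tasks pass through."""
    if not inspect.iscoroutinefunction(task):
        return task

    def sync_task(example: Any) -> Any:
        # run_until_complete leaves the loop open, unlike asyncio.run().
        return thread_event_loop().run_until_complete(task(example))

    return sync_task


async def task(example: Any) -> dict[str, Any]:
    await asyncio.sleep(0)
    return {"echo": example, "loop": asyncio.get_running_loop()}


sync_task = as_sync(task)
first, second = sync_task("a"), sync_task("b")
print(first["loop"] is second["loop"], first["loop"].is_closed())  # True False

# A different worker thread gets its own loop.
with ThreadPoolExecutor(max_workers=1) as pool:
    third = pool.submit(sync_task, "c").result()
print(third["loop"] is first["loop"])  # False
```

The loops in this sketch are never closed explicitly; a long-running process that creates many worker threads may want to close each loop when its thread finishes.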

Error behaviour

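If any experiment fails (for example, no Phoenix dataset matches its directory name, or Phoenix's run_experiment raises), runner.run() raises SystemExit(1) once all experiments have finished, so that CI pipelines fail loudly. The other experiments still run to completion, but run() does not return their results in that case. Because SystemExit derives from BaseException, an except Exception block will not catch it. If no experiments are discovered at all, run() logs a warning and returns an empty list instead.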
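The collect-then-exit pattern can be reproduced with the standard library alone. run_all below is a simplified stand-in for run(), not evalwire's code; the final lines show how a notebook or test harness can catch the exit instead of terminating:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable


def run_all(jobs: dict[str, Callable[[], Any]], concurrency: int = 1) -> list[Any]:
    """Run every job; raise SystemExit(1) afterwards if any returned None or raised."""
    results: list[Any] = []
    failed = False
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        futures = {pool.submit(job): name for name, job in jobs.items()}
        for future in as_completed(futures):
            try:
                result = future.result()
            except Exception as exc:
                print(f"{futures[future]} failed: {exc}")
                failed = True
                continue
            if result is None:
                failed = True
            else:
                results.append(result)
    # Only reached after the executor has waited for every job.
    if failed:
        raise SystemExit(1)  # successful results are discarded here
    return results


def broken() -> Any:
    raise RuntimeError("dataset missing")


exit_code = 0
try:
    run_all({"ok": lambda: "done", "broken": broken}, concurrency=2)
except SystemExit as exc:  # except Exception would not catch this
    exit_code = exc.code
print(exit_code)  # 1
```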

Pitfalls

  • The dataset name must match the directory name exactly (case-sensitive).
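  • At least one evaluator is required per experiment: Phoenix raises an error if run_experiment is called with an empty evaluator list, and evalwire does not check for this beforehand. An evaluator file only counts if it defines an attribute with the same name as the file (top_k.py must define top_k); otherwise it is skipped with a warning.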
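  • Package-qualified imports inside experiment modules (for example from experiments.es_search.helpers import ...) work because evalwire adds the parent of experiments_dir to sys.path during discovery and creates an empty __init__.py in any experiment directory that lacks one. The path entry is removed again after discovery, so imports deferred until the task runs may not resolve. Relative imports (from . import ...) are not supported, because each file is loaded under a standalone module name with no parent package.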
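The sys.path handling can be written as a reusable context manager. The sketch below, using the illustrative name temporarily_on_sys_path, also shows a package-qualified import resolving through the parent directory; my_experiments and helpers.py are made-up names:

```python
import importlib
import sys
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator


@contextmanager
def temporarily_on_sys_path(directory: Path) -> Iterator[None]:
    """Prepend directory to sys.path for the block, unless it is already there."""
    entry = str(directory.resolve())
    inserted = entry not in sys.path
    if inserted:
        sys.path.insert(0, entry)
    try:
        yield
    finally:
        # Undo only our own change; leave pre-existing entries alone.
        if inserted and entry in sys.path:
            sys.path.remove(entry)


with tempfile.TemporaryDirectory() as tmp:
    exp_dir = Path(tmp) / "my_experiments" / "es_search"
    exp_dir.mkdir(parents=True)
    (exp_dir / "__init__.py").touch()
    (exp_dir / "helpers.py").write_text("TOP_K = 5\n")
    importlib.invalidate_caches()
    # As in evalwire, the *parent* of the experiments directory goes on sys.path.
    with temporarily_on_sys_path(Path(tmp)):
        helpers = importlib.import_module("my_experiments.es_search.helpers")
    path_restored = str(Path(tmp).resolve()) not in sys.path

print(helpers.TOP_K, path_restored)  # 5 True
```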

evalwire.runner

ExperimentRunner — auto-discovers and executes evalwire experiments.

ExperimentRunner

Discover, load, and run all experiments under experiments_dir.

Each subdirectory that contains a task.py file is treated as one experiment. The directory name must match the name of a Phoenix dataset.

Parameters

  • experiments_dir: Root directory containing per-experiment subdirectories.
  • phoenix_client: An initialised phoenix.client.Client instance.
  • concurrency: Number of experiments to run in parallel. Default: 1 (sequential).
  • dry_run: If True, run one example per experiment without uploading results. If an int, run that many examples.

Source code in src/evalwire/runner.py
class ExperimentRunner:
    """Discover, load, and run all experiments under ``experiments_dir``.

    Each subdirectory that contains a ``task.py`` file is treated as one
    experiment. The directory name must match the name of a Phoenix dataset.

    Parameters
    ----------
    experiments_dir:
        Root directory containing per-experiment subdirectories.
    phoenix_client:
        An initialised ``phoenix.client.Client`` instance.
    concurrency:
        Number of experiments to run in parallel. Default: 1 (sequential).
    dry_run:
        If ``True``, run one example per experiment without uploading results.
        If an ``int``, run that many examples.
    """

    def __init__(
        self,
        experiments_dir: Path | str,
        phoenix_client: Client,
        *,
        concurrency: int = 1,
        dry_run: bool | int = False,
    ) -> None:
        self.experiments_dir = Path(experiments_dir)
        self.client = phoenix_client
        self.concurrency = concurrency
        self.dry_run = dry_run

    def run(
        self,
        names: list[str] | None = None,
        *,
        experiment_name_prefix: str = "eval",
        metadata: dict[str, Any] | None = None,
    ) -> list[Any]:
        """Discover, load, and run experiments.

        Parameters
        ----------
        names:
            If provided, only run experiments whose directory names are in
            this list. If ``None``, run all discovered experiments.
        experiment_name_prefix:
            Prefix for the auto-generated experiment name in Phoenix.
            Format: ``"{prefix}_{dataset_name}_{iso_timestamp}"``.
        metadata:
            Extra key/value pairs attached to each experiment record.

        Returns
        -------
        list
            One experiment result object per experiment.
        """
        experiments = self._discover(names)
        if not experiments:
            logger.warning("No experiments found in %s.", self.experiments_dir)
            return []

        results: list[Any] = []
        failed = False

        def _run_one(exp_name: str, task: Any, evaluators: list[Any]) -> Any:
            dataset = self._get_dataset(exp_name)
            if dataset is None:
                return None

            # Phoenix's sync client cannot await coroutine functions directly.
            # Wrap async tasks in a sync bridge so they work transparently.
            # We use a persistent per-thread event loop instead of asyncio.run()
            # because asyncio.run() closes the loop after each call, which breaks
            # async libraries (httpx, anyio) that try to clean up transports on a
            # loop that is already gone ("RuntimeError: Event loop is closed").
            if inspect.iscoroutinefunction(task):
                _async_task = task

                def task(example: Any, _fn: Any = _async_task) -> Any:  # noqa: E731
                    return _get_thread_event_loop().run_until_complete(_fn(example))

            timestamp = datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
            experiment_name = f"{experiment_name_prefix}_{exp_name}_{timestamp}"
            exp_metadata = {"dataset": exp_name, "run_by": "evalwire"}
            if metadata:
                exp_metadata.update(metadata)

            logger.info("Running experiment %r…", experiment_name)
            return self.client.experiments.run_experiment(
                dataset=dataset,
                task=task,
                evaluators=evaluators,
                experiment_name=experiment_name,
                experiment_metadata=exp_metadata,
                dry_run=self.dry_run,
            )

        with ThreadPoolExecutor(max_workers=self.concurrency) as executor:
            futures = {
                executor.submit(_run_one, exp_name, task, evaluators): exp_name
                for exp_name, task, evaluators in experiments
            }
            for future in as_completed(futures):
                exp_name = futures[future]
                try:
                    result = future.result()
                    if result is None:
                        failed = True
                    else:
                        results.append(result)
                except Exception as exc:
                    logger.error("Experiment %r failed: %s", exp_name, exc)
                    failed = True

        if failed:
            raise SystemExit(1)

        return results

    def _discover(self, names: list[str] | None) -> list[tuple[str, Any, list[Any]]]:
        """Return a list of ``(name, task, evaluators)`` tuples."""
        found: list[tuple[str, Any, list[Any]]] = []

        if not self.experiments_dir.is_dir():
            logger.error(
                "Experiments directory %s does not exist.", self.experiments_dir
            )
            return found

        # Temporarily insert the parent of experiments_dir on sys.path so that
        # relative imports inside experiment modules work. We use try/finally to
        # restore sys.path afterwards so the mutation does not leak into the
        # broader process (important in long-running servers and test suites).
        parent = str(self.experiments_dir.parent.resolve())
        path_inserted = parent not in sys.path
        if path_inserted:
            sys.path.insert(0, parent)

        try:
            for subdir in sorted(self.experiments_dir.iterdir()):
                if not subdir.is_dir():
                    continue
                exp_name = subdir.name
                if names is not None and exp_name not in names:
                    continue

                task_file = subdir / "task.py"
                if not task_file.exists():
                    logger.debug("Skipping %r — no task.py found.", exp_name)
                    continue

                init_file = subdir / "__init__.py"
                if not init_file.exists():
                    init_file.touch()
                    logger.debug("Created missing __init__.py in %r.", exp_name)

                task = self._load_attribute(task_file, "task")
                if task is None:
                    logger.warning(
                        "task.py in %r has no 'task' callable; skipping.", exp_name
                    )
                    continue

                evaluators: list[Any] = []
                for py_file in sorted(subdir.glob("*.py")):
                    if py_file.stem in ("task", "__init__"):
                        continue
                    evaluator = self._load_attribute(py_file, py_file.stem)
                    if evaluator is not None:
                        evaluators.append(evaluator)
                    else:
                        logger.warning(
                            "Evaluator file %s has no callable %r; skipping.",
                            py_file,
                            py_file.stem,
                        )

                found.append((exp_name, task, evaluators))
                logger.debug(
                    "Discovered experiment %r with %d evaluator(s).",
                    exp_name,
                    len(evaluators),
                )
        finally:
            if path_inserted and parent in sys.path:
                sys.path.remove(parent)

        return found

    def _load_attribute(self, path: Path, attribute: str) -> Any:
        """Import a Python file and return the named attribute, or None.

        We use the low-level ``importlib.util.spec_from_file_location`` /
        ``module_from_spec`` / ``exec_module`` API (rather than the higher-level
        ``importlib.import_module``) deliberately: it lets us assign a stable,
        unique module name (``_evalwire_exp_<dir>_<stem>``) without polluting
        any package namespace or relying on the directory structure being a
        proper Python package.
        """
        module_name = f"_evalwire_exp_{path.parent.name}_{path.stem}"
        try:
            spec = importlib.util.spec_from_file_location(module_name, path)
            if spec is None or spec.loader is None:
                return None
            module = importlib.util.module_from_spec(spec)
            sys.modules[module_name] = module
            spec.loader.exec_module(module)  # type: ignore[union-attr]
            return getattr(module, attribute, None)
        except Exception as exc:
            sys.modules.pop(module_name, None)
            logger.error("Failed to load %s: %s", path, exc, exc_info=True)
            return None

    def _get_dataset(self, name: str) -> Any | None:
        try:
            return self.client.datasets.get_dataset(dataset=name)
        except Exception as exc:
            logger.warning(
                "No Phoenix dataset named %r; skipping experiment (%s).", name, exc
            )
            return None
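The same spec_from_file_location / module_from_spec / exec_module sequence can be tried on its own to see the file-stem rule in action. load_attribute below is a trimmed illustration of _load_attribute; the evaluator's (output, expected) parameter names are an assumption about how Phoenix binds evaluator arguments, so check the Phoenix documentation for the names it supports:

```python
import importlib.util
import sys
import tempfile
from pathlib import Path
from typing import Any


def load_attribute(path: Path, attribute: str) -> Any:
    """Import one file under a unique module name and return an attribute, or None."""
    module_name = f"_demo_exp_{path.parent.name}_{path.stem}"
    spec = importlib.util.spec_from_file_location(module_name, path)
    if spec is None or spec.loader is None:
        return None
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module  # registered before execution, as in evalwire
    try:
        spec.loader.exec_module(module)
    except Exception:
        sys.modules.pop(module_name, None)  # do not leave a half-initialised module
        return None
    return getattr(module, attribute, None)


with tempfile.TemporaryDirectory() as tmp:
    evaluator_file = Path(tmp) / "es_search" / "exact_match.py"
    evaluator_file.parent.mkdir()
    evaluator_file.write_text(
        "def exact_match(output, expected):\n"
        "    return output == expected\n"
    )
    exact_match = load_attribute(evaluator_file, evaluator_file.stem)
    wrong_name = load_attribute(evaluator_file, "evaluate")

print(exact_match("a", "a"), wrong_name)  # True None
```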

run(names=None, *, experiment_name_prefix='eval', metadata=None)

Discover, load, and run experiments.

Parameters

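  • names: If provided, only run experiments whose directory names are in this list. If None, run all discovered experiments.
  • experiment_name_prefix: Prefix for the auto-generated experiment name in Phoenix. Format: "{prefix}_{dataset_name}_{iso_timestamp}".
  • metadata: Extra key/value pairs attached to each experiment record.

Returns

list: One experiment result object per experiment, in the order the experiments finished. With concurrency greater than 1 this can differ from the alphabetical order in which experiments are discovered.

Raises

SystemExit(1): If any experiment failed. It is raised only after every experiment has finished.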
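Inside run(), each experiment's metadata starts as {"dataset": <name>, "run_by": "evalwire"} and is then updated with the metadata argument, so caller-supplied keys win on collision. The hypothetical helper below restates that merge; git_sha is just an example key:

```python
from __future__ import annotations

from typing import Any


def experiment_metadata(dataset: str, extra: dict[str, Any] | None = None) -> dict[str, Any]:
    """Combine evalwire's default metadata with caller-supplied pairs."""
    merged: dict[str, Any] = {"dataset": dataset, "run_by": "evalwire"}
    if extra:
        # dict.update: caller keys override the defaults.
        merged.update(extra)
    return merged


print(experiment_metadata("es_search", {"git_sha": "abc123", "run_by": "ci"}))
# {'dataset': 'es_search', 'run_by': 'ci', 'git_sha': 'abc123'}
```

With the real API this corresponds to a call such as runner.run(metadata={"git_sha": "abc123"}).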
