# evalwire.runner

ExperimentRunner — auto-discovers and executes evalwire experiments.

ExperimentRunner

Discover, load, and run all experiments under experiments_dir.

Each subdirectory that contains a task.py file is treated as one experiment. The directory name must match the name of a Phoenix dataset.

Parameters

- `experiments_dir`: Root directory containing per-experiment subdirectories.
- `phoenix_client`: An initialised `phoenix.client.Client` instance.
- `concurrency`: Number of experiments to run in parallel. Default: 1 (sequential).
- `dry_run`: If `True`, run one example per experiment without uploading results. If an `int`, run that many examples.

Source code in src/evalwire/runner.py
class ExperimentRunner:
    """Discover, load, and run all experiments under ``experiments_dir``.

    Each subdirectory that contains a ``task.py`` file is treated as one
    experiment. The directory name must match the name of a Phoenix dataset.

    Parameters
    ----------
    experiments_dir:
        Root directory containing per-experiment subdirectories.
    phoenix_client:
        An initialised ``phoenix.client.Client`` instance.
    concurrency:
        Number of experiments to run in parallel. Default: 1 (sequential).
    dry_run:
        If ``True``, run one example per experiment without uploading results.
        If an ``int``, run that many examples.
    """

    def __init__(
        self,
        experiments_dir: Path | str,
        phoenix_client: Client,
        *,
        concurrency: int = 1,
        dry_run: bool | int = False,
    ) -> None:
        # Normalise to Path so all downstream filesystem checks work uniformly.
        self.experiments_dir = Path(experiments_dir)
        self.client = phoenix_client
        self.concurrency = concurrency
        # Passed straight through to Phoenix's run_experiment (bool or int).
        self.dry_run = dry_run

    def run(
        self,
        names: list[str] | None = None,
        *,
        experiment_name_prefix: str = "eval",
        metadata: dict[str, Any] | None = None,
    ) -> list[Any]:
        """Discover, load, and run experiments.

        Parameters
        ----------
        names:
            If provided, only run experiments whose directory names are in
            this list. If ``None``, run all discovered experiments.
        experiment_name_prefix:
            Prefix for the auto-generated experiment name in Phoenix.
            Format: ``"{prefix}_{dataset_name}_{iso_timestamp}"``.
        metadata:
            Extra key/value pairs attached to each experiment record.

        Returns
        -------
        list
            One experiment result object per experiment.

        Raises
        ------
        SystemExit
            With exit code 1 if any experiment fails or its dataset is
            missing — suitable for direct use from a CLI entry point.
        """
        experiments = self._discover(names)
        if not experiments:
            logger.warning("No experiments found in %s.", self.experiments_dir)
            return []

        results: list[Any] = []
        # Sticky failure flag: one bad experiment fails the whole run, but
        # only after every other experiment has been given a chance to finish.
        failed = False

        def _run_one(exp_name: str, task: Any, evaluators: list[Any]) -> Any:
            # Returns None (treated as failure) when the dataset is missing.
            dataset = self._get_dataset(exp_name)
            if dataset is None:
                return None

            # Phoenix's sync client cannot await coroutine functions directly.
            # Wrap async tasks in a sync bridge so they work transparently.
            # We use a persistent per-thread event loop instead of asyncio.run()
            # because asyncio.run() closes the loop after each call, which breaks
            # async libraries (httpx, anyio) that try to clean up transports on a
            # loop that is already gone ("RuntimeError: Event loop is closed").
            # NOTE(review): _get_thread_event_loop is a module-level helper not
            # shown in this excerpt — presumed to lazily create one loop per thread.
            if inspect.iscoroutinefunction(task):
                _async_task = task

                # Rebinds the local name `task`; the coroutine is captured via
                # the default argument so the closure is not late-binding.
                def task(example: Any, _fn: Any = _async_task) -> Any:  # noqa: E731
                    return _get_thread_event_loop().run_until_complete(_fn(example))

            # Second-resolution UTC timestamp keeps experiment names unique
            # per run while staying sortable.
            timestamp = datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
            experiment_name = f"{experiment_name_prefix}_{exp_name}_{timestamp}"
            # Caller-supplied metadata may override the defaults set here.
            exp_metadata = {"dataset": exp_name, "run_by": "evalwire"}
            if metadata:
                exp_metadata.update(metadata)

            logger.info("Running experiment %r…", experiment_name)
            return self.client.experiments.run_experiment(
                dataset=dataset,
                task=task,
                evaluators=evaluators,
                experiment_name=experiment_name,
                experiment_metadata=exp_metadata,
                dry_run=self.dry_run,
            )

        # concurrency=1 degenerates to sequential execution through a
        # single-worker pool, keeping one code path for both modes.
        with ThreadPoolExecutor(max_workers=self.concurrency) as executor:
            futures = {
                executor.submit(_run_one, exp_name, task, evaluators): exp_name
                for exp_name, task, evaluators in experiments
            }
            for future in as_completed(futures):
                exp_name = futures[future]
                try:
                    result = future.result()
                    if result is None:
                        # Missing dataset — already logged inside _get_dataset.
                        failed = True
                    else:
                        results.append(result)
                except Exception as exc:
                    logger.error("Experiment %r failed: %s", exp_name, exc)
                    failed = True

        if failed:
            raise SystemExit(1)

        return results

    def _discover(self, names: list[str] | None) -> list[tuple[str, Any, list[Any]]]:
        """Return a list of ``(name, task, evaluators)`` tuples."""
        found: list[tuple[str, Any, list[Any]]] = []

        if not self.experiments_dir.is_dir():
            logger.error(
                "Experiments directory %s does not exist.", self.experiments_dir
            )
            return found

        # Temporarily insert the parent of experiments_dir on sys.path so that
        # relative imports inside experiment modules work. We use try/finally to
        # restore sys.path afterwards so the mutation does not leak into the
        # broader process (important in long-running servers and test suites).
        parent = str(self.experiments_dir.parent.resolve())
        # Only remove later what we actually inserted — never an entry that
        # was already present before discovery started.
        path_inserted = parent not in sys.path
        if path_inserted:
            sys.path.insert(0, parent)

        try:
            # sorted() gives a deterministic discovery (and hence run) order.
            for subdir in sorted(self.experiments_dir.iterdir()):
                if not subdir.is_dir():
                    continue
                exp_name = subdir.name
                if names is not None and exp_name not in names:
                    continue

                # task.py is the marker that a directory is an experiment.
                task_file = subdir / "task.py"
                if not task_file.exists():
                    logger.debug("Skipping %r — no task.py found.", exp_name)
                    continue

                # Presumably needed so the directory is a proper package for
                # the relative imports mentioned above — TODO confirm.
                init_file = subdir / "__init__.py"
                if not init_file.exists():
                    init_file.touch()
                    logger.debug("Created missing __init__.py in %r.", exp_name)

                task = self._load_attribute(task_file, "task")
                if task is None:
                    logger.warning(
                        "task.py in %r has no 'task' callable; skipping.", exp_name
                    )
                    continue

                # Convention: every other *.py file in the directory exports an
                # evaluator named after the file's stem.
                evaluators: list[Any] = []
                for py_file in sorted(subdir.glob("*.py")):
                    if py_file.stem in ("task", "__init__"):
                        continue
                    evaluator = self._load_attribute(py_file, py_file.stem)
                    if evaluator is not None:
                        evaluators.append(evaluator)
                    else:
                        logger.warning(
                            "Evaluator file %s has no callable %r; skipping.",
                            py_file,
                            py_file.stem,
                        )

                found.append((exp_name, task, evaluators))
                logger.debug(
                    "Discovered experiment %r with %d evaluator(s).",
                    exp_name,
                    len(evaluators),
                )
        finally:
            if path_inserted and parent in sys.path:
                sys.path.remove(parent)

        return found

    def _load_attribute(self, path: Path, attribute: str) -> Any:
        """Import a Python file and return the named attribute, or None.

        We use the low-level ``importlib.util.spec_from_file_location`` /
        ``module_from_spec`` / ``exec_module`` API (rather than the higher-level
        ``importlib.import_module``) deliberately: it lets us assign a stable,
        unique module name (``_evalwire_exp_<dir>_<stem>``) without polluting
        any package namespace or relying on the directory structure being a
        proper Python package.
        """
        module_name = f"_evalwire_exp_{path.parent.name}_{path.stem}"
        try:
            spec = importlib.util.spec_from_file_location(module_name, path)
            if spec is None or spec.loader is None:
                return None
            module = importlib.util.module_from_spec(spec)
            # Register before exec_module, per the importlib recipe, so code
            # executed inside the module can resolve it by name.
            # NOTE(review): the entry is never removed from sys.modules; this
            # looks intentional (stable prefixed name) — confirm.
            sys.modules[module_name] = module
            spec.loader.exec_module(module)  # type: ignore[union-attr]
            # Missing attribute is reported by the caller, not raised here.
            return getattr(module, attribute, None)
        except Exception as exc:
            logger.error("Failed to load %s: %s", path, exc, exc_info=True)
            return None

    def _get_dataset(self, name: str) -> Any | None:
        """Fetch the Phoenix dataset matching ``name``, or None if absent.

        A missing dataset is logged as a warning rather than raised; the
        caller treats the None result as a failed experiment.
        """
        try:
            return self.client.datasets.get_dataset(dataset=name)
        except Exception as exc:
            logger.warning(
                "No Phoenix dataset named %r; skipping experiment (%s).", name, exc
            )
            return None
run(names=None, *, experiment_name_prefix='eval', metadata=None)

Discover, load, and run experiments.

Parameters

- `names`: If provided, only run experiments whose directory names are in this list. If `None`, run all discovered experiments.
- `experiment_name_prefix`: Prefix for the auto-generated experiment name in Phoenix. Format: `"{prefix}_{dataset_name}_{iso_timestamp}"`.
- `metadata`: Extra key/value pairs attached to each experiment record.

Returns

list One experiment result object per experiment.

Source code in src/evalwire/runner.py
def run(
    self,
    names: list[str] | None = None,
    *,
    experiment_name_prefix: str = "eval",
    metadata: dict[str, Any] | None = None,
) -> list[Any]:
    """Discover, load, and run experiments.

    Parameters
    ----------
    names:
        If provided, only run experiments whose directory names are in
        this list. If ``None``, run all discovered experiments.
    experiment_name_prefix:
        Prefix for the auto-generated experiment name in Phoenix.
        Format: ``"{prefix}_{dataset_name}_{iso_timestamp}"``.
    metadata:
        Extra key/value pairs attached to each experiment record.

    Returns
    -------
    list
        One experiment result object per experiment.

    Raises
    ------
    SystemExit
        With exit code 1 if any experiment fails or its dataset is
        missing — suitable for direct use from a CLI entry point.
    """
    experiments = self._discover(names)
    if not experiments:
        logger.warning("No experiments found in %s.", self.experiments_dir)
        return []

    results: list[Any] = []
    # Sticky failure flag: one bad experiment fails the whole run, but only
    # after every other experiment has been given a chance to finish.
    failed = False

    def _run_one(exp_name: str, task: Any, evaluators: list[Any]) -> Any:
        # Returns None (treated as failure) when the dataset is missing.
        dataset = self._get_dataset(exp_name)
        if dataset is None:
            return None

        # Phoenix's sync client cannot await coroutine functions directly.
        # Wrap async tasks in a sync bridge so they work transparently.
        # We use a persistent per-thread event loop instead of asyncio.run()
        # because asyncio.run() closes the loop after each call, which breaks
        # async libraries (httpx, anyio) that try to clean up transports on a
        # loop that is already gone ("RuntimeError: Event loop is closed").
        # NOTE(review): _get_thread_event_loop is a module-level helper not
        # shown in this excerpt — presumed to lazily create one loop per thread.
        if inspect.iscoroutinefunction(task):
            _async_task = task

            # Rebinds the local name `task`; the coroutine is captured via
            # the default argument so the closure is not late-binding.
            def task(example: Any, _fn: Any = _async_task) -> Any:  # noqa: E731
                return _get_thread_event_loop().run_until_complete(_fn(example))

        # Second-resolution UTC timestamp keeps experiment names unique per
        # run while staying sortable.
        timestamp = datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
        experiment_name = f"{experiment_name_prefix}_{exp_name}_{timestamp}"
        # Caller-supplied metadata may override the defaults set here.
        exp_metadata = {"dataset": exp_name, "run_by": "evalwire"}
        if metadata:
            exp_metadata.update(metadata)

        logger.info("Running experiment %r…", experiment_name)
        return self.client.experiments.run_experiment(
            dataset=dataset,
            task=task,
            evaluators=evaluators,
            experiment_name=experiment_name,
            experiment_metadata=exp_metadata,
            dry_run=self.dry_run,
        )

    # concurrency=1 degenerates to sequential execution through a
    # single-worker pool, keeping one code path for both modes.
    with ThreadPoolExecutor(max_workers=self.concurrency) as executor:
        futures = {
            executor.submit(_run_one, exp_name, task, evaluators): exp_name
            for exp_name, task, evaluators in experiments
        }
        for future in as_completed(futures):
            exp_name = futures[future]
            try:
                result = future.result()
                if result is None:
                    # Missing dataset — already logged inside _get_dataset.
                    failed = True
                else:
                    results.append(result)
            except Exception as exc:
                logger.error("Experiment %r failed: %s", exp_name, exc)
                failed = True

    if failed:
        raise SystemExit(1)

    return results