class ExperimentRunner:
"""Discover, load, and run all experiments under ``experiments_dir``.
Each subdirectory that contains a ``task.py`` file is treated as one
experiment. The directory name must match the name of a Phoenix dataset.
Parameters
----------
experiments_dir:
Root directory containing per-experiment subdirectories.
phoenix_client:
An initialised ``phoenix.client.Client`` instance.
concurrency:
Number of experiments to run in parallel. Default: 1 (sequential).
dry_run:
If ``True``, run one example per experiment without uploading results.
If an ``int``, run that many examples.
"""
def __init__(
self,
experiments_dir: Path | str,
phoenix_client: Client,
*,
concurrency: int = 1,
dry_run: bool | int = False,
) -> None:
self.experiments_dir = Path(experiments_dir)
self.client = phoenix_client
self.concurrency = concurrency
self.dry_run = dry_run
def run(
    self,
    names: list[str] | None = None,
    *,
    experiment_name_prefix: str = "eval",
    metadata: dict[str, Any] | None = None,
) -> list[Any]:
    """Discover, load, and run experiments.

    Parameters
    ----------
    names:
        If provided, only run experiments whose directory names are in
        this list. If ``None``, run all discovered experiments.
    experiment_name_prefix:
        Prefix for the auto-generated experiment name in Phoenix.
        Format: ``"{prefix}_{dataset_name}_{iso_timestamp}"``.
    metadata:
        Extra key/value pairs attached to each experiment record.

    Returns
    -------
    list
        One experiment result object per experiment. With
        ``concurrency > 1`` results arrive in completion order, not
        submission order.

    Raises
    ------
    SystemExit
        If any experiment raises or its dataset is missing. Note that in
        this case results from the experiments that *did* succeed are
        discarded.
    """
    experiments = self._discover(names)
    if not experiments:
        logger.warning("No experiments found in %s.", self.experiments_dir)
        return []
    results: list[Any] = []
    failed = False

    def _run_one(exp_name: str, task: Any, evaluators: list[Any]) -> Any:
        # Returns None when the Phoenix dataset is missing; the
        # as_completed loop below treats that as a failure.
        dataset = self._get_dataset(exp_name)
        if dataset is None:
            return None
        # Phoenix's sync client cannot await coroutine functions directly.
        # Wrap async tasks in a sync bridge so they work transparently.
        # We use a persistent per-thread event loop instead of asyncio.run()
        # because asyncio.run() closes the loop after each call, which breaks
        # async libraries (httpx, anyio) that try to clean up transports on a
        # loop that is already gone ("RuntimeError: Event loop is closed").
        if inspect.iscoroutinefunction(task):
            _async_task = task

            # Rebinding ``task`` keeps the rest of this function agnostic
            # to whether the caller's task was sync or async. ``_fn`` is a
            # default argument so the coroutine function is captured by
            # value, not via a late-binding closure.
            def task(example: Any, _fn: Any = _async_task) -> Any:  # noqa: E731
                return _get_thread_event_loop().run_until_complete(_fn(example))

        # Second-resolution UTC timestamp keeps experiment names unique
        # across repeated eval runs.
        timestamp = datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
        experiment_name = f"{experiment_name_prefix}_{exp_name}_{timestamp}"
        exp_metadata = {"dataset": exp_name, "run_by": "evalwire"}
        if metadata:
            # Caller-supplied metadata wins on key collisions.
            exp_metadata.update(metadata)
        logger.info("Running experiment %r…", experiment_name)
        return self.client.experiments.run_experiment(
            dataset=dataset,
            task=task,
            evaluators=evaluators,
            experiment_name=experiment_name,
            experiment_metadata=exp_metadata,
            dry_run=self.dry_run,
        )

    # Fan experiments out across a thread pool. concurrency=1 (the default)
    # degenerates to sequential execution.
    with ThreadPoolExecutor(max_workers=self.concurrency) as executor:
        futures = {
            executor.submit(_run_one, exp_name, task, evaluators): exp_name
            for exp_name, task, evaluators in experiments
        }
        for future in as_completed(futures):
            exp_name = futures[future]
            try:
                result = future.result()
                if result is None:
                    # Missing dataset — already logged by _get_dataset.
                    failed = True
                else:
                    results.append(result)
            except Exception as exc:
                logger.error("Experiment %r failed: %s", exp_name, exc)
                failed = True
    if failed:
        # Non-zero exit so CI marks the eval run as failed even when some
        # experiments succeeded.
        raise SystemExit(1)
    return results
def _discover(self, names: list[str] | None) -> list[tuple[str, Any, list[Any]]]:
    """Return a list of ``(name, task, evaluators)`` tuples.

    An experiment is any subdirectory of ``experiments_dir`` that contains
    a ``task.py`` defining a ``task`` callable. Every other ``*.py`` file
    in the subdirectory is loaded as an evaluator; each must expose a
    callable named after the file's stem.

    Parameters
    ----------
    names:
        Restrict discovery to these directory names; ``None`` means all.
        Requested names that match no runnable experiment are logged as
        warnings so typos do not vanish silently.

    Returns
    -------
    list of tuple
        ``(directory_name, task_callable, [evaluator, ...])`` entries,
        in sorted directory order.
    """
    found: list[tuple[str, Any, list[Any]]] = []
    if not self.experiments_dir.is_dir():
        logger.error(
            "Experiments directory %s does not exist.", self.experiments_dir
        )
        return found
    # Temporarily insert the parent of experiments_dir on sys.path so that
    # relative imports inside experiment modules work. We use try/finally to
    # restore sys.path afterwards so the mutation does not leak into the
    # broader process (important in long-running servers and test suites).
    parent = str(self.experiments_dir.parent.resolve())
    path_inserted = parent not in sys.path
    if path_inserted:
        sys.path.insert(0, parent)
    try:
        for subdir in sorted(self.experiments_dir.iterdir()):
            if not subdir.is_dir():
                continue
            exp_name = subdir.name
            if names is not None and exp_name not in names:
                continue
            task_file = subdir / "task.py"
            if not task_file.exists():
                logger.debug("Skipping %r — no task.py found.", exp_name)
                continue
            # Make the directory importable as a package so relative imports
            # between experiment files keep working.
            init_file = subdir / "__init__.py"
            if not init_file.exists():
                init_file.touch()
                logger.debug("Created missing __init__.py in %r.", exp_name)
            task = self._load_attribute(task_file, "task")
            if task is None:
                logger.warning(
                    "task.py in %r has no 'task' callable; skipping.", exp_name
                )
                continue
            evaluators: list[Any] = []
            for py_file in sorted(subdir.glob("*.py")):
                if py_file.stem in ("task", "__init__"):
                    continue
                evaluator = self._load_attribute(py_file, py_file.stem)
                if evaluator is not None:
                    evaluators.append(evaluator)
                else:
                    logger.warning(
                        "Evaluator file %s has no callable %r; skipping.",
                        py_file,
                        py_file.stem,
                    )
            found.append((exp_name, task, evaluators))
            logger.debug(
                "Discovered experiment %r with %d evaluator(s).",
                exp_name,
                len(evaluators),
            )
    finally:
        if path_inserted and parent in sys.path:
            sys.path.remove(parent)
    # Robustness fix: warn about requested names that matched no runnable
    # experiment (misspelled or missing directory, missing task.py, ...)
    # instead of dropping them silently.
    if names:
        discovered = {name for name, _, _ in found}
        for missing in sorted(set(names) - discovered):
            logger.warning(
                "Requested experiment %r was not discovered under %s.",
                missing,
                self.experiments_dir,
            )
    return found
def _load_attribute(self, path: Path, attribute: str) -> Any:
"""Import a Python file and return the named attribute, or None.
We use the low-level ``importlib.util.spec_from_file_location`` /
``module_from_spec`` / ``exec_module`` API (rather than the higher-level
``importlib.import_module``) deliberately: it lets us assign a stable,
unique module name (``_evalwire_exp_<dir>_<stem>``) without polluting
any package namespace or relying on the directory structure being a
proper Python package.
"""
module_name = f"_evalwire_exp_{path.parent.name}_{path.stem}"
try:
spec = importlib.util.spec_from_file_location(module_name, path)
if spec is None or spec.loader is None:
return None
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module) # type: ignore[union-attr]
return getattr(module, attribute, None)
except Exception as exc:
logger.error("Failed to load %s: %s", path, exc, exc_info=True)
return None
def _get_dataset(self, name: str) -> Any | None:
try:
return self.client.datasets.get_dataset(dataset=name)
except Exception as exc:
logger.warning(
"No Phoenix dataset named %r; skipping experiment (%s).", name, exc
)
return None