# Source code for flexeval.run_utils

"""Utilities for :mod:`~flexeval.runner`."""

import json
import logging

from flexeval import rubric
from flexeval.classes.dataset import Dataset
from flexeval.classes.eval_runner import EvalRunner
from flexeval.classes.eval_set_run import EvalSetRun, EvalSetRunDatasets
from flexeval.schema import evalrun_schema
from flexeval import data_loader

logger = logging.getLogger(__name__)


def build_eval_set_run(runner: EvalRunner) -> EvalSetRun:
    """Create the :class:`EvalSetRun` row describing this evaluation run.

    The row records the eval's name, notes, metrics, the ordered metric
    graph, completion settings and the grader/completion LLM configs. LLM
    configs that are ``None`` are stored as the JSON literal ``null``.

    Args:
        runner: The runner whose ``evalrun`` configuration and
            ``metrics_graph_ordered_list`` are persisted.

    Returns:
        The newly created ``EvalSetRun``.
    """
    loaded_rubrics = rubric.load_rubric_metrics(runner.evalrun.rubric_paths)
    eval_config = runner.evalrun.eval
    null_json = json.dumps(None)

    # TODO this code uses a model_name that does not appear in the Eval schema; should look into this
    model_name = null_json

    if eval_config.completion_llm is None:
        completion_llm_json = null_json
    else:
        completion_llm_json = eval_config.completion_llm.model_dump_json()

    if eval_config.grader_llm is None:
        grader_llm_json = null_json
    else:
        grader_llm_json = eval_config.grader_llm.model_dump_json()

    # Only persist the rubrics that the metric graph actually references.
    used_rubrics = {}
    for step in runner.metrics_graph_ordered_list:
        if step["evaluation_type"] == "rubric":
            rubric_name = step["evaluation_name"]
            used_rubrics[rubric_name] = loaded_rubrics[rubric_name].model_dump()

    return EvalSetRun.create(
        name=eval_config.name,
        notes=eval_config.notes,
        metrics=eval_config.metrics.model_dump_json(),
        metrics_graph_ordered_list=json.dumps(runner.metrics_graph_ordered_list),
        do_completion=eval_config.do_completion,
        completion_llm=completion_llm_json,
        model_name=model_name,
        grader_llm=grader_llm_json,
        rubrics=json.dumps(used_rubrics),
    )
def find_dataset_by_name(name: str) -> Dataset | None:
    """Look up the loaded Dataset called ``name``.

    A Dataset that exists under this name but is not flagged ``is_loaded``
    is the leftover of a previous load that crashed. It is considered
    stale: it is removed through :func:`_cleanup_stale_dataset` and the
    lookup reports ``None``, letting the caller behave as if the dataset
    had never existed.

    Args:
        name: Dataset name to search for.

    Returns:
        The matching loaded Dataset, or ``None`` when there is no dataset
        with this name (or only a stale one, which has been cleaned up).

    Raises:
        ValueError: If several Datasets share this name, or if a stale
            unloaded Dataset carries derived rows (metrics or eval-run
            links) hinting at a real integrity problem; see
            :func:`_cleanup_stale_dataset`.
    """
    # Two rows are enough to tell "none", "exactly one" and "several" apart.
    matches = list(Dataset.select().where(Dataset.name == name).limit(2))
    if not matches:
        return None
    if len(matches) > 1:
        raise ValueError(f"Multiple datasets with name '{name}'.")

    (candidate,) = matches
    if candidate.is_loaded:
        return candidate

    # Unloaded remnant of a failed load: drop it and report "not found".
    _cleanup_stale_dataset(candidate)
    return None
def _cleanup_stale_dataset(dataset: Dataset) -> None:
    """Remove a partially-loaded Dataset together with its child rows.

    ``is_loaded=False`` marks a Dataset left behind by an earlier load that
    crashed after the Dataset row was committed but before the final
    ``is_loaded=True`` save. Whatever Thread/Turn/Message/ToolCall rows it
    has are partial and unusable.

    Derived rows (Metric, EvalSetRunDatasets) are only created after a
    successful load, so they should never be attached to an unloaded
    Dataset. If they are, something bypassed the normal flow and the
    dataset is left untouched.

    Args:
        dataset: The unloaded Dataset to delete.

    Raises:
        ValueError: If the dataset has metrics or eval-run links.
    """
    has_derived_rows = (
        dataset.metrics_list.exists() or dataset.evalsetrun_links.exists()
    )
    if has_derived_rows:
        raise ValueError(
            f"Dataset '{dataset.name}' (ID={dataset.id}) has is_loaded=False but "
            "has metrics or eval-run links — refusing to clean up (possible integrity error)."
        )

    # Count the partial child rows first so the warning records what is dropped.
    child_relations = ("threads", "turns", "messages", "toolcalls")
    counts = {
        relation: getattr(dataset, relation).count() for relation in child_relations
    }
    logger.warning(
        f"Dropping unloaded dataset '{dataset.name}' (ID={dataset.id}); "
        f"partial rows from a prior failed load: {counts}. Reloading from scratch."
    )
    dataset.delete_instance(recursive=True)
def create_dataset(data_source: evalrun_schema.DataSource) -> Dataset:
    """Create a new :class:`Dataset` row for ``data_source``.

    The row stores the data source's class name as ``datasource_type``
    along with its ``name`` and ``notes``. No content is loaded here.

    Args:
        data_source: The data source the dataset is created for.

    Returns:
        The newly created Dataset.
    """
    source_kind = type(data_source).__name__
    return Dataset.create(
        datasource_type=source_kind,
        name=data_source.name,
        notes=data_source.notes,
    )
def load_datasets(
    evalrun: evalrun_schema.EvalRun,
) -> list[Dataset]:
    """Resolve every data source of ``evalrun`` to a :class:`Dataset`.

    For each entry in ``evalrun.data_sources`` the function either reuses
    an existing Dataset with the same name or creates a new one and loads
    its contents, depending on the data source type and ``evalrun.config``:

    * ``NamedDataSource`` must match an existing dataset by name.
    * ``FileDataSource`` / ``IterableDataSource`` reuse an existing dataset
      when ``config.reuse_dataset_by_name`` is set and one is found;
      otherwise a new dataset is created, loaded and marked ``is_loaded``.

    Side effect: an ``IterableDataSource`` without a name is given one
    derived from ``id(data_source)`` (the data source object is mutated).

    Args:
        evalrun: The eval run whose ``data_sources`` and ``config`` are used.

    Returns:
        One Dataset per data source, in the order of ``evalrun.data_sources``.

    Raises:
        ValueError: If ``config.raise_on_unnamed_dataset`` is set and a data
            source has no (non-blank) name; if a ``NamedDataSource`` matches
            no existing dataset; if ``config.raise_on_duplicate_dataset_name``
            is set and a non-reused name already exists; or if the data
            source type is not supported. Errors raised by
            :func:`find_dataset_by_name` propagate as well.
    """
    datasets = []
    config = evalrun.config
    for data_source in evalrun.data_sources:
        datasource_type = type(data_source).__name__
        # Auto-name unnamed IterableDataSources so same-instance reuse works
        if (
            isinstance(data_source, evalrun_schema.IterableDataSource)
            and not data_source.name
        ):
            data_source.name = f"_iterable_{id(data_source)}"
        # 1. Validate naming constraints
        # (runs after auto-naming, so unnamed iterables never trip this check)
        if config.raise_on_unnamed_dataset and (
            data_source.name is None or data_source.name.strip() == ""
        ):
            raise ValueError(
                f"Configuration requires named datasets, but a {datasource_type} was unnamed."
            )
        # 2. Look up existing dataset by name (if named)
        existing_dataset = None
        if data_source.name:
            existing_dataset = find_dataset_by_name(data_source.name)
        # 3. Dispatch by DataSource type
        if isinstance(data_source, evalrun_schema.NamedDataSource):
            # NamedDataSource MUST match an existing dataset
            if existing_dataset is None:
                raise ValueError(
                    f"NamedDataSource requires an existing dataset with name '{data_source.name}', but none was found."
                )
            dataset = existing_dataset
        elif isinstance(
            data_source,
            (evalrun_schema.FileDataSource, evalrun_schema.IterableDataSource),
        ):
            # Reuse if configured and existing dataset matches (checked first, takes priority)
            if config.reuse_dataset_by_name and existing_dataset is not None:
                # A type mismatch is only reported; the dataset is still reused.
                if existing_dataset.datasource_type != datasource_type:
                    logger.warning(
                        f"Reusing dataset '{existing_dataset.name}' (ID={existing_dataset.id}) "
                        f"but datasource type differs: existing={existing_dataset.datasource_type}, new={datasource_type}."
                    )
                # NOTE(review): placed so it logs on every reuse, not only on
                # a type mismatch — confirm against the intended nesting.
                logger.info(
                    f"Reusing existing dataset '{existing_dataset.name}' (ID={existing_dataset.id})."
                )
                dataset = existing_dataset
            else:
                # Check for duplicate name conflict (only when not reusing)
                if (
                    config.raise_on_duplicate_dataset_name
                    and existing_dataset is not None
                ):
                    raise ValueError(
                        f"Configuration requires unique dataset names, but '{data_source.name}' already exists (ID={existing_dataset.id})."
                    )
                # Create and load new dataset
                dataset = create_dataset(data_source)
                if isinstance(data_source, evalrun_schema.IterableDataSource):
                    data_loader.load_iterable(dataset, data_source.contents)
                elif isinstance(data_source, evalrun_schema.FileDataSource):
                    data_loader.load_file(
                        dataset,
                        data_source,
                        max_n_conversation_threads=config.max_n_conversation_threads,
                        nb_evaluations_per_thread=config.nb_evaluations_per_thread,
                    )
                    # Record where the file-backed dataset came from.
                    dataset.metadata_dict["imported_path"] = str(data_source.path)
                    dataset.metadata_dict["imported_format"] = data_source.format.value
                # Mark the dataset loaded only after its contents were loaded
                # without raising; an unloaded row is treated as stale by
                # find_dataset_by_name.
                dataset.is_loaded = True
                dataset.save()
        else:
            raise ValueError(f"Unsupported DataSource type: {datasource_type}")
        datasets.append(dataset)
    return datasets
def set_datasets_for_evalsetrun(datasets: list[Dataset], evalsetrun: EvalSetRun):
    """Attach each dataset to ``evalsetrun``.

    One ``EvalSetRunDatasets`` row is created per entry of ``datasets``,
    in the order given.

    Args:
        datasets: Datasets to associate with the run.
        evalsetrun: The eval set run the datasets belong to.
    """
    for linked_dataset in datasets:
        EvalSetRunDatasets.create(
            evalsetrun=evalsetrun,
            dataset=linked_dataset,
        )
def build_evalsetrun_datasets(
    evalrun: evalrun_schema.EvalRun, evalsetrun: EvalSetRun
) -> list[Dataset]:
    """Load the datasets of ``evalrun`` and link them to ``evalsetrun``.

    Args:
        evalrun: The eval run whose data sources are resolved via
            :func:`load_datasets`.
        evalsetrun: The eval set run the resulting datasets are linked to
            via :func:`set_datasets_for_evalsetrun`.

    Returns:
        The datasets returned by :func:`load_datasets`.
    """
    resolved_datasets = load_datasets(evalrun)
    set_datasets_for_evalsetrun(resolved_datasets, evalsetrun)
    return resolved_datasets