Source code for flexeval.compute_metrics

"""Utilities for computing needed metric computations and actually invoking those computations."""

import copy
import importlib
import importlib.util
import inspect
import json
import logging
import string
import types
from concurrent.futures import ThreadPoolExecutor
from typing import Iterable, Union

import networkx as nx

from flexeval import function_types
from flexeval.classes.eval_set_run import EvalSetRun
from flexeval.classes.message import Message
from flexeval.classes.thread import Thread
from flexeval.classes.tool_call import ToolCall
from flexeval.classes.turn import Turn
from flexeval.configuration import completion_functions, function_metrics
from flexeval.schema import EvalRun, FunctionsCollection, eval_schema

logger = logging.getLogger(__name__)


class ObjectMetric:
    def __init__(self, object: Message | Turn | ToolCall | Thread, metric: dict):
        """Tracks a unique (object, metric) combination and any results computed for that metric.

        Args:
            object (Message | Turn | ToolCall | Thread): The object to track.
            metric (dict): The metric to track.
        """
        self.object: Message | Turn | ToolCall | Thread = object
        self.metric: dict = metric
        self.metric_results: list[dict] | None = None

    def __repr__(self) -> str:
        return (
            f"ObjectMetric(object={self.object.__class__.__name__} {self.object.id}, "
            f"metric={self.metric}, metric_results={self.metric_results})"
        )
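# Illustrative sketch (not part of the module), assuming a `Message` instance `msg` and a metric
# dict shaped like the entries in EvalSetRun.metrics_graph_ordered_list: an ObjectMetric simply
# pairs one evaluable object with one metric definition and holds the results once computed.
#
#   om = ObjectMetric(msg, {"id": 1, "evaluation_name": "string_length",
#                           "evaluation_type": "function", "metric_level": "Message"})
#   assert om.metric_results is None   # nothing computed yet
#   print(repr(om))                    # e.g. "ObjectMetric(object=Message 42, metric={...}, metric_results=None)"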
class MetricGraphBuilder:
    r"""Builds :class:`networkx.DiGraph`\s of :class:`~flexeval.compute_metrics.ObjectMetric`
    instances that reflect any computational dependencies between them."""

    def __init__(self):
        # key: tuple(metric_level, metric_id, object_id)
        # value: ObjectMetric
        self.id_to_object_metric_map = {}

    def build_metric_structures(self, evalsetrun: EvalSetRun):
        metric_id_map = {}
        metrics_by_level = {}
        for metric_instance in json.loads(evalsetrun.metrics_graph_ordered_list):
            metric_level = metric_instance["metric_level"]
            if metric_level not in metrics_by_level:
                metrics_by_level[metric_level] = []
            metrics_by_level[metric_level].append(metric_instance)
            metric_id_map[metric_instance["id"]] = metric_instance
        self.metric_id_map = metric_id_map
        self.metrics_by_level = metrics_by_level

    def get_or_create_object_metric(
        self,
        metric_level: eval_schema.MetricLevel,
        object: Message | Turn | ToolCall | Thread,
        metric: dict,
    ) -> ObjectMetric:
        key = (metric_level, metric["id"], object.id)
        if key not in self.id_to_object_metric_map:
            object_metric = ObjectMetric(object, metric)
            self.id_to_object_metric_map[key] = object_metric
        return self.id_to_object_metric_map[key]

    def get_index(
        self, target_id: int, objects: list[Message | Turn | ToolCall | Thread]
    ):
        for i, object in enumerate(objects):
            if target_id == object.id:
                break
        else:
            raise ValueError(
                f"Failed to find object with id {target_id} in {len(objects)} objects."
            )
        return i

    def find_object_metric_from_depends_on(
        self,
        current_object: Message | Turn | ToolCall | Thread,
        current_metric_level: eval_schema.MetricLevel,
        current_index: int,
        depends_on: dict,
    ) -> ObjectMetric | None:
        """
        If you're a Turn metric that depends on a Message metric, then we would need a dependency on
        ALL or ANY Message meeting the criteria; we don't know how to handle that.
        In contrast, if you're a Message metric that depends on a Turn metric, then we have a
        dependency on only a single object: that Message's Turn.
        """
        metric_id = depends_on["parent_id"]
        dependency_metric_level = depends_on.get("metric_level")
        if dependency_metric_level is None:
            # if not specified in the dependency already, look up the metric level
            depends_on_metric = self.metric_id_map[metric_id]
            dependency_metric_level = depends_on_metric["metric_level"]
            if dependency_metric_level is None:
                raise ValueError(
                    f"Metric lacks a metric level: {depends_on_metric} (matched via dependency_info: {depends_on})"
                )
        if dependency_metric_level == current_metric_level:
            pass  # just use current_index, no lookup needed
        elif current_metric_level == "ToolCall":
            if dependency_metric_level == "Message":
                current_index = self.get_index(
                    current_object.message_id, self.objects_by_level["Message"]
                )
            elif dependency_metric_level == "Turn":
                current_index = self.get_index(
                    current_object.turn_id, self.objects_by_level["Turn"]
                )
            elif dependency_metric_level == "Thread":
                current_index = 0  # only a single thread, by definition
        elif current_metric_level == "Message":
            if dependency_metric_level == "Turn":
                current_index = self.get_index(
                    current_object.turn_id, self.objects_by_level["Turn"]
                )
            elif dependency_metric_level == "Thread":
                current_index = 0  # only a single thread, by definition
            elif dependency_metric_level == "ToolCall":
                raise ValueError(
                    f"Can't depend on a {dependency_metric_level} metric from a {current_metric_level} metric."
                )
        elif current_metric_level == "Turn":
            if dependency_metric_level == "Thread":
                current_index = 0  # only a single thread, by definition
            else:
                raise ValueError(
                    f"Can't depend on a {dependency_metric_level} metric from a {current_metric_level} metric."
                )
        elif current_metric_level == "Thread":
            raise ValueError(
                f"Can't depend on a {dependency_metric_level} metric from a {current_metric_level} metric."
            )
        else:
            raise ValueError(f"Unsupported level: {current_metric_level=}")
        relative_object_position = depends_on["relative_object_position"]
        target_object_index = current_index + relative_object_position
        if target_object_index < 0:
            logger.debug(
                f"Object at position {current_index} cannot in principle satisfy this dependency, so skipping it."
            )
            return None
        object = self.objects_by_level[dependency_metric_level][target_object_index]
        metric = self.metric_id_map[metric_id]
        return self.get_or_create_object_metric(dependency_metric_level, object, metric)

    def build_thread_task_graphs(self, evalsetrun: EvalSetRun) -> Iterable[nx.DiGraph]:
        threads = evalsetrun.threads
        for thread in threads:
            yield self.build_thread_task_graph(thread)

    def build_thread_task_graph(self, thread: Thread) -> nx.DiGraph:
        self.objects_by_level = {
            "Thread": [thread],
            "Turn": list(thread.turns),
            "Message": list(thread.messages),
            "ToolCall": list(thread.toolcalls),
        }
        g = nx.DiGraph()
        for level, metrics_at_level in self.metrics_by_level.items():
            if len(metrics_at_level) == 0:
                continue
            objects = self.objects_by_level[level]
            for i, object in enumerate(objects):
                for metric in metrics_at_level:
                    # register metric on object
                    object_metric = self.get_or_create_object_metric(
                        level, object, metric
                    )
                    g.add_node(object_metric)
                    if "depends_on" in metric:
                        for dependency in metric["depends_on"]:
                            # register dependency metric on the relevant object
                            dependency_object_metric = (
                                self.find_object_metric_from_depends_on(
                                    object, level, i, dependency
                                )
                            )
                            if dependency_object_metric is None:
                                logger.debug(
                                    "This object cannot in principle satisfy this dependency, so skipping it."
                                )
                                # TODO verify that this is the expected behavior in chained dependencies X -> Y -> Z
                                g.remove_node(object_metric)
                                continue
                            g.add_node(dependency_object_metric)
                            g.add_edge(
                                dependency_object_metric,
                                object_metric,
                                depends_on=dependency,
                            )
        return g
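# Illustrative sketch (not part of the module): typical use of MetricGraphBuilder, assuming an
# EvalSetRun whose metrics_graph_ordered_list has already been populated. The builder yields one
# dependency DiGraph per thread; edges point from a dependency's ObjectMetric to the ObjectMetric
# that depends on it, so a topological sort visits dependencies before their dependents.
#
#   builder = MetricGraphBuilder()
#   builder.build_metric_structures(evalsetrun)            # index metrics by id and by level
#   for graph in builder.build_thread_task_graphs(evalsetrun):
#       for node in nx.topological_sort(graph):            # dependencies come first
#           print(node)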
def compute_metrics(evalrun: EvalRun, evalsetrun: EvalSetRun) -> list[dict]:
    n_workers = evalrun.config.max_workers
    raise_on_error = evalrun.config.raise_on_metric_error
    mgb = MetricGraphBuilder()
    mgb.build_metric_structures(evalsetrun)
    graphs = mgb.build_thread_task_graphs(evalsetrun)
    mc = MetricComputer.from_evalrun(evalrun, evalsetrun)
    metrics = []
    if n_workers == 1:
        for graph in graphs:
            graph_metrics = mc.process_thread_dependency_graph(graph, raise_on_error)
            metrics.extend(graph_metrics)
    else:
        with ThreadPoolExecutor(max_workers=n_workers) as executor:
            futures = []
            for graph in graphs:
                future = executor.submit(
                    mc.process_thread_dependency_graph, graph, raise_on_error
                )
                futures.append(future)
            for i, future in enumerate(futures):
                metrics.extend(future.result())
                if i % 100 == 0:
                    logger.info(f"Metric futures completed: {i + 1} / {len(futures)}")
    return metrics
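# Illustrative sketch (not part of the module): compute_metrics is the top-level entry point.
# It builds the per-thread dependency graphs, constructs a MetricComputer from the EvalRun, and
# processes the graphs either serially (max_workers == 1) or via a ThreadPoolExecutor.
#
#   results = compute_metrics(evalrun, evalsetrun)
#   # `results` is a flat list of metric dicts, each carrying keys such as
#   # "evaluation_name", "evaluation_type", "metric_level", "metric_name", and "metric_value".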
class MetricComputer:
    @classmethod
    def from_evalrun(
        cls, evalrun: EvalRun, evalsetrun: EvalSetRun | None = None
    ) -> "MetricComputer":
        function_modules = evalrun.function_modules
        # convert from string module names or filepaths to Python modules
        actual_modules = []
        for i, function_module in enumerate(function_modules):
            if isinstance(function_module, types.ModuleType):
                # already a module
                actual_modules.append(function_module)
            elif isinstance(function_module, FunctionsCollection):
                raise ValueError("FunctionsCollection not yet implemented!")
            else:
                # it's a module name or filepath
                try:
                    # TODO I think this is not necessary given the pydantic schema; this should always fail for filepaths
                    # alternately, we might call import_module() on the ModuleType modules, but I think that's unnecessary
                    module = importlib.import_module(str(function_module))
                except ModuleNotFoundError as module_not_found:
                    try:
                        spec = importlib.util.spec_from_file_location(
                            f"function_module_{i}", function_module
                        )
                        module = importlib.util.module_from_spec(spec)
                        spec.loader.exec_module(module)
                    except Exception as module_not_loaded:
                        raise ValueError(
                            f"Failed to load function module specified by {function_module}. "
                            f"(module not found: {module_not_found}, and failed to load from file location: {module_not_loaded})"
                        )
                actual_modules.append(module)
        if evalrun.add_default_functions and function_metrics not in actual_modules:
            actual_modules.append(function_metrics)
        mc = cls(actual_modules, evalsetrun)

        # validation step: verify that all functions are present
        missing_functions = set()
        if evalrun.eval.metrics.function is not None:
            for function_item in evalrun.eval.metrics.function:
                try:
                    mc.find_function(function_item.name)
                except ValueError:
                    missing_functions.add(function_item.name)
        if len(missing_functions) > 0:
            raise ValueError(
                f"Failed to find {len(missing_functions)} functions in the provided function modules. "
                f"Missing function names: {', '.join(sorted(missing_functions))}"
            )

        # validation step: verify that all rubrics are present
        missing_rubrics = set()
        if mc.rubrics is not None and evalrun.eval.metrics.rubric is not None:
            for rubric_item in evalrun.eval.metrics.rubric:
                if rubric_item.name not in mc.rubrics:
                    missing_rubrics.add(rubric_item.name)
        if len(missing_rubrics) > 0:
            raise ValueError(
                f"Failed to find {len(missing_rubrics)} rubrics in the provided rubric set. "
                f"Missing rubric names: {', '.join(sorted(missing_rubrics))}"
            )
        return mc

    def __init__(self, function_modules: list, evalsetrun: EvalSetRun | None = None):
        self.function_modules: list = function_modules
        self.rubrics: dict | None = (
            self.load_rubrics(evalsetrun) if evalsetrun is not None else None
        )

    def load_rubrics(self, evalsetrun: EvalSetRun) -> dict:
        """Set the rubrics to be used by this MetricComputer from the given EvalSetRun."""
        self.rubrics = json.loads(evalsetrun.rubrics)
        return self.rubrics

    def process_thread_dependency_graphs(
        self, graph_list: Iterable[nx.DiGraph]
    ) -> list[dict]:
        evaluated_metrics = []
        for g in graph_list:
            evaluated_metrics.extend(self.process_thread_dependency_graph(g))
        return evaluated_metrics

    def process_thread_dependency_graph(
        self, g: nx.DiGraph, raise_on_error: bool = True
    ) -> list[dict]:
        evaluated_metrics = []
        try:
            for object_metric in nx.topological_sort(g):
                all_dependencies_met = True
                for dependency in g.predecessors(object_metric):
                    if dependency.metric_results is None:
                        raise ValueError(
                            f"FlexEval error: expected metric_result for dependency "
                            f"{dependency.metric['evaluation_name']} to be computed before "
                            f"processing metric {object_metric.metric['evaluation_name']}."
                        )
                    dependency_info = g.get_edge_data(dependency, object_metric)[
                        "depends_on"
                    ]
                    dependency_met = False
                    if (
                        "metric_name" in dependency_info
                        and dependency_info["metric_name"] is not None
                        and dependency_info["metric_name"]
                        != dependency.metric["evaluation_name"]
                    ):
                        for metric_result in dependency.metric_results:
                            # expected key must be present and in the expected range
                            if (
                                dependency_info["metric_name"]
                                == metric_result["metric_name"]
                            ):
                                dependency_met = (
                                    metric_result["metric_value"]
                                    >= dependency_info["metric_min_value"]
                                ) and (
                                    metric_result["metric_value"]
                                    <= dependency_info["metric_max_value"]
                                )
                                break
                        else:
                            logger.debug(
                                f"Key {dependency_info['metric_name']} not found in results for dependency "
                                f"{dependency.metric['evaluation_name']}."
                            )
                    elif len(dependency.metric_results) == 1:
                        metric_result = dependency.metric_results[0]
                        dependency_met = (
                            metric_result["metric_value"]
                            >= dependency_info["metric_min_value"]
                        ) and (
                            metric_result["metric_value"]
                            <= dependency_info["metric_max_value"]
                        )
                    elif len(dependency.metric_results) == 0:
                        logger.debug(
                            f"Skipping metric because dependency '{dependency.metric['evaluation_name']}' has no results."
                        )
                    else:
                        raise ValueError(
                            f"Not sure how to evaluate dependency '{dependency.metric['evaluation_name']}' for metric "
                            f"'{object_metric.metric['evaluation_name']}', as it has "
                            f"{len(dependency.metric_results)} results but no specified key."
                        )
                    if not dependency_met:
                        all_dependencies_met = False
                        logger.debug(
                            f"Value for metric '{dependency.metric['evaluation_name']}' not in range for dependency {dependency_info}."
                        )
                        break
                if all_dependencies_met:
                    # TODO in the future, we could pass some metric_results as kwargs to the metric function
                    # or as a special formatting key to the rubric
                    metric_results = self.compute_metric(
                        object_metric.object, **object_metric.metric
                    )
                    object_metric.metric_results = metric_results
                    evaluated_metrics.extend(metric_results)
                else:
                    # no results for this metric, as dependencies were unmet
                    object_metric.metric_results = []
            self._validate_metrics(evaluated_metrics)
        except Exception as ex:
            logger.exception(f"An error occurred during metric processing: {ex}")
            if raise_on_error:
                raise
        return evaluated_metrics

    def compute_metrics(self, object: Union[Thread, Turn, Message, ToolCall]):
        """Loop through each entry in the object's `metrics_to_evaluate` list.

        Each entry looks like this:
        {
            'name': 'string_length',
            'type': 'function',
            'kwargs': {},
            'depends_on': []
        }
        """
        # we'll keep the results in a list
        # for each new metric, if it has dependencies, we'll need to make sure they're met - otherwise we won't run it
        evaluated_metrics = []
        # METRICS IN ORDER
        for metric_to_evaluate in object.metrics_to_evaluate:
            # see if there's a dependency
            dependencies_are_all_met = True
            # If there are no dependencies, this loop won't execute
            # and the metric will be evaluated
            if (
                "depends_on" in metric_to_evaluate
                and len(metric_to_evaluate["depends_on"]) > 0
            ):
                # here, we have a metric with 1+ dependencies
                # ALL of these dependencies must be satisfied
                # we determine whether a given metric is a match if it matches
                # 1 - the id
                # 2 - the metric_name
                # 3 - the metric_min_value
                # 4 - the metric_max_value
                # not meeting ANY of them will short-circuit the loop and cause the eval to not evaluate
                # check all dependencies
                for dependency in metric_to_evaluate["depends_on"]:
                    # for each dependency, assume it's not met
                    # if it's in the list AND its values meet the criteria, it's met
                    dependency_is_met = False
                    # if a specific metric_name was specified, it needs to match exactly
                    for em in evaluated_metrics:
                        # 'depends_on' will have all fields populated at this point
                        if em["id"] == dependency["parent_id"]:
                            if (
                                em["metric_value"] >= dependency["metric_min_value"]
                                and em["metric_value"] <= dependency["metric_max_value"]
                            ):
                                # this specific dependency was met - we can stop looking
                                dependency_is_met = True
                                break
                            else:
                                logger.debug(
                                    f"Metric value '{em['metric_value']}' not in range for dependency id='{dependency['parent_id']}'."
                                )
                    if not dependency_is_met:
                        # if even one dependency is not met - don't do the evaluation
                        dependencies_are_all_met = False
                        break
            if dependencies_are_all_met:
                # pass through arguments, but add 'self' as the turn
                # ONLY call if dependencies are ALL met
                # TODO - maybe in the future we'll want to add the computed value from
                # the dependency through as an argument here
                metric_results = self.compute_metric(object, **metric_to_evaluate)
                evaluated_metrics.extend(metric_results)
            else:
                logger.debug(
                    f"Skipping metric '{metric_to_evaluate.get('evaluation_name')}' "
                    f"(id='{metric_to_evaluate.get('id')}') due to unmet dependencies."
                )
        return evaluated_metrics

    def compute_metric(
        self,
        object: Union[Thread, Turn, Message, ToolCall],
        evaluation_name: str,
        evaluation_type: str,
        metric_level: str,
        kwargs: dict,
        context_only: bool | None = None,
        depends_on: list | None = None,
        id: int | None = None,
        notes: str | None = None,  # just a placeholder
    ) -> list[dict]:
        if evaluation_type == "function":
            metrics = self.compute_function_metric(
                function_name=evaluation_name,
                metric_kwargs=kwargs,
                metric_level=metric_level,
                context_only=context_only,
                input_object=object,
                depends_on=depends_on,
                id=id,
            )
        elif evaluation_type == "rubric":
            metrics = self.compute_rubric_metric(
                rubric_name=evaluation_name,
                metric_kwargs=kwargs,
                metric_level=metric_level,
                object=object,
                depends_on=depends_on,
                id=id,
            )
        else:
            raise ValueError(
                f"The argument evaluation_type provided to compute_metric is invalid. "
                f"Must be one of `function` or `rubric`. You passed `{evaluation_type}`."
            )
        self._validate_metrics(metrics)
        return metrics

    def _validate_metrics(self, metrics: list[dict]):
        for m in metrics:
            if m.get("evaluation_type", None) is None:
                raise ValueError(
                    f"Metric {m} does not have a value for the key `evaluation_type`."
                )
            if m.get("metric_value", None) is None:
                raise ValueError(
                    f"Metric {m} does not have a value for the key `metric_value`."
                )

    def invoke_function(
        self,
        metric_function: callable,
        metric_level: eval_schema.MetricLevel,
        input_object: function_types.AnyFunctionObjectInput,
        metric_kwargs: dict,
        context_only: bool,
    ):
        function_input = function_types.get_function_input(
            metric_function, metric_level, input_object, context_only
        )
        metrics_result = metric_function(function_input, **metric_kwargs)
        return metrics_result

    def find_function(self, function_name: str):
        for function_module in self.function_modules:
            if hasattr(function_module, function_name) and callable(
                getattr(function_module, function_name)
            ):
                metric_function = getattr(function_module, function_name)
                metric_source = inspect.getsource(metric_function)
                return metric_function, metric_source
        raise ValueError(
            f"Metric function with name `{function_name}` was not found in any of the "
            f"{len(self.function_modules)} provided function modules."
        )

    def compute_function_metric(
        self,
        function_name: str,
        metric_kwargs: dict,
        input_object: Union[Thread, Turn, Message, ToolCall],
        metric_level: eval_schema.MetricLevel,
        context_only: bool,
        depends_on: list,
        id: int,
    ):
        # A single call can return more than one metric, so this always returns a list of dictionaries.
        # The returned metrics share most of the same information, so it's convenient to construct them similarly.
        # Check if the function exists in any of the function namespaces
        metric_function, metric_source = self.find_function(function_name)
        metrics_result = self.invoke_function(
            metric_function, metric_level, input_object, metric_kwargs, context_only
        )
        base_result = {
            metric_level.lower(): input_object,
            "evaluation_name": function_name,
            "evaluation_type": "function",
            "metric_level": metric_level,
            "kwargs": metric_kwargs,
            "source": metric_source,  # TODO - put this back?
            "context_only": context_only,
            "depends_on": depends_on,
            "id": id,
        }
        # now deal with output
        if isinstance(metrics_result, (float, int)):
            result = copy.deepcopy(base_result)
            result["metric_name"] = function_name
            result["metric_value"] = metrics_result
            return [result]
        elif isinstance(metrics_result, dict):
            result_list = []
            # TODO rethink this behavior
            for k, v in metrics_result.items():
                result = copy.deepcopy(base_result)
                if "metric_name" in result and result["metric_name"] != k:
                    logger.warning(
                        f"Overriding metric_name in metric result with '{k}' (was '{result['metric_name']}')."
                    )
                result["metric_name"] = k
                result["metric_value"] = float(v)
                result_list.append(result)
            return result_list
        elif isinstance(metrics_result, list):
            result_list = []
            for entry in metrics_result:
                result = copy.deepcopy(base_result)
                result["metric_name"] = entry.get("name", None)
                result["metric_value"] = float(entry.get("value", None))
                result_list.append(result)
            return result_list
        else:
            raise ValueError(
                f"The metric type returned from `{metric_function}` is not a supported type. "
                f"It must be one of `list`, `int`, `float`, or `dict`. You supplied `{type(metrics_result)}`."
            )

    def compute_rubric_metric(
        self,
        rubric_name: str,
        metric_kwargs: dict,
        object: Union[Thread, Turn, Message],
        metric_level: str,
        depends_on: list,
        id: int,
    ):
        if self.rubrics is not None:
            rubrics = self.rubrics
        else:
            rubrics = json.loads(object.evalsetrun.rubrics)
        if rubric_name not in rubrics:
            raise ValueError(
                f"You requested a rubric called `{rubric_name}`, but only these were found: {rubrics.keys()}."
            )
        prompt = rubrics.get(rubric_name).get("prompt", "")

        # format input for rubric
        conversation, context, content, tool_calls = object.format_input_for_rubric()
        # conversation: all turns; context: all turns without the last entry; completion: only the last entry
        # use three keywords:
        #   {conversation} -- The whole conversation
        #   {context}      -- The previous turns without the current entry
        #   {content}      -- Only the current turn / message / toolcall depending on the metric_level
        # for the future: add {completion} under the condition of do_completion == True

        # Add verification steps before populating the rubric
        # case 1: {conversation} and {context} should not go together
        # case 2: {completion} and {content} should not go together
        # case 3: if there is a {completion}, do_completion should be true
        if "{conversation}" in prompt and "{context}" in prompt:
            raise Exception(
                "Your rubric should not have both {conversation} and {context}. "
                "Please check the README file for more information about how to write FlexEval rubrics."
            )
        if "{completion}" in prompt and "{content}" in prompt:
            raise Exception(
                "Your rubric should not have both {content} and {completion}. "
                "Please check the README file for more information about how to write FlexEval rubrics."
            )
        if "{completion}" in prompt and not object.evalsetrun.do_completion:
            raise Exception(
                "Your rubric has {completion}, but in your test specification for this rubric evaluation, "
                "do_completion is not True. Please check the README file for more information about how to "
                "write FlexEval rubrics."
            )
        populated_prompt = prompt.format(
            conversation=conversation,
            context=context,
            content=content,
            tool_calls=tool_calls,
        )
        # with do_completion == True, only the completion is evaluated, with or without the context.
        if object.evalsetrun.do_completion and "{completion}" in prompt:
            # TODO revisit this logic
            # also consider object.is_completion, which only works for Message rubrics,
            # but we can in principle check for a message in either a turn or a thread with is_flexeval_completion true
            populated_prompt = prompt.format(completion=content)

        choice_scores = rubrics.get(rubric_name).get("choice_scores")

        # get rubric grader
        if object.evalsetrun.grader_llm is None or object.evalsetrun.grader_llm == "":
            raise ValueError(
                "Attempting to evaluate a rubric metric, but no grader LLM defined."
            )
        grader_completion_function = json.loads(object.evalsetrun.grader_llm)
        if grader_completion_function is None or len(grader_completion_function) == 0:
            raise ValueError(
                "Attempting to evaluate a rubric metric, but no grader LLM defined."
            )
        grader_completion_fn_name = grader_completion_function.get(
            "function_name", None
        )
        grader_completion_fn_kwargs = grader_completion_function.get("kwargs", {})
        if hasattr(completion_functions, grader_completion_fn_name):
            completion_function = getattr(
                completion_functions, grader_completion_fn_name, None
            )
        else:
            raise ValueError(
                f"Grader completion function `{grader_completion_fn_name}` was not found in completion_functions."
            )

        DEFAULT_COT_TEMPLATE = "\nBefore answering, I will reason in a step-by-step manner as to get the right answer, then conclude with the answer in the format requested."
        ANSWER_PROMPT = f"""
First, write out in a step by step manner your reasoning to be sure that your conclusion is correct. Avoid simply stating the correct answer at the outset. Then print only a single choice from {list(choice_scores.keys())} (without quotes or punctuation) on its own line corresponding to the correct answer. At the end, repeat just the answer by itself on a new line.

Reasoning:""".strip()

        # This is the call to the grader completion function
        completion = completion_function(
            conversation_history=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "assistant", "content": DEFAULT_COT_TEMPLATE},
                {"role": "user", "content": populated_prompt + ANSWER_PROMPT},
                {"role": "assistant", "content": DEFAULT_COT_TEMPLATE},
            ],
            **grader_completion_fn_kwargs,
        )
        completion_text = completion["choices"][0]["message"]["content"]

        # use logic from Evals
        # https://github.com/openai/evals/blob/d3dc89042ddee879a68a326fdb37716ee518640c/evals/elsuite/modelgraded/classify_utils.py#L29
        choice_scores["__invalid__"] = None

        def get_match(completion_text: str, choice_scores: dict):
            MATCH_FNS = {
                "include": lambda x, y: float(x in y),
                "exact": lambda x, y: float(x == y),
                "endswith": lambda x, y: x.endswith(y),
                "starts_or_endswith": lambda x, y: x.startswith(y) or x.endswith(y),
            }
            lines = completion_text.strip().split("\n")
            lines = lines[::-1]  # reverse lines
            for line in lines:
                line = line.strip()
                line = "".join(c for c in line if c not in string.punctuation)
                if not line:
                    continue
                for choice in choice_scores.keys():
                    if MATCH_FNS["starts_or_endswith"](line, choice):
                        return choice
            return "__invalid__"

        score = get_match(
            completion_text=completion_text, choice_scores=choice_scores
        )
        result = {
            metric_level.lower(): object,
            "metric_name": rubric_name,
            "evaluation_name": rubric_name,
            "evaluation_type": "rubric",
            "id": id,
            "kwargs": metric_kwargs,
            "depends_on": depends_on,
            "source": populated_prompt,
            "metric_level": metric_level,
            "metric_value": choice_scores[score],
            "rubric_prompt": populated_prompt,
            "rubric_completion": completion_text,
            "rubric_model": completion.get("model", None),
            "rubric_completion_tokens": completion.get("usage", {}).get(
                "completion_tokens", 0
            ),
            "rubric_prompt_tokens": completion.get("usage", {}).get(
                "prompt_tokens", 0
            ),
            "rubric_score": score,
        }
        return [result]
def add_all_metrics_to_objects(iterable_of_objects, metrics):
    """
    Adds all metric instances in `metrics` to each instance of an evaluable object
    (e.g., Turn, Thread, Message, or ToolCall) in iterable_of_objects.
    This addition is done by appending to the `metrics_to_evaluate` field,
    which all instances in iterable_of_objects should have.

    :param iterable_of_objects: list of objects that have a metrics_to_evaluate field
    :param metrics: list of metric instances to add to each object
    """
    for object in iterable_of_objects:
        # Field metrics_to_evaluate is initialized in the constructor.
        # Metric dependencies happen WITHIN turns, rather than across turns,
        # so we can associate a sequence of metrics with each turn
        # but then have the turns execute them in parallel;
        # each turn keeps track of its own set of metrics.
        # Keeping this as a loop so the rubric count can be taken per object.
        object.metrics_to_evaluate = object.metrics_to_evaluate + metrics
def count_rubric_metrics(iterable_of_objects):
    """
    Returns the total number of rubric-type metrics in the metrics_to_evaluate field
    across all objects.

    :param iterable_of_objects: list of objects that have a metrics_to_evaluate field
    """
    rubric_count = 0
    for object in iterable_of_objects:
        for metric_instance in object.metrics_to_evaluate:
            if metric_instance.get("evaluation_type") == "rubric":
                rubric_count += 1
    return rubric_count
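# Illustrative sketch (not part of the module): the two helpers above are typically used together
# when preparing objects for evaluation, assuming a `thread` with turn-level metrics to attach.
#
#   add_all_metrics_to_objects(thread.turns, turn_level_metrics)
#   n_rubrics = count_rubric_metrics(thread.turns)
#   logger.info(f"{n_rubrics} rubric evaluations will require grader LLM calls.")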