
Benchmarking

Note

If you aren't familiar with our benchmark package, please read RAI Bench first.

Currently, we offer 2 predefined benchmarks:

  • Manipulation O3DE
  • Tool Calling Agent

If you want to test multiple models across different benchmark configurations, go to Testing Models.

If your goal is creating custom tasks and scenarios, visit Creating Custom Tasks.

Manipulation O3DE

  • Follow the setup from Manipulation demo Setup
  • Create the O3DE config:
    binary_path: /path/to/binary/RAIManipulationDemo.GameLauncher
    level: RoboticManipulationBenchmark
    robotic_stack_command: ros2 launch examples/manipulation-demo-no-binary.launch.py
    required_simulation_ros2_interfaces:
      services:
        - /spawn_entity
        - /delete_entity
      topics:
        - /color_image5
        - /depth_image5
        - /color_camera_info5
      actions: []
    required_robotic_ros2_interfaces:
      services:
        - /grounding_dino_classify
        - /grounded_sam_segment
        - /manipulator_move_to
      topics: []
      actions: []
    
  • Run the benchmark with:

    python src/rai_bench/rai_bench/examples/manipulation_o3de.py --model-name <your-model> --vendor <your-vendor> --o3de-config-path <path/to/your/config> --levels <trivial, easy>
    

    Warning

    Running all scenarios will take a while. If you want to just try it out, we recommend choosing just one level of difficulty.

Tool Calling Agent

This benchmark does not require any additional setup besides the main Basic Setup; just run:

python src/rai_bench/rai_bench/examples/tool_calling_agent.py --model-name <model-name> --vendor <vendor> --extra-tool-calls <5> --task-types <basic> --out-dir <out_dir>

Note

This benchmark is significantly faster, but if you are just trying it out, we still recommend choosing a single task type.

Testing Models

The best way to benchmark your models is to use the rai_bench.test_models function with benchmark configs.

test_models function definition

rai_bench.test_models.test_models(model_names, vendors, benchmark_configs, out_dir, additional_model_args=None)

Source code in rai_bench/test_models.py
def test_models(
    model_names: List[str],
    vendors: List[str],
    benchmark_configs: List[BenchmarkConfig],
    out_dir: str,
    additional_model_args: Optional[List[Dict[str, Any]]] = None,
):
    if additional_model_args is None:
        additional_model_args = [{} for _ in model_names]

    experiment_id = uuid.uuid4()
    if len(model_names) != len(vendors):
        raise ValueError("Number of passed models must match number of passed vendors")
    else:
        for bench_conf in benchmark_configs:
            # a separate run folder for each benchmark configuration
            now = datetime.now()
            run_name = f"run_{now.strftime('%Y-%m-%d_%H-%M-%S')}"
            for i, model_name in enumerate(model_names):
                for u in range(bench_conf.repeats):
                    curr_out_dir = (
                        out_dir
                        + "/"
                        + run_name
                        + "/"
                        + bench_conf.name
                        + "/"
                        + model_name
                        + "/"
                        + str(u)
                    )
                    llm = get_llm_for_benchmark(
                        model_name=model_name,
                        vendor=vendors[i],
                        **additional_model_args[i],
                    )
                    bench_logger = define_benchmark_logger(out_dir=Path(curr_out_dir))
                    try:
                        if isinstance(bench_conf, ToolCallingAgentBenchmarkConfig):
                            tool_calling_tasks = tool_calling_agent.get_tasks(
                                extra_tool_calls=bench_conf.extra_tool_calls,
                                complexities=bench_conf.complexities,
                                task_types=bench_conf.task_types,
                            )
                            tool_calling_agent.run_benchmark(
                                llm=llm,
                                out_dir=Path(curr_out_dir),
                                tasks=tool_calling_tasks,
                                experiment_id=experiment_id,
                                bench_logger=bench_logger,
                            )
                        elif isinstance(bench_conf, ManipulationO3DEBenchmarkConfig):
                            manipulation_o3de_scenarios = (
                                manipulation_o3de.get_scenarios(
                                    levels=bench_conf.levels,
                                    logger=bench_logger,
                                )
                            )
                            manipulation_o3de.run_benchmark(
                                llm=llm,
                                out_dir=Path(curr_out_dir),
                                o3de_config_path=bench_conf.o3de_config_path,
                                scenarios=manipulation_o3de_scenarios,
                                experiment_id=experiment_id,
                                bench_logger=bench_logger,
                            )
                    except Exception as e:
                        bench_logger.critical(f"BENCHMARK RUN FAILED: {e}")
                        bench_logger.critical(
                            f"{bench_conf.name} benchmark for {model_name}, vendor: {vendors[i]}, execution number: {u + 1}"
                        )

Example usage:

from rai_bench import (
    ManipulationO3DEBenchmarkConfig,
    ToolCallingAgentBenchmarkConfig,
    test_models,
)

if __name__ == "__main__":
    # Define models you want to benchmark
    model_names = ["qwen2.5:7b", "llama3.2:3b"]
    vendors = ["ollama", "ollama"]

    # Define benchmarks that will be used
    man_conf = ManipulationO3DEBenchmarkConfig(
        o3de_config_path="path/to/your/o3de_config.yaml",  # path to your O3DE config
        levels=[  # define what difficulty of tasks to include in benchmark
            "trivial",
        ],
        repeats=1,  # how many times to repeat
    )
    tool_conf = ToolCallingAgentBenchmarkConfig(
        extra_tool_calls=5,  # how many extra tool calls allowed to still pass
        task_types=[  # what types of tasks to include
            "basic",
            "spatial_reasoning",
            "manipulation",
        ],
        repeats=1,
    )

    out_dir = "src/rai_bench/rai_bench/experiments"
    test_models(
        model_names=model_names,
        vendors=vendors,
        benchmark_configs=[man_conf, tool_conf],
        out_dir=out_dir,
    )
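
If your models need extra constructor arguments, you can pass them through additional_model_args (one dict per model, in the same order as model_names). Below is a minimal sketch of the same call as above; the temperature key is an assumption, since which keys are accepted depends on the chat model class that get_llm_for_benchmark constructs:

# each dict is forwarded to get_llm_for_benchmark as **kwargs;
# "temperature" is an assumed example, not a documented rai_bench option
test_models(
    model_names=model_names,
    vendors=vendors,
    benchmark_configs=[man_conf, tool_conf],
    out_dir=out_dir,
    additional_model_args=[{"temperature": 0.0}, {"temperature": 0.0}],
)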

Viewing Results

Every benchmark run saves its results in the provided output directory:

  • Logs - in the benchmark.log file
  • results_summary.csv - overall metrics
  • results.csv - detailed results for every task/scenario

When using test_models, the output directories follow the layout <run_datetime>/<benchmark_name>/<model_name>/<repeat>/..., and this format can be visualized with our Streamlit script:

streamlit run src/rai_bench/rai_bench/examples/visualise_streamlit.py
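
If you prefer to inspect the raw CSV files directly instead of using Streamlit, here is a minimal pandas sketch (file names taken from the list above; the exact columns depend on the benchmark):

from pathlib import Path

import pandas as pd

# hypothetical output directory passed to test_models
out_dir = Path("src/rai_bench/rai_bench/experiments")

# collect every per-run summary into a single DataFrame
summaries = [
    pd.read_csv(csv_path).assign(source=str(csv_path))
    for csv_path in out_dir.rglob("results_summary.csv")
]
if summaries:
    print(pd.concat(summaries, ignore_index=True))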

Creating Custom Tasks

Manipulation O3DE Scenarios

To create your own Scenarios, you will need a Scene Config and a Task. You can combine an existing Scene Config with an existing Task to create a new Scenario like this:

from pathlib import Path
from rai_bench.manipulation_o3de.tasks import PlaceObjectAtCoordTask
from rai_sim.simulation_bridge import SceneConfig
from rai_bench.manipulation_o3de.benchmark import Scenario


path_to_your_config = "src/rai_bench/rai_bench/manipulation_o3de/predefined/configs/1a.yaml"
scene_config = SceneConfig.load_base_config(Path(path_to_your_config))

# configure existing Task with different params
target_coords = (0.1, 0.1)
disp = 0.1
task = PlaceObjectAtCoordTask(
    obj_type="apple",
    target_position=target_coords,
    allowable_displacement=disp,
)

Scenario(task=task, scene_config=scene_config, scene_config_path=path_to_your_config)

But you can also create them from scratch. Creating a Scene Config is very easy: just declare entities in a YAML file like this:

entities:
  - name: apple1
    prefab_name: apple # make sure that this prefab exists in simulation
    pose:
      translation:
        x: 0.0
        y: 0.5
        z: 0.05
      rotation:
        x: 0.0
        y: 0.0
        z: 0.0
        w: 1.0
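
Assuming you saved this file under a path of your choice (the path below is a placeholder), it can be loaded exactly like the predefined configs shown earlier:

from pathlib import Path
from rai_sim.simulation_bridge import SceneConfig

# placeholder path to the YAML file defined above
my_scene_path = "path/to/my_scene_config.yaml"
my_scene_config = SceneConfig.load_base_config(Path(my_scene_path))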

Creating your own Task will require slightly more effort. Let's start with something simple - a Task that will require throwing given objects off the table:

import logging
from typing import List, Tuple, Union
from rclpy.impl.rcutils_logger import RcutilsLogger
from rai_bench.manipulation_o3de.interfaces import (
    ManipulationTask,
)
from rai_sim.simulation_bridge import Entity, SimulationConfig

loggers_type = Union[RcutilsLogger, logging.Logger]


class ThrowObjectsOffTableTask(ManipulationTask):
    def __init__(self, obj_types: List[str], logger: loggers_type | None = None):
        super().__init__(logger=logger)
        # obj_types is a list of objects that are subject of the task
        # In this case, it will mean which objects should be thrown off the table
        # can be any objects
        self.obj_types = obj_types

    @property
    def task_prompt(self) -> str:
        # define prompt
        obj_names = ", ".join(obj + "s" for obj in self.obj_types).replace("_", " ")
        # 0.0 z is the level of table, so any coord below that means it is off the table
        return f"Manipulate objects, so that all of the {obj_names} are thrown off the table (negative z)"

    def check_if_required_objects_present(
        self, simulation_config: SimulationConfig
    ) -> bool:
        # Validate that the required objects are present in the sim config;
        # if there is not a single object of the provided types, there is no point
        # in running this task on the given scene config
        count = sum(
            1 for ent in simulation_config.entities if ent.prefab_name in self.obj_types
        )
        return count > 0

    def calculate_correct(self, entities: List[Entity]) -> Tuple[int, int]:
        selected_type_objects = self.filter_entities_by_object_type(
            entities=entities, object_types=self.obj_types
        )

        # count how many objects are below the table; that will be our metric
        correct = sum(
            1 for ent in selected_type_objects if ent.pose.pose.position.z < 0.0
        )

        incorrect: int = len(selected_type_objects) - correct
        return correct, incorrect

Because obj_types is parameterizable, this Task supports many variants. Combined with the many simulation configs available, a single Task can provide dozens of scenarios, as in the sketch below.
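
For example, here is a sketch that pairs the new task with existing scene configs to build several scenarios at once (the config paths are placeholders; make sure the chosen configs actually contain the required objects):

from pathlib import Path
from rai_bench.manipulation_o3de.benchmark import Scenario
from rai_sim.simulation_bridge import SceneConfig

# placeholder list of existing scene config paths
config_paths = [
    "src/rai_bench/rai_bench/manipulation_o3de/predefined/configs/1a.yaml",
]

task = ThrowObjectsOffTableTask(obj_types=["apple"])

scenarios = [
    Scenario(
        task=task,
        scene_config=SceneConfig.load_base_config(Path(path)),
        scene_config_path=path,
    )
    for path in config_paths
]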

Congratulations, you just created your first Scenario from scratch!

Tool Calling Tasks

To create a Tool Calling Task, you will need to define Subtasks, Validators, and the Task itself. Let's create a basic task that requires using a tool to receive a message from a specific topic.

from rai_bench.tool_calling_agent.subtasks import (
    CheckArgsToolCallSubTask,
)
from rai_bench.tool_calling_agent.validators import (
    OrderedCallsValidator,
)
from rai_bench.tool_calling_agent.tasks.basic import BasicTask
from rai_bench.tool_calling_agent.mocked_tools import (
    MockGetROS2TopicsNamesAndTypesTool,
)
from langchain_core.tools import BaseTool
from typing import List

# define a subtask that requires receiving a message from the /robot_position topic
receive_robot_pos_subtask = CheckArgsToolCallSubTask(
    expected_tool_name="receive_ros2_message",
    expected_args={"topic": "/robot_position"},
    expected_optional_args={
        "timeout_sec": int
    },  # if there is no exact expected value, you can pass the expected type instead
)
# use OrderedCallsValidator as there is only 1 subtask
topics_ord_val = OrderedCallsValidator(subtasks=[receive_robot_pos_subtask])


class GetROS2RobotPositionTask(BasicTask):
    complexity = "easy"

    @property
    def available_tools(self) -> List[BaseTool]:
        return [
            # define which topics will be seen by agent
            MockGetROS2TopicsNamesAndTypesTool(
                mock_topics_names_and_types=[
                    "topic: /attached_collision_object\ntype: moveit_msgs/msg/AttachedCollisionObject\n",
                    "topic: /clock\ntype: rosgraph_msgs/msg/Clock\n",
                    "topic: /collision_object\ntype: moveit_msgs/msg/CollisionObject\n",
                    "topic: /robot_position\n type: sensor_msgs/msg/RobotPosition",
                ]
            ),
        ]

    def get_prompt(self) -> str:
        return "Get the position of the robot."

# optionally pass number of extra tool calls
task = GetROS2RobotPositionTask(validators=[topics_ord_val], extra_tool_calls=1)
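
To actually run such a custom task, you can reuse tool_calling_agent.run_benchmark, which test_models calls under the hood (see its source above). This is a hedged sketch: the tool_calling_agent import path and the use of ChatOllama are assumptions; any LangChain chat model with tool calling support and a plain Python logger should work in their place.

import logging
import uuid
from pathlib import Path

from langchain_ollama import ChatOllama  # assumption: any tool-calling chat model works here

from rai_bench import tool_calling_agent  # assumption: module path as referenced in test_models

llm = ChatOllama(model="qwen2.5:7b")

tool_calling_agent.run_benchmark(
    llm=llm,
    out_dir=Path("src/rai_bench/rai_bench/experiments/custom_tool_calling"),
    tasks=[task],
    experiment_id=uuid.uuid4(),
    bench_logger=logging.getLogger("custom_tool_calling_benchmark"),
)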