Benchmarking¶
Note
If you aren't familiar with our benchmark package, please read RAI Bench first.
Currently, we offer 2 predefined benchmarks:
- Manipulation O3DE
- Tool Calling Agent
If you want to test multiple models across different benchmark configurations, go to Testing Models.
If your goal is creating custom tasks and scenarios, visit Creating Custom Tasks.
Manipulation O3DE¶
- Follow the main Basic Setup and the Manipulation demo Setup
- To see available options run:
python src/rai_bench/rai_bench/examples/manipulation_o3de.py --help
- Example usage:
python src/rai_bench/rai_bench/examples/manipulation_o3de.py --model-name qwen2.5:7b --vendor ollama --levels trivial
Note
When using Ollama, be sure to pull the model first.
Warning
Running all scenarios will take a while. If you just want to try it out, we recommend choosing a single difficulty level.
Tool Calling Agent¶
- This benchmark does not require any additional setup besides the main Basic Setup
- To see available options run:
python src/rai_bench/rai_bench/examples/tool_calling_agent.py --help
- Example usage:
python src/rai_bench/rai_bench/examples/tool_calling_agent.py --model-name qwen2.5:7b --vendor ollama --extra-tool-calls 5 --task-types basic --n-shots 5 --prompt-detail descriptive --complexities easy
Testing Models¶
The best way to benchmark your models is to use the src/rai_bench/rai_bench/examples/benchmarking_models.py script.
Feel free to modify the benchmark configs to suit your needs - you can choose any set of params, and the benchmark will run tasks with every combination:
if __name__ == "__main__":
# Define models you want to benchmark
model_names = ["qwen3:4b", "llama3.2:3b"]
vendors = ["ollama", "ollama"]
# Define benchmarks that will be used
mani_conf = ManipulationO3DEBenchmarkConfig(
o3de_config_path="src/rai_bench/rai_bench/manipulation_o3de/predefined/configs/o3de_config.yaml",
levels=[ # define what difficulty of tasks to include in benchmark
"trivial",
"easy",
],
repeats=1, # how many times to repeat
)
tool_conf = ToolCallingAgentBenchmarkConfig(
extra_tool_calls=[0, 5], # how many extra tool calls allowed to still pass
task_types=[ # what types of tasks to include
"basic",
"custom_interfaces",
],
N_shots=[0, 2], # examples in system prompt
prompt_detail=["brief", "descriptive"], # how descriptive should task prompt be
repeats=1,
)
out_dir = "src/rai_bench/rai_bench/experiments"
test_models(
model_names=model_names,
vendors=vendors,
benchmark_configs=[mani_conf, tool_conf],
out_dir=out_dir,
        # if you want to pass any additional args to the model
additional_model_args=[
{"reasoning": False},
{},
],
)
Based on the example above, the Tool Calling benchmark will run basic and custom_interfaces tasks with every provided combination of [extra_tool_calls x N_shots x prompt_detail], which will result in almost 500 tasks. The Manipulation benchmark will run all tasks of the specified levels once, as it has no additional params. repeats is set to 1 in both configs, so there will be no additional runs.
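For a rough sense of how the parameter grid grows, you can count the combinations yourself. A minimal illustrative sketch using the values from the config above (the number of tasks per task type is not shown here, so the total task count is not computed):
from itertools import product

# Values taken from the ToolCallingAgentBenchmarkConfig above
extra_tool_calls = [0, 5]
n_shots = [0, 2]
prompt_detail = ["brief", "descriptive"]

# Every task of the selected task types runs once per combination
combinations = list(product(extra_tool_calls, n_shots, prompt_detail))
print(len(combinations))  # 8 parameter combinations per task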
Note
When using the ollama vendor, make sure to pull the models first.
Viewing Results¶
From every benchmark run, there will be results saved in the provided output directory:
- Logs - in the benchmark.log file
- results_summary.csv - for overall metrics
- results.csv - for detailed results of every task/scenario
When using test_models, the output directories will be saved as <run_datetime>/<benchmark_name>/<model>/<repeat>/... and this format can be visualized with our Streamlit script:
streamlit run src/rai_bench/rai_bench/examples/visualise_streamlit.py
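If you prefer working with the raw CSVs instead of the Streamlit app, they can be loaded with any dataframe library. A minimal sketch using pandas (assumed to be installed); result files are discovered with a recursive glob, and results.csv is assumed to sit next to results_summary.csv as described above:
from pathlib import Path

import pandas as pd

# out_dir used in the test_models example above
out_dir = Path("src/rai_bench/rai_bench/experiments")

for summary_path in out_dir.rglob("results_summary.csv"):
    # results_summary.csv holds overall metrics, results.csv the per-task details
    summary = pd.read_csv(summary_path)
    detailed = pd.read_csv(summary_path.parent / "results.csv")
    print(summary_path.parent, len(detailed))
    print(summary.head())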
Creating Custom Tasks¶
Manipulation O3DE Scenarios¶
To create your own Scenarios, you will need a Scene Config and a Task - check out the example src/rai_bench/rai_bench/examples/custom_scenario.py.
You can combine an already existing Scene Config with an existing Task to create a new Scenario like:
import logging
from pathlib import Path
from typing import List, Sequence, Tuple, Union
from rclpy.impl.rcutils_logger import RcutilsLogger
from rai_bench.manipulation_o3de.benchmark import Scenario
from rai_bench.manipulation_o3de.interfaces import (
ManipulationTask,
)
from rai_bench.manipulation_o3de.tasks import PlaceObjectAtCoordTask
from rai_sim.simulation_bridge import Entity, SceneConfig
loggers_type = Union[RcutilsLogger, logging.Logger]
### Define your scene setup ####################
path_to_your_config = (
"src/rai_bench/rai_bench/manipulation_o3de/predefined/configs/1a.yaml"
)
scene_config = SceneConfig.load_base_config(Path(path_to_your_config))
# configure existing Task with different params
target_coords = (0.1, 0.1)
disp = 0.1
task = PlaceObjectAtCoordTask(
obj_type="apple",
target_position=target_coords,
allowable_displacement=disp,
)
scenario = Scenario(task=task, scene_config=scene_config, scene_config_path=path_to_your_config)
But you can also create them from scratch. Creating a Scene Config is very easy - just declare entities in a YAML file like:
entities:
- name: apple1
prefab_name: apple # make sure that this prefab exists in simulation
pose:
translation:
x: 0.0
y: 0.5
z: 0.05
rotation:
x: 0.0
y: 0.0
z: 0.0
w: 1.0
Creating your own Task will require slightly more effort. Let's start with something simple - a Task that will require throwing given objects off the table:
class ThrowObjectsOffTableTask(ManipulationTask):
def __init__(self, obj_types: List[str], logger: loggers_type | None = None):
super().__init__(logger=logger)
        # obj_types is a list of object types that are the subject of the task
        # In this case, it means which objects should be thrown off the table
        # they can be any object types
self.obj_types = obj_types
@property
def task_prompt(self) -> str:
# define prompt
obj_names = ", ".join(obj + "s" for obj in self.obj_types).replace("_", " ")
# 0.0 z is the level of table, so any coord below that means it is off the table
return f"Manipulate objects, so that all of the {obj_names} are dropped outside of the table (for example y<-0.75)."
    def check_if_required_objects_present(self, simulation_config: SceneConfig) -> bool:
        # Validate that the required objects are present in the sim config.
        # If there is not a single object of the provided types, there is no point
        # in running this task on the given scene config.
        count = sum(
            1 for ent in simulation_config.entities if ent.prefab_name in self.obj_types
        )
        return count > 0
def calculate_correct(self, entities: Sequence[Entity]) -> Tuple[int, int]:
selected_type_objects = self.filter_entities_by_object_type(
entities=entities, object_types=self.obj_types
)
        # check how many objects are below the table - that will be our metric
correct = sum(
1 for ent in selected_type_objects if ent.pose.pose.position.z < 0.0
)
incorrect: int = len(selected_type_objects) - correct
return correct, incorrect
# instantiate your new Task with chosen params
task = ThrowObjectsOffTableTask(
    obj_types=["apple"],
)
super_scenario = Scenario(
task=task, scene_config=scene_config, scene_config_path=path_to_your_config
)
As obj_types is parameterizable, it enables many variants of this Task. Combined with the large number of simulation configs available, this means that a single Task can provide dozens of Scenarios.
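For example, the same Task can be paired with several scene configs to produce a whole batch of Scenarios. A minimal sketch reusing the imports and classes defined above (the extra config paths are placeholders - point them at any predefined configs you want to include):
config_paths = [
    "src/rai_bench/rai_bench/manipulation_o3de/predefined/configs/1a.yaml",
    # add any other scene configs you want to pair with the task
]

scenarios = [
    Scenario(
        task=ThrowObjectsOffTableTask(obj_types=["apple"]),
        scene_config=SceneConfig.load_base_config(Path(p)),
        scene_config_path=p,
    )
    for p in config_paths
]
Such a list can be passed as scenarios= to run_benchmark in the snippet below.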
Then, to test it, simply run:
##### Now you can run it in benchmark ##################
if __name__ == "__main__":
from pathlib import Path
from rai_bench import (
define_benchmark_logger,
)
from rai_bench.manipulation_o3de import run_benchmark
from rai_bench.utils import get_llm_for_benchmark
    experiment_dir = Path("src/rai_bench/experiments/custom_task/")
experiment_dir.mkdir(parents=True, exist_ok=True)
bench_logger = define_benchmark_logger(out_dir=experiment_dir)
llm = get_llm_for_benchmark(
model_name="gpt-4o",
vendor="openai",
)
run_benchmark(
llm=llm,
out_dir=experiment_dir,
# use your scenario
scenarios=[super_scenario],
bench_logger=bench_logger,
)
Congratulations, you just created and launched your first Scenario from scratch!
Tool Calling Tasks¶
To create a Tool Calling Task, you will need to define Subtasks, Validators, and the Task itself. Check the example src/rai_bench/rai_bench/examples/custom_task.py.
Let's create a basic task that requires using a tool to receive a message from a specific topic.
from typing import List
from langchain_core.tools import BaseTool
from rai_bench.tool_calling_agent.interfaces import Task, TaskArgs
from rai_bench.tool_calling_agent.mocked_tools import (
MockGetROS2TopicsNamesAndTypesTool,
MockReceiveROS2MessageTool,
)
from rai_bench.tool_calling_agent.subtasks import (
CheckArgsToolCallSubTask,
)
from rai_bench.tool_calling_agent.validators import (
OrderedCallsValidator,
)
# This Task will check if the robot can receive a message from the specified topic
class GetROS2RobotPositionTask(Task):
complexity = "easy"
type = "custom"
@property
def available_tools(self) -> List[BaseTool]:
# define topics that will be seen by agent
TOPICS = [
"/robot_position",
"/attached_collision_object",
"/clock",
"/collision_object",
]
TOPICS_STRING = [
"topic: /attached_collision_object\ntype: moveit_msgs/msg/AttachedCollisionObject\n",
"topic: /clock\ntype: rosgraph_msgs/msg/Clock\n",
"topic: /collision_object\ntype: moveit_msgs/msg/CollisionObject\n",
"topic: /robot_position\n type: sensor_msgs/msg/RobotPosition",
]
# define which tools will be available for agent
return [
MockGetROS2TopicsNamesAndTypesTool(
mock_topics_names_and_types=TOPICS_STRING
),
MockReceiveROS2MessageTool(available_topics=TOPICS),
]
def get_system_prompt(self) -> str:
return "You are a ROS 2 expert that want to solve tasks. You have access to various tools that allow you to query the ROS 2 system."
def get_base_prompt(self) -> str:
return "Get the position of the robot."
def get_prompt(self) -> str:
# Create versions for different levels
if self.prompt_detail == "brief":
return self.get_base_prompt()
else:
return (
f"{self.get_base_prompt()} "
"You can discover what topics are currently active."
)
@property
def optional_tool_calls_number(self) -> int:
# Listing topics before getting any message is fine
return 1
# define subtask
receive_robot_pos_subtask = CheckArgsToolCallSubTask(
expected_tool_name="receive_ros2_message",
expected_args={"topic": "/robot_position"},
expected_optional_args={
"timeout_sec": int # if there is not exact value expected, you can pass type
},
)
# use OrderedCallsValidator as there is only 1 subtask to check
topics_ord_val = OrderedCallsValidator(subtasks=[receive_robot_pos_subtask])
# optionally pass number of extra tool calls
args = TaskArgs(extra_tool_calls=0)
super_task = GetROS2RobotPositionTask(validators=[topics_ord_val], task_args=args)
Then run it with:
##### Now you can run it in benchmark ##################
if __name__ == "__main__":
from pathlib import Path
from rai_bench import (
define_benchmark_logger,
)
from rai_bench.tool_calling_agent import (
run_benchmark,
)
from rai_bench.utils import get_llm_for_benchmark
experiment_dir = Path("src/rai_bench/rai_bench/experiments/custom_task")
experiment_dir.mkdir(parents=True, exist_ok=True)
bench_logger = define_benchmark_logger(out_dir=experiment_dir)
super_task.set_logger(bench_logger)
llm = get_llm_for_benchmark(
model_name="gpt-4o",
vendor="openai",
)
run_benchmark(
llm=llm,
out_dir=experiment_dir,
tasks=[super_task],
bench_logger=bench_logger,
)