Aggregators¶
Overview¶
Aggregators in RAI are components that collect and process messages from various sources, transforming them into summarized or analyzed information. They are particularly useful in state-based agents where they help maintain and update the agent's state through periodic aggregation.
BaseAggregator¶
BaseAggregator is the abstract base class for all aggregator implementations in the RAI framework. It provides a generic interface for collecting and processing messages of a specific type.
Class Definition¶
rai.aggregators.base.BaseAggregator¶
Bases: ABC, Generic[T]
Interface for aggregators.
Source code in src/rai_core/rai/aggregators/base.py
clear_buffer()¶
Clears the buffer of messages
get() abstractmethod¶
Returns the outcome of processing the aggregated message
get_buffer()¶
Returns a copy of the buffer of messages
Purpose¶
The BaseAggregator class serves as the foundation for message aggregation in RAI, providing:
- A buffer for collecting messages
- Size management (a max_size limit) to keep buffer memory bounded
- A consistent interface for processing and returning aggregated results
- Type safety through generics
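The four points above can be illustrated with a small stand-in class. This is a minimal sketch built only on the standard library, not the RAI source: the names `SketchAggregator` and `JoinAggregator`, the `max_size` constructor argument, the drop-oldest eviction policy, and the string return type of `get()` are all assumptions made for illustration.

```python
from abc import ABC, abstractmethod
from collections import deque
from typing import Deque, Generic, List, Optional, TypeVar

T = TypeVar("T")


class SketchAggregator(ABC, Generic[T]):
    """Hypothetical stand-in for an aggregator base class (not RAI's code)."""

    def __init__(self, max_size: Optional[int] = None) -> None:
        # deque(maxlen=n) drops the oldest item once n items are stored;
        # maxlen=None leaves the buffer unbounded.
        self._buffer: Deque[T] = deque(maxlen=max_size)

    def __call__(self, msg: T) -> None:
        # Being callable lets an instance be passed directly as a callback.
        self._buffer.append(msg)

    @abstractmethod
    def get(self) -> Optional[str]:
        """Return the result of processing the buffered messages."""

    def clear_buffer(self) -> None:
        self._buffer.clear()

    def get_buffer(self) -> List[T]:
        # Return a copy so callers cannot mutate the internal buffer.
        return list(self._buffer)


class JoinAggregator(SketchAggregator[str]):
    """Example subclass: joins whatever is currently buffered."""

    def get(self) -> Optional[str]:
        msgs = self.get_buffer()
        return ", ".join(msgs) if msgs else None


agg = JoinAggregator(max_size=3)
for word in ["a", "b", "c", "d"]:
    agg(word)
result = agg.get()  # "a" was evicted, so result is "b, c, d"
```

Using a bounded `deque` keeps the newest messages and discards the oldest, which is one reasonable policy for sensor-like streams; other policies (rejecting new messages, sampling) are equally possible.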
Usage in State-Based Agents¶
Aggregators are typically used in state-based agents to maintain and update the agent's state:
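# hri_connector and tools are assumed to be created elsewhere
config = StateBasedConfig(
    aggregators={
        # Key given as a (topic, message type) pair
        ("/camera/camera/color/image_raw", "sensor_msgs/msg/Image"): [
            ROS2ImgVLMDiffAggregator()
        ],
        # Key given as a topic name only
        "/rosout": [
            ROS2LogsAggregator()
        ],
    }
)
agent = ROS2StateBasedAgent(
    config=config,
    target_connectors={"to_human": hri_connector},
    tools=tools,
)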
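To make the idea of state being updated through periodic aggregation more concrete, the sketch below shows one way a single update step could look. It is an illustration only and does not describe how `ROS2StateBasedAgent` works internally: `collect_state`, `FixedSummary`, and the `HasGet` protocol are hypothetical names, and summaries are plain strings here for simplicity.

```python
from typing import Dict, List, Optional, Protocol


class HasGet(Protocol):
    """Anything exposing a get() method that returns a summary or None."""

    def get(self) -> Optional[str]: ...


class FixedSummary:
    """Toy aggregator that always reports the same summary."""

    def __init__(self, summary: Optional[str]) -> None:
        self._summary = summary

    def get(self) -> Optional[str]:
        return self._summary


def collect_state(aggregators: Dict[str, List[HasGet]]) -> Dict[str, List[str]]:
    """Ask every aggregator for its summary, skipping sources with nothing new."""
    state: Dict[str, List[str]] = {}
    for source, source_aggregators in aggregators.items():
        summaries = [s for a in source_aggregators if (s := a.get()) is not None]
        if summaries:
            state[source] = summaries
    return state


# One update step; a state-based agent would repeat this on a timer.
state = collect_state(
    {
        "/camera/camera/color/image_raw": [FixedSummary("a person entered the room")],
        "/rosout": [FixedSummary(None)],
    }
)
```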
Direct Registration via Connector¶
Aggregators can also be registered directly with a connector using the register_callback method. This allows for more flexible message processing outside of state-based agents:
# Create a connector
connector = ROS2Connector()

# Create an aggregator
image_aggregator = ROS2GetLastImageAggregator()

# Register the aggregator as a callback for a specific topic
connector.register_callback(
    topic="/camera/camera/color/image_raw",
    msg_type="sensor_msgs/msg/Image",
    callback=image_aggregator,
)

# The aggregator will now process all messages received on the topic
# You can retrieve the aggregated result at any time
aggregated_message = image_aggregator.get()
This approach is useful when you need to:
- Process messages from specific topics independently
- Combine multiple aggregators for the same topic
- Use aggregators in non-state-based agents
- Have more control over when aggregation occurs
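Combining several aggregators on one topic amounts to registering more than one callback for it. The sketch below demonstrates the pattern without ROS 2: `InMemoryConnector` and its `publish` method are hypothetical stand-ins, not the `ROS2Connector` API, and the two toy aggregators are assumptions made for illustration.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List, Optional


class InMemoryConnector:
    """Hypothetical stand-in for a connector; not the RAI ROS2Connector."""

    def __init__(self) -> None:
        self._callbacks: DefaultDict[str, List[Callable[[str], None]]] = defaultdict(list)

    def register_callback(
        self,
        topic: str,
        callback: Callable[[str], None],
        msg_type: Optional[str] = None,  # accepted for symmetry, unused here
    ) -> None:
        self._callbacks[topic].append(callback)

    def publish(self, topic: str, msg: str) -> None:
        # Deliver the message to every callback registered for the topic.
        for callback in self._callbacks[topic]:
            callback(msg)


class LastMessage:
    """Toy aggregator that keeps only the most recent message."""

    def __init__(self) -> None:
        self.last: Optional[str] = None

    def __call__(self, msg: str) -> None:
        self.last = msg

    def get(self) -> Optional[str]:
        return self.last


class MessageCounter:
    """Toy aggregator that counts how many messages arrived."""

    def __init__(self) -> None:
        self.count = 0

    def __call__(self, msg: str) -> None:
        self.count += 1

    def get(self) -> str:
        return f"{self.count} messages received"


connector = InMemoryConnector()
last, counter = LastMessage(), MessageCounter()
connector.register_callback(topic="/rosout", callback=last)
connector.register_callback(topic="/rosout", callback=counter)

for text in ["first", "second", "third"]:
    connector.publish("/rosout", text)
```

Each aggregator sees every message but keeps its own view of the stream, so `last.get()` and `counter.get()` can be queried independently and at different times.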
Best Practices¶
- Buffer Management: Set an appropriate max_size so the buffer cannot grow without bound
- Resource Cleanup: Clear buffers when no longer needed
- Error Handling: Handle empty buffers and processing errors gracefully
- Type Safety: Use appropriate generic types for message types
- Performance: Consider the computational cost of aggregation operations
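The buffer management, cleanup, and error handling practices can be combined in a single `get()` method. The following is a sketch under stated assumptions rather than RAI code: `SafeAverager` is a hypothetical, self-contained aggregator that does not inherit from `BaseAggregator`, and returning `None` after a failed batch is one possible policy among several.

```python
import logging
from collections import deque
from typing import Any, Deque, Optional

logger = logging.getLogger(__name__)


class SafeAverager:
    """Hypothetical aggregator that averages numeric readings."""

    def __init__(self, max_size: int = 100) -> None:
        # Bounded buffer: the oldest readings are dropped beyond max_size.
        self._buffer: Deque[Any] = deque(maxlen=max_size)

    def __call__(self, msg: Any) -> None:
        self._buffer.append(msg)

    def get(self) -> Optional[str]:
        msgs = list(self._buffer)
        if not msgs:
            # Empty buffer: nothing to report, and nothing to fail on.
            return None
        try:
            mean = sum(float(m) for m in msgs) / len(msgs)
            return f"mean={mean:.2f} over {len(msgs)} readings"
        except (TypeError, ValueError):
            # A malformed reading should not crash the caller.
            logger.warning("Could not aggregate %d messages", len(msgs))
            return None
        finally:
            # Clear in both cases so a bad batch is not retried forever.
            self._buffer.clear()


averager = SafeAverager(max_size=3)
empty_result = averager.get()

for reading in [1.0, 2.0, 3.0]:
    averager(reading)
good_result = averager.get()

averager("not a number")
bad_result = averager.get()
```

Clearing the buffer in `finally` trades data retention for robustness: a malformed batch is discarded instead of blocking every later call.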
Implementation Example¶
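from langchain_core.messages import HumanMessage
from rai.aggregators.base import BaseAggregator


# CustomMessage and process_messages are placeholders for your own
# message type and summarization logic.
class CustomAggregator(BaseAggregator[CustomMessage]):
    def get(self) -> HumanMessage | None:
        msgs = self.get_buffer()
        if not msgs:
            return None
        # Process messages
        summary = process_messages(msgs)
        # Clear buffer after processing
        self.clear_buffer()
        return HumanMessage(content=summary)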
See Also¶
- ROS 2 Aggregators: For more information on the different types of ROS 2 aggregators in RAI
- Agents: For more information on the different types of agents in RAI
- Connectors: For more information on the different types of connectors in RAI
- LangChain Integration: For more information on the LangChain integration within RAI
- Multimodal messages: For more information on the multimodal LangChain messages in RAI
- Runners: For more information on the different types of runners in RAI