Aggregators

Overview

Aggregators in RAI are components that collect and process messages from various sources, transforming them into summarized or analyzed information. They are particularly useful in state-based agents where they help maintain and update the agent's state through periodic aggregation.

BaseAggregator

BaseAggregator is the abstract base class for all aggregator implementations in the RAI framework. It provides a generic interface for collecting and processing messages of a specific type.

Class Definition

rai.aggregators.base.BaseAggregator

Bases: ABC, Generic[T]

Interface for aggregators.

Source code in src/rai_core/rai/aggregators/base.py
class BaseAggregator(ABC, Generic[T]):
    """
    Interface for aggregators.
    """

    def __init__(self, max_size: int | None = None) -> None:
        super().__init__()
        self._buffer: Deque[T] = deque()
        self.max_size = max_size

    def __call__(self, msg: T) -> None:
        if self.max_size is not None and len(self._buffer) >= self.max_size:
            self._buffer.popleft()
        self._buffer.append(msg)

    @abstractmethod
    def get(self) -> BaseMessage | None:
        """Returns the outcome of processing the aggregated message"""
        pass

    def clear_buffer(self) -> None:
        """Clears the buffer of messages"""
        self._buffer.clear()

    def get_buffer(self) -> List[T]:
        """Returns a copy of the buffer of messages"""
        return list(self._buffer)

    def __str__(self) -> str:
        return f"{self.__class__.__name__}(len={len(self._buffer)})"

clear_buffer()

Clears the buffer of messages

Source code in src/rai_core/rai/aggregators/base.py
def clear_buffer(self) -> None:
    """Clears the buffer of messages"""
    self._buffer.clear()

get() abstractmethod

Returns the outcome of processing the aggregated message

Source code in src/rai_core/rai/aggregators/base.py
@abstractmethod
def get(self) -> BaseMessage | None:
    """Returns the outcome of processing the aggregated message"""
    pass

get_buffer()

Returns a copy of the buffer of messages

Source code in src/rai_core/rai/aggregators/base.py
def get_buffer(self) -> List[T]:
    """Returns a copy of the buffer of messages"""
    return list(self._buffer)

Purpose

The BaseAggregator class serves as the foundation for message aggregation in RAI, providing:

  • A buffer for collecting messages
  • An optional size limit (max_size): once the buffer is full, the oldest message is dropped for each new one
  • A consistent interface for processing and returning aggregated results
  • Type safety through generics
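
The buffering behaviour listed above can be tried without installing RAI. The following is a minimal sketch, not the library code: SketchAggregator is a local stand-in that copies the buffer logic from the class definition shown earlier, JoinAggregator is a hypothetical subclass, and get() returns a plain string instead of a BaseMessage to keep the example dependency-free.

```python
from abc import ABC, abstractmethod
from collections import deque
from typing import Deque, Generic, List, Optional, TypeVar

T = TypeVar("T")


class SketchAggregator(ABC, Generic[T]):
    """Local stand-in mirroring the buffer logic of BaseAggregator (assumption: simplified)."""

    def __init__(self, max_size: Optional[int] = None) -> None:
        self._buffer: Deque[T] = deque()
        self.max_size = max_size

    def __call__(self, msg: T) -> None:
        # When the buffer is full, drop the oldest message before appending.
        if self.max_size is not None and len(self._buffer) >= self.max_size:
            self._buffer.popleft()
        self._buffer.append(msg)

    @abstractmethod
    def get(self) -> Optional[str]:
        """Return the aggregated result, or None if there is nothing to report."""

    def get_buffer(self) -> List[T]:
        # A new list is returned, so callers cannot mutate the internal deque.
        return list(self._buffer)


class JoinAggregator(SketchAggregator[str]):
    """Hypothetical aggregator that joins buffered strings into one line."""

    def get(self) -> Optional[str]:
        msgs = self.get_buffer()
        return " | ".join(msgs) if msgs else None


agg = JoinAggregator(max_size=3)
for text in ["a", "b", "c", "d"]:
    agg(text)

snapshot = agg.get_buffer()
snapshot.append("only the copy changes")

print(agg.get_buffer())  # ['b', 'c', 'd']
print(agg.get())         # b | c | d
```

With max_size=3 the fourth message evicts the first, and appending to the list returned by get_buffer() leaves the aggregator's own buffer untouched.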

Usage in State-Based Agents

Aggregators are typically used in state-based agents to maintain and update the agent's state:

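# Each key names a topic (optionally paired with its message type);
# each value lists the aggregators that consume messages from that topic.
# hri_connector and tools are assumed to be defined elsewhere.
config = StateBasedConfig(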
    aggregators={
        ("/camera/camera/color/image_raw", "sensor_msgs/msg/Image"): [
            ROS2ImgVLMDiffAggregator()
        ],
        "/rosout": [
            ROS2LogsAggregator()
        ]
    }
)

agent = ROS2StateBasedAgent(
    config=config,
    target_connectors={"to_human": hri_connector},
    tools=tools
)

Direct Registration via Connector

Aggregators can also be registered directly with a connector using the register_callback method. This allows for more flexible message processing outside of state-based agents:

# Create a connector
connector = ROS2Connector()

# Create an aggregator
image_aggregator = ROS2GetLastImageAggregator()

# Register the aggregator as a callback for a specific topic
connector.register_callback(
    topic="/camera/camera/color/image_raw",
    msg_type="sensor_msgs/msg/Image",
    callback=image_aggregator
)

# The aggregator will now process all messages received on the topic
# You can retrieve the aggregated result at any time
aggregated_message = image_aggregator.get()

This approach is useful when you need to:

  • Process messages from specific topics independently
  • Combine multiple aggregators for the same topic
  • Use aggregators in non-state-based agents
  • Have more control over when aggregation occurs
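
To illustrate why an aggregator can be passed as a callback, and how several aggregators can share one topic, here is a small sketch that runs without ROS 2. Everything in it is hypothetical: FakeConnector is a stand-in dispatcher (its register_callback omits msg_type and is not the ROS2Connector API), and the two aggregator classes are simplified examples that only rely on being callable and exposing get().

```python
from collections import defaultdict, deque
from typing import Any, Callable, DefaultDict, Deque, List, Optional

Callback = Callable[[Any], None]


class FakeConnector:
    """Hypothetical stand-in for a connector; not the ROS2Connector API."""

    def __init__(self) -> None:
        self._callbacks: DefaultDict[str, List[Callback]] = defaultdict(list)

    def register_callback(self, topic: str, callback: Callback) -> None:
        # Several callbacks may be attached to the same topic.
        self._callbacks[topic].append(callback)

    def deliver(self, topic: str, msg: Any) -> None:
        # Simulates an incoming message: every registered callback sees it.
        for callback in self._callbacks.get(topic, []):
            callback(msg)


class LastMessageAggregator:
    """Hypothetical aggregator that keeps only the newest message."""

    def __init__(self) -> None:
        self._buffer: Deque[Any] = deque(maxlen=1)

    def __call__(self, msg: Any) -> None:
        self._buffer.append(msg)

    def get(self) -> Optional[Any]:
        return self._buffer[-1] if self._buffer else None


class CountAggregator:
    """Hypothetical aggregator that reports how many messages arrived."""

    def __init__(self) -> None:
        self._buffer: Deque[Any] = deque()

    def __call__(self, msg: Any) -> None:
        self._buffer.append(msg)

    def get(self) -> Optional[str]:
        if not self._buffer:
            return None
        count = len(self._buffer)
        self._buffer.clear()  # start a fresh window after each report
        return f"{count} messages since last get()"


connector = FakeConnector()
latest = LastMessageAggregator()
counter = CountAggregator()

# Two independent aggregators registered on one topic.
connector.register_callback("/demo/topic", latest)
connector.register_callback("/demo/topic", counter)

for frame in ["frame-1", "frame-2", "frame-3"]:
    connector.deliver("/demo/topic", frame)

print(latest.get())   # frame-3
print(counter.get())  # 3 messages since last get()
print(counter.get())  # None
```

Because each aggregator owns its buffer, calling get() on one does not affect the other, and the caller decides when aggregation happens.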

Best Practices

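  1. Buffer Management: Set max_size explicitly; with the default (None) the buffer grows without bound until it is cleared
  2. Resource Cleanup: Call clear_buffer() once buffered messages have been processed or are no longer needed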
  3. Error Handling: Handle empty buffers and processing errors gracefully
  4. Type Safety: Use appropriate generic types for message types
  5. Performance: Consider the computational cost of aggregation operations
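
The first three practices can be combined in one get() implementation. The sketch below is one possible approach under stated assumptions, not a pattern prescribed by RAI: MeanAggregator is a hypothetical, dependency-free class, it returns a string instead of a BaseMessage, and the choice to discard a batch that fails to process is a design decision made for this example.

```python
from collections import deque
from typing import Any, Deque, Optional


class MeanAggregator:
    """Hypothetical aggregator over numeric readings; uses no RAI imports."""

    def __init__(self, max_size: Optional[int] = 100) -> None:
        self._buffer: Deque[Any] = deque()
        self.max_size = max_size  # bounded by default in this sketch

    def __call__(self, msg: Any) -> None:
        if self.max_size is not None and len(self._buffer) >= self.max_size:
            self._buffer.popleft()
        self._buffer.append(msg)

    def get(self) -> Optional[str]:
        msgs = list(self._buffer)
        if not msgs:
            return None  # empty buffer: nothing to report
        # Clear first, so a malformed batch cannot fail on every later call.
        self._buffer.clear()
        try:
            mean = sum(float(m) for m in msgs) / len(msgs)
        except (TypeError, ValueError):
            return None  # processing error: skip this batch
        return f"mean={mean:.2f} over {len(msgs)} readings"


agg = MeanAggregator(max_size=3)
for reading in [1, 2, 3, 4]:
    agg(reading)

print(agg.get())  # mean=3.00 over 3 readings
print(agg.get())  # None

agg("not a number")
print(agg.get())  # None
agg(10)
print(agg.get())  # mean=10.00 over 1 readings
```

Clearing the buffer before processing trades data retention for robustness: a single malformed message costs one batch rather than blocking every subsequent call. An aggregator that must not lose data would instead clear only after processing succeeds.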

Implementation Example

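from langchain_core.messages import HumanMessage

from rai.aggregators.base import BaseAggregator

# CustomMessage and process_messages are placeholders for your own
# message type and summarization logic.


class CustomAggregator(BaseAggregator[CustomMessage]):
    def get(self) -> HumanMessage | None:
        # Work on a copy of the buffered messages
        msgs = self.get_buffer()
        if not msgs:
            return None

        # Process messages
        summary = process_messages(msgs)

        # Clear buffer after processing
        self.clear_buffer()

        return HumanMessage(content=summary)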

See Also

  • ROS 2 Aggregators: For more information on the different types of ROS 2 aggregators in RAI
  • Agents: For more information on the different types of agents in RAI
  • Connectors: For more information on the different types of connectors in RAI
  • Langchain Integration: For more information on the LangChain integration within RAI
  • Multimodal messages: For more information on the multimodal LangChain messages in RAI
  • Runners: For more information on the different types of runners in RAI