v0.2.0 · Apache 2.0

Event System

Corail includes an in-process event bus for runtime observability. Components emit events as they execute; subscribers react asynchronously. This is a lightweight pub/sub for a single agent Pod, not a distributed message broker.

EventBus

import asyncio

from corail.events.bus import EventBus
from corail.events.types import Event, EventType

bus = EventBus()

# Subscribe to a specific event type
async def on_tool_called(event: Event) -> None:
    print(f"Tool called: {event.data['name']}")

bus.subscribe(EventType.TOOL_CALLED, on_tool_called)

# Subscribe to ALL events (wildcard)
async def audit_log(event: Event) -> None:
    print(f"[{event.type.value}] {event.data}")

bus.subscribe("*", audit_log)

# Emit an event (emit is awaited, so it must run inside an event loop)
async def main() -> None:
    await bus.emit(Event(
        type=EventType.TOOL_CALLED,
        agent_id="my-agent",
        data={"name": "calculator", "args": {"expression": "2+2"}},
    ))

asyncio.run(main())

Key behaviors:

  • Handlers for an event run concurrently via asyncio.gather.
  • Exceptions raised by handlers are logged and never propagate to the emitter.
  • The bus keeps a history of the last 1000 events, accessible via bus.history.
  • emit_sync offers fire-and-forget emission from synchronous code.
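
The behaviors above can be illustrated with a small stand-in bus. This is a minimal sketch, not Corail's implementation: `MiniBus`, its dict-based events, and the `max_history` parameter are hypothetical names chosen for the example. It shows how `asyncio.gather(..., return_exceptions=True)` isolates handler failures and how a bounded `deque` caps the history.

```python
import asyncio
import logging
from collections import deque
from typing import Any, Awaitable, Callable

logger = logging.getLogger("minibus")
Handler = Callable[[dict[str, Any]], Awaitable[None]]


class MiniBus:
    """Hypothetical stand-in for EventBus (assumed design, for illustration)."""

    def __init__(self, max_history: int = 1000) -> None:
        self._handlers: dict[str, list[Handler]] = {}
        # A bounded deque drops the oldest event once the cap is reached.
        self.history: deque = deque(maxlen=max_history)
        self._tasks: set = set()

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    async def emit(self, event: dict[str, Any]) -> None:
        self.history.append(event)
        handlers = [
            *self._handlers.get(event["type"], []),
            *self._handlers.get("*", []),  # wildcard subscribers
        ]
        # return_exceptions=True keeps one failing handler from cancelling
        # the others or surfacing in the emitter.
        results = await asyncio.gather(
            *(handler(event) for handler in handlers), return_exceptions=True
        )
        for result in results:
            if isinstance(result, Exception):
                logger.error("handler failed: %r", result)

    def emit_sync(self, event: dict[str, Any]) -> None:
        """Schedule emit() on the running loop without waiting for it."""
        task = asyncio.get_running_loop().create_task(self.emit(event))
        self._tasks.add(task)  # keep a reference until the task finishes
        task.add_done_callback(self._tasks.discard)


async def demo() -> tuple:
    bus = MiniBus(max_history=3)
    seen: list[str] = []

    async def ok(event: dict[str, Any]) -> None:
        seen.append(event["type"])

    async def broken(event: dict[str, Any]) -> None:
        raise RuntimeError("boom")

    bus.subscribe("tool.called", ok)
    bus.subscribe("*", broken)
    for _ in range(5):
        await bus.emit({"type": "tool.called"})  # never raises
    return seen, len(bus.history)
```

In this sketch `emit_sync` only works when called from the thread that runs the event loop; how Corail's own `emit_sync` handles other threads is not stated here.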

Event types

All events are defined in the EventType enum:

Message lifecycle

| Event | Emitted when |
| --- | --- |
| message.received | User input arrives |
| message.response | Final response is ready |

LLM

| Event | Emitted when |
| --- | --- |
| thinking.started | LLM begins reasoning |
| thinking.completed | LLM finishes reasoning |
| llm.call.started | LLM API call begins (includes round number) |
| llm.call.completed | LLM API call returns (includes stop_reason) |
| llm.token | A single token is generated |

Tool execution

| Event | Emitted when |
| --- | --- |
| tool.called | Tool is invoked (includes name and args) |
| tool.result | Tool returns successfully (includes output) |
| tool.error | Tool fails (includes error and attempt number) |

Guards

| Event | Emitted when |
| --- | --- |
| guard.input.checked | Input guard passed |
| guard.output.checked | Output guard passed |
| guard.blocked | A guard blocked content (includes direction and reason) |

Memory

| Event | Emitted when |
| --- | --- |
| memory.retrieved | Conversation history loaded |
| memory.updated | New message stored |

Retrieval / RAG

| Event | Emitted when |
| --- | --- |
| retrieval.searched | Vector search executed |
| retrieval.results | Retrieval results available |

Budget

| Event | Emitted when |
| --- | --- |
| budget.warning | Approaching budget limit |
| budget.exceeded | Max rounds or tokens reached |

Agent lifecycle

| Event | Emitted when |
| --- | --- |
| agent.started | Agent Pod starts |
| agent.stopped | Agent Pod stops |
| agent.error | Unrecoverable error |

Turn lifecycle

Emitted around each round of the agent loop. Useful for observability, stream UX, and MLflow child spans.

| Event | Emitted when |
| --- | --- |
| turn.started | A new reasoning round begins (includes round index) |
| turn.ended | The final round ends. Emitted only at the terminal exit of the loop; carries the real stop_reason (one of StopReason: end_turn, max_rounds, token_budget, tool_error, guard_blocked, user_aborted) |
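
A subscriber to turn.ended will usually branch on the stop reason. The sketch below is illustrative only: the six values come from the table above, but the `StopReason` class layout, the `stop_reason` key inside the event payload, and the `classify_turn_end` helper are assumptions made for the example.

```python
from enum import Enum


class StopReason(str, Enum):
    """Values listed for turn.ended; the class layout is an assumption."""

    END_TURN = "end_turn"
    MAX_ROUNDS = "max_rounds"
    TOKEN_BUDGET = "token_budget"
    TOOL_ERROR = "tool_error"
    GUARD_BLOCKED = "guard_blocked"
    USER_ABORTED = "user_aborted"


def classify_turn_end(data: dict) -> str:
    """Map a turn.ended payload to a coarse outcome label (hypothetical helper)."""
    # Assumes the payload stores the reason under the "stop_reason" key.
    reason = StopReason(data["stop_reason"])
    if reason is StopReason.END_TURN:
        return "completed"
    if reason in (StopReason.MAX_ROUNDS, StopReason.TOKEN_BUDGET):
        return "budget"
    if reason is StopReason.USER_ABORTED:
        return "aborted"
    return "failed"  # tool_error or guard_blocked
```

Grouping the reasons this way keeps dashboards small: a normal finish, a budget stop, a user abort, and everything else.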

Session

| Event | Emitted when |
| --- | --- |
| session.created | New conversation started |
| session.ended | Conversation ended |

Control plane

| Event | Emitted when |
| --- | --- |
| control.config.updated | Configuration changed at runtime |
| control.tools.reload | Tool reload requested |
| control.kbs.reload | Knowledge base reload requested |
| control.agent.paused | Agent paused by platform |
| control.agent.resumed | Agent resumed by platform |
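
Because the event names are dotted strings, a wildcard subscriber can group events by their first segment. The following is a sketch under assumptions: `EventTypeExcerpt` is a hypothetical stand-in that copies a few names from the tables above (it is not the real `EventType`), and `category` and `parse` are illustrative helpers.

```python
from enum import Enum


class EventTypeExcerpt(Enum):
    """A few members mirroring the tables above (stand-in, not EventType)."""

    TOOL_CALLED = "tool.called"
    TOOL_ERROR = "tool.error"
    GUARD_BLOCKED = "guard.blocked"
    TURN_ENDED = "turn.ended"
    CONTROL_CONFIG_UPDATED = "control.config.updated"


def category(event_type: EventTypeExcerpt) -> str:
    """Return the first dotted segment, e.g. "tool" for tool.called."""
    return event_type.value.split(".", 1)[0]


def parse(name: str) -> EventTypeExcerpt:
    """Look up a member by its dotted value; raises ValueError if unknown."""
    return EventTypeExcerpt(name)
```

A wildcard handler could then drop everything whose category is not, say, "tool" or "guard" before logging.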

Event dataclass

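from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any

@dataclass
class Event:
    type: EventType                    # Required
    timestamp: datetime = field(       # Auto-set to UTC now
        default_factory=lambda: datetime.now(timezone.utc))
    agent_id: str = ""                 # Agent that emitted
    user_id: str = ""                  # User context
    session_id: str = ""               # Conversation context
    data: dict[str, Any] = field(default_factory=dict)  # Arbitrary payload

    def to_dict(self) -> dict:         # JSON-serializable
        ...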
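
To make the "JSON-serializable" contract of to_dict concrete, here is one way such a method could be written. It is a sketch, not the library's code: `DemoType` and `DemoEvent` are hypothetical stand-ins for `EventType` and `Event`, and the choice of `isoformat()` for the timestamp is an assumption.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any


class DemoType(Enum):  # stand-in for EventType
    TOOL_CALLED = "tool.called"


@dataclass
class DemoEvent:  # stand-in for Event
    type: DemoType
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    agent_id: str = ""
    user_id: str = ""
    session_id: str = ""
    data: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        # Enums and datetimes are not JSON types, so convert them to strings.
        return {
            "type": self.type.value,
            "timestamp": self.timestamp.isoformat(),
            "agent_id": self.agent_id,
            "user_id": self.user_id,
            "session_id": self.session_id,
            "data": self.data,
        }


def roundtrip(event: DemoEvent) -> dict[str, Any]:
    """Serialize to JSON and back to confirm the dict is JSON-safe."""
    return json.loads(json.dumps(event.to_dict()))
```

The payload in `data` is passed through unchanged, so it stays JSON-safe only if callers put JSON-compatible values in it.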

Integration with the agent strategy

The agent-react strategy creates an EventBus automatically via its initializer. The bus receives events throughout the execution cycle:

  1. MESSAGE_RECEIVED -- user input accepted
  2. TURN_STARTED -- each reasoning round (one per LLM call)
  3. LLM_CALL_STARTED / LLM_CALL_COMPLETED -- around each LLM API call
  4. TOOL_CALLED / TOOL_RESULT / TOOL_ERROR -- tool execution
  5. GUARD_BLOCKED -- if a guard rejects content
  6. BUDGET_EXCEEDED -- if limits are hit
  7. TURN_ENDED -- one final event carrying the StopReason
  8. MESSAGE_RESPONSE -- final response ready
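
The ordering above can be turned into a quick sanity check for a recorded request, for example in a test that subscribes with "*" and stores each event's type value. This is a sketch assuming the happy-path order listed; `check_request_trace` is a hypothetical helper, and error paths may legitimately deviate from it.

```python
def check_request_trace(names: list[str]) -> list[str]:
    """Return the problems found in one request's event-name sequence."""
    problems: list[str] = []
    if not names or names[0] != "message.received":
        problems.append("trace must start with message.received")
    if not names or names[-1] != "message.response":
        problems.append("trace must end with message.response")
    # TURN_ENDED is described as one final event per request.
    if names.count("turn.ended") != 1:
        problems.append("expected exactly one turn.ended")
    # TURN_STARTED is described as one per LLM call.
    if names.count("turn.started") != names.count("llm.call.started"):
        problems.append("expected one turn.started per LLM call")
    return problems


GOOD_TRACE = [
    "message.received",
    "turn.started", "llm.call.started", "llm.call.completed",
    "tool.called", "tool.result",
    "turn.started", "llm.call.started", "llm.call.completed",
    "turn.ended",
    "message.response",
]
```

An empty result means the trace matches the listed order; each string in a non-empty result names one violated expectation.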

The MLflow tracing listener (corail/tracing/mlflow_listener.py) subscribes to TOOL_CALLED / TOOL_RESULT / TOOL_ERROR and collects them into contextvar storage. The channel layer then turns each collected event into a tool:<name> child span inside the @mlflow.trace-decorated chat handler, so every request produces a full trace tree without requiring strategies to know about MLflow directly.
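
The contextvar collection pattern can be sketched without MLflow. Everything below is an illustrative assumption (the `_collected` variable, `tool_listener`, `handle_chat`, and the dict-shaped events); it is not the code in mlflow_listener.py. For brevity it names one span per tool.called event. The point is that handlers launched through `asyncio.gather` run in child tasks with a copy of the context, so the request handler creates the buffer and the listener mutates it in place.

```python
import asyncio
import contextvars
from typing import Any

# Per-request buffer; None means no request is currently collecting.
_collected = contextvars.ContextVar("collected_tool_events", default=None)


async def tool_listener(event: dict[str, Any]) -> None:
    """Listener: append tool events to the current request's buffer."""
    buffer = _collected.get()
    if buffer is not None:
        buffer.append(event)  # mutate in place; set() here would be lost


def span_names(events: list[dict[str, Any]]) -> list[str]:
    """Name one child span per tool.called event, as tool:<name>."""
    return [
        f"tool:{e['data']['name']}" for e in events if e["type"] == "tool.called"
    ]


async def handle_chat(tool: str) -> list[str]:
    """Stand-in for the traced chat handler."""
    _collected.set([])  # fresh buffer for this request
    # gather() wraps each coroutine in a task that copies the context;
    # the copy still refers to the same list object.
    await asyncio.gather(
        tool_listener({"type": "tool.called", "data": {"name": tool}}),
        tool_listener({"type": "tool.result", "data": {"output": "4"}}),
    )
    return span_names(_collected.get())


async def demo() -> list[list[str]]:
    # Two concurrent requests keep separate buffers.
    return await asyncio.gather(handle_chat("calculator"), handle_chat("search"))
```

Running `asyncio.run(demo())` yields one span list per request, with no events leaking between the two concurrent handlers.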