# Event System Architecture
Version: 1.1.1
Last Updated: January 23, 2026
## Overview
BranchPy implements a unified event bus architecture for real-time system monitoring, audit logging, and dashboard updates. All events flow through a central event bus and are delivered to clients via Server-Sent Events (SSE) streaming.
The event system provides:
- Real-time notifications of system operations and state changes
- Audit trail for compliance and debugging
- Correlation tracking across multi-step operations
- Topic-based filtering for selective event consumption
- Structured event schema for consistent data exchange
## Architecture
### Event Bus
The central event bus (branchpy.server.events.bus) coordinates event emission and delivery:
┌─────────────────────────────────────────────────────────┐
│                        Event Bus                        │
│                                                         │
│  ┌─────────────┐    ┌──────────────┐    ┌──────────┐    │
│  │  Emitter    │───▶│ Event Queue  │───▶│ Delivery │    │
│  │  (helpers)  │    │   (in-mem)   │    │  (SSE)   │    │
│  └─────────────┘    └──────────────┘    └──────────┘    │
│         │                  │                 │          │
│         ▼                  ▼                 ▼          │
│    validate()          enqueue()          stream()      │
└─────────────────────────────────────────────────────────┘
          │                                    │
          │                                    │
    [Components]                           [Clients]
  (SAE, validator,                       (Web UI, CLI,
   daemon, etc.)                          extensions)
Key Components:
- Emitters: Helper functions that construct and emit events (`helpers.py`)
- Event Queue: In-memory FIFO queue for buffering events
- SSE Delivery: HTTP endpoint streaming events to clients (`/api/v1/events/stream`)
- JSONL Persistence: Optional disk logging for audit trail (`.branchpy/governance/events.jsonl`)
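The validate, enqueue, and stream stages in the diagram can be pictured with a small stand-in. This is a minimal sketch, not the code in `branchpy.server.events.bus`: the class name `MiniEventBus` and its methods are hypothetical, and the required-field list is taken from the base schema described in this document.

```python
import queue
from typing import Iterator

# Required fields as listed in the base event schema of this document.
REQUIRED_FIELDS = ("type", "ts", "level", "topic", "correlation_id")


class MiniEventBus:
    """Hypothetical stand-in for the real bus: validate -> enqueue -> stream."""

    def __init__(self) -> None:
        # queue.Queue is FIFO, matching the in-memory event queue.
        self._queue: "queue.Queue[dict]" = queue.Queue()

    def validate(self, event: dict) -> None:
        """Reject events that lack any required base field."""
        missing = [name for name in REQUIRED_FIELDS if name not in event]
        if missing:
            raise ValueError(f"event is missing required fields: {missing}")

    def emit(self, event: dict) -> None:
        """Validate the event, then buffer it for delivery."""
        self.validate(event)
        self._queue.put(event)

    def stream(self) -> Iterator[dict]:
        """Yield buffered events oldest first until the queue is empty."""
        while True:
            try:
                yield self._queue.get_nowait()
            except queue.Empty:
                return
```

A real delivery stage would serialize each yielded event for the SSE endpoint and keep the connection open; the sketch only drains what is already buffered.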
## Event Structure
All events conform to a base schema with common fields:
{
  "type": "sae.done",                // Event type identifier
  "ts": "2026-01-23T14:30:00Z",      // ISO 8601 timestamp
  "level": "info",                   // Severity: debug | info | warn | error
  "topic": "sae",                    // Category: audit, sae, validation, etc.
  "source": "solver",                // Originating component
  "correlation_id": "a1b2c3d4-...",  // UUID for operation tracing
  ...                                // Event-specific payload
}
Required Fields:
- `type`: Event type (e.g., `sae.start`, `validation.done`, `audit.save`)
- `ts`: Timestamp (auto-generated if omitted)
- `level`: Severity level for filtering and alerting
- `topic`: Category for topic-based subscriptions
- `correlation_id`: UUID linking related events (auto-generated if omitted)
Optional Fields:
- `source`: Component that emitted the event
- Custom payload fields specific to the event type
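The field rules above (auto-generated `ts` and `correlation_id`, optional `source`, free-form payload) can be sketched as a small builder. The function `build_event` is hypothetical and is not part of `branchpy.server.events.helpers`; defaulting `topic` to `general` is also an assumption made for illustration.

```python
import uuid
from datetime import datetime, timezone

VALID_LEVELS = ("debug", "info", "warn", "error")


def build_event(event_type, *, level="info", topic="general", source=None,
                ts=None, correlation_id=None, **payload):
    """Assemble an event dict that follows the base schema (illustrative only)."""
    if level not in VALID_LEVELS:
        raise ValueError(f"unknown level: {level!r}")
    event = {
        "type": event_type,
        # Fill in the timestamp as ISO 8601 UTC when the caller omits it.
        "ts": ts or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "level": level,
        "topic": topic,
        # Fill in a fresh UUID when the caller omits the correlation ID.
        "correlation_id": correlation_id or str(uuid.uuid4()),
    }
    if source is not None:
        event["source"] = source
    # Payload fields must not shadow the base fields.
    clash = payload.keys() & event.keys()
    if clash:
        raise ValueError(f"payload overrides base fields: {sorted(clash)}")
    event.update(payload)
    return event
```

For example, `build_event("sae.done", topic="sae", source="solver", labels=123, pruned=7)` yields a dict with the five required fields plus `source`, `labels`, and `pruned`.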
## Event Topics
Events are categorized by topic to enable selective consumption:
| Topic | Description | Example Types |
|---|---|---|
| `audit` | User actions and system changes | `audit.save`, `audit.command` |
| `sae` | Static analysis engine operations | `sae.start`, `sae.done` |
| `validation` | Validation checks and policy | `validation.start`, `validation.done` |
| `cfg` | Control Flow Graph extraction | `cfg.start`, `cfg.done` |
| `daemon` | WebSocket daemon lifecycle | `daemon.started`, `daemon.stopped` |
| `compare` | Run-to-run comparison operations | `compare.start`, `compare.done` |
| `system` | System-level messages and errors | `error`, `info` |
| `stats` | Performance metrics | `stats.memory`, `stats.timing` |
| `lint` | Linting operations | `lint.start`, `lint.done` |
| `policy` | Policy enforcement | `policy.violated`, `policy.applied` |
| `general` | Uncategorized events | `message` |
See Event Catalog for complete event reference.
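Selective consumption comes down to a predicate over `topic` and `level`. The sketch below assumes the severity order debug < info < warn < error used in this document; the name `matches` and its parameters are hypothetical.

```python
# Severity ranks, lowest to highest.
LEVEL_ORDER = {"debug": 0, "info": 1, "warn": 2, "error": 3}


def matches(event, topics=None, min_level="debug"):
    """Return True if the event passes the topic and severity filters."""
    # topics=None means "accept every topic".
    if topics is not None and event["topic"] not in topics:
        return False
    return LEVEL_ORDER[event["level"]] >= LEVEL_ORDER[min_level]
```

A dashboard panel that shows only analysis and validation problems could call `matches(event, topics={"sae", "validation"}, min_level="warn")` on each incoming event.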
## SSE Streaming
Clients subscribe to the event stream via Server-Sent Events:
Endpoint: GET /api/v1/events/stream
Response Format:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

data: {"type":"sae.start","ts":"2026-01-23T14:30:00Z","level":"info",...}

data: {"type":"sae.done","ts":"2026-01-23T14:30:15Z","level":"info",...}
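On the wire, each event in this format is a single `data:` line terminated by a blank line. A minimal sketch of that framing follows; `format_sse` is a hypothetical helper, not a function from the BranchPy server.

```python
import json


def format_sse(event: dict) -> str:
    """Serialize one event as an SSE frame: a 'data:' line plus a blank line."""
    # Compact separators keep the JSON on one line with no extra spaces.
    payload = json.dumps(event, separators=(",", ":"))
    return f"data: {payload}\n\n"
```

Because `json.dumps` without `indent` never emits a raw newline, the payload always fits in one `data:` line.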
Client Example (JavaScript):
const es = new EventSource('/api/v1/events/stream');

es.onmessage = (e) => {
  const event = JSON.parse(e.data);

  // Filter by topic
  if (event.topic === 'sae') {
    console.log('SAE event:', event);
  }

  // Filter by level (showError is an application-defined UI helper)
  if (event.level === 'error') {
    showError(event.message);
  }
};

es.onerror = () => {
  // EventSource retries on its own; this handler only logs the drop
  console.error('SSE connection lost, reconnecting...');
};
Features:
- Automatic reconnection: The browser's EventSource reconnects after a dropped connection
- Filtering: Clients filter events by topic, level, or correlation_id
- Low latency: Sub-second event delivery
- Stateless: No server-side session management
See …/server/sse.md for SSE implementation details.
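Non-browser clients such as a CLI have to decode the stream themselves. The sketch below parses `data:` frames from any iterable of text lines (for instance, the decoded lines of a streaming HTTP response); `iter_sse_events` is a hypothetical name, and fields other than `data:` are ignored for brevity.

```python
import json
from typing import Iterable, Iterator


def iter_sse_events(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one decoded event per SSE frame found in the given lines."""
    data_parts = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current frame.
            if data_parts:
                yield json.loads("\n".join(data_parts))
                data_parts = []
        elif line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The SSE format strips one optional space after the colon.
            data_parts.append(value[1:] if value.startswith(" ") else value)
    # A frame not yet closed by a blank line is incomplete and is dropped.
```

Unlike the browser's `EventSource`, this loop does not reconnect; a caller would wrap it in its own retry logic.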
## Correlation Tracking
Correlation IDs link events across multi-step operations:
import uuid
from branchpy.server.events.helpers import emit_sae_start, emit_sae_done
# Start operation with correlation ID
corr_id = str(uuid.uuid4())
emit_sae_start(project="/path", correlation_id=corr_id)
# ... perform analysis ...
emit_sae_done(labels=123, pruned=7, correlation_id=corr_id)
Client Filtering:
// Track specific operation
const requestId = "a1b2c3d4-e5f6-...";

es.onmessage = (e) => {
  const event = JSON.parse(e.data);
  if (event.correlation_id === requestId) {
    updateProgress(event);
  }
};
Benefits:
- Request tracing: Follow operation across components
- Performance profiling: Measure end-to-end latency
- Error attribution: Link errors to initiating request
- Dashboard filtering: Show events for specific operation
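As an illustration of the performance-profiling benefit, end-to-end latency can be derived from the first and last `ts` values that share a `correlation_id`. This is a sketch under the assumption that timestamps use the ISO 8601 UTC form shown in this document; `operation_durations` is a hypothetical name.

```python
from collections import defaultdict
from datetime import datetime


def parse_ts(ts: str) -> datetime:
    """Parse an ISO 8601 timestamp such as '2026-01-23T14:30:00Z'."""
    # Older Python versions reject a trailing 'Z' in fromisoformat.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def operation_durations(events) -> dict:
    """Map each correlation_id to seconds between its first and last event."""
    stamps = defaultdict(list)
    for event in events:
        stamps[event["correlation_id"]].append(parse_ts(event["ts"]))
    return {cid: (max(ts) - min(ts)).total_seconds() for cid, ts in stamps.items()}
```

An operation with a single event reports a duration of zero, so callers may want to treat that case as "still running".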
## Event Emission
### Using Helper Functions
Always use helper functions from branchpy.server.events.helpers:
import uuid

from branchpy.server.events.helpers import (
    emit_event,
    emit_sae_start,
    emit_sae_done,
    emit_validation_start,
    emit_validation_done,
)

# One correlation ID ties together every event from the same operation
corr_id = str(uuid.uuid4())

# Use typed helpers for common events
emit_sae_start(project="/path/to/project", correlation_id=corr_id)
emit_sae_done(labels=123, pruned=7, correlation_id=corr_id)

# Use generic emit_event for custom events
emit_event(
    "custom.event",
    level="info",
    topic="custom",
    source="my_module",
    correlation_id=corr_id,
    custom_field="value",
)
### Best Practices
- Always use helpers: Never call `current_bus.emit()` directly
- Pass correlation_id: Enable tracing for multi-step operations
- Use appropriate topics: Categorize events for filtering
- Set correct levels: Severity increases in the order `debug` < `info` < `warn` < `error`
- Include context: Add payload fields (file paths, counts, messages)
- Fail silently: Don't crash if the bus is unavailable (events are optional)
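The "fail silently" guideline can be sketched as a thin wrapper around whichever emit function is in use. Whether the BranchPy helpers already guard against a missing bus is not stated here, so `safe_emit` is a hypothetical illustration rather than a required layer.

```python
import logging
from typing import Callable

logger = logging.getLogger(__name__)


def safe_emit(emit: Callable[..., None], *args, **kwargs) -> bool:
    """Call an emit function; report failure instead of raising."""
    try:
        emit(*args, **kwargs)
        return True
    except Exception:
        # Events are optional, so log at debug level and carry on.
        logger.debug("event emission failed; continuing", exc_info=True)
        return False
```

Usage would look like `safe_emit(emit_sae_start, project="/path", correlation_id=corr_id)`, leaving the analysis itself unaffected when the bus is down.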
## Event Persistence
Events can be persisted to disk for audit and replay:
Storage: .branchpy/governance/events.jsonl
Format: JSON Lines (one event per line)
Example:
{"type":"sae.start","ts":"2026-01-23T14:30:00Z","level":"info","topic":"sae",...}
{"type":"sae.done","ts":"2026-01-23T14:30:15Z","level":"info","topic":"sae",...}
Retention:
- Default: 90 days
- Auto-rotation: 50 MB per file
- Archive: `.branchpy/governance/archive/*.jsonl.gz`
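Size-based rotation into a gzip archive could look roughly like the following. This is a sketch only: `append_event`, its parameters, and the archive file naming are assumptions, and the 90-day retention sweep is not shown.

```python
import gzip
import json
import shutil
import time
from pathlib import Path

MAX_BYTES = 50 * 1024 * 1024  # 50 MB rotation threshold from the list above


def append_event(event, log_path, archive_dir, max_bytes=MAX_BYTES):
    """Append one event as a JSON line, rotating the log when it is too big."""
    log_path, archive_dir = Path(log_path), Path(archive_dir)
    log_path.parent.mkdir(parents=True, exist_ok=True)
    if log_path.exists() and log_path.stat().st_size >= max_bytes:
        archive_dir.mkdir(parents=True, exist_ok=True)
        # Hypothetical naming scheme: nanosecond timestamp keeps names unique.
        target = archive_dir / f"events-{time.time_ns()}.jsonl.gz"
        with log_path.open("rb") as src, gzip.open(target, "wb") as dst:
            shutil.copyfileobj(src, dst)
        log_path.unlink()
    with log_path.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(event) + "\n")
```

The size check runs before each write, so a file can exceed the threshold by at most one event before it is rotated.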
Replay:
- Load historical events from JSONL files
- Filter by correlation_id to replay specific operations
- See the Event Bus & Replay Guide for the UI
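Outside the UI, replay amounts to reading the JSONL file line by line and keeping the events of interest. A minimal sketch, with `replay` as a hypothetical function name:

```python
import json
from pathlib import Path


def replay(log_path, correlation_id=None):
    """Yield persisted events in file order, optionally for one operation."""
    with Path(log_path).open(encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue  # tolerate blank lines between records
            event = json.loads(line)
            if correlation_id is None or event.get("correlation_id") == correlation_id:
                yield event
```

Archived `.jsonl.gz` files could be read the same way by opening them with `gzip.open(path, "rt")` instead of `Path.open`.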
## Related Documentation
- Event Catalog - Complete event schema reference
- Server Architecture - HTTP server and SSE implementation
- Logging System - Structured logging integration
- Audit Trail - Compliance and forensics
## Source References
Based on:
- `docs/v1.1.0/Events/events.schema.md`
- `docs/v1.1.0/Events/event-bus-replay-guide.md`
- `branchpy/server/events/bus.py`
- `branchpy/server/events/helpers.py`