# Event System Architecture

Version: 1.1.1
Last Updated: January 23, 2026


Overview

BranchPy implements a unified event bus architecture for real-time system monitoring, audit logging, and dashboard updates. All events flow through a central event bus and are delivered to clients via Server-Sent Events (SSE) streaming.

The event system provides:

  • Real-time notifications of system operations and state changes
  • Audit trail for compliance and debugging
  • Correlation tracking across multi-step operations
  • Topic-based filtering for selective event consumption
  • Structured event schema for consistent data exchange

Architecture

Event Bus

The central event bus (branchpy.server.events.bus) coordinates event emission and delivery:

┌─────────────────────────────────────────────────────────┐
│                     Event Bus                           │
│                                                         │
│  ┌─────────────┐    ┌──────────────┐    ┌──────────┐  │
│  │   Emitter   │───▶│  Event Queue │───▶│ Delivery │  │
│  │  (helpers)  │    │   (in-mem)   │    │  (SSE)   │  │
│  └─────────────┘    └──────────────┘    └──────────┘  │
│         │                   │                   │      │
│         ▼                   ▼                   ▼      │
│    validate()          enqueue()           stream()    │
└─────────────────────────────────────────────────────────┘
         │                                         │
         │                                         │
    [Components]                             [Clients]
  (SAE, validator,                        (Web UI, CLI,
   daemon, etc.)                           extensions)

Key Components:

  • Emitters: Helper functions that construct and emit events (helpers.py)
  • Event Queue: In-memory FIFO queue for buffering events
  • SSE Delivery: HTTP endpoint streaming events to clients (/api/v1/events/stream)
  • JSONL Persistence: Optional disk logging for audit trail (.branchpy/governance/events.jsonl)
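
The flow in the diagram (validate, enqueue, stream) can be sketched in a few lines. This is a toy stand-in, not the code in branchpy/server/events/bus.py: the class name MiniBus, its method names, and the REQUIRED tuple are all hypothetical and chosen only to mirror the labels in the diagram.

```python
import json
import queue
from typing import Iterator

# Field names taken from the base schema; the tuple itself is illustrative.
REQUIRED = ("type", "ts", "level", "topic", "correlation_id")


class MiniBus:
    """Toy event bus (hypothetical): validate -> enqueue -> stream."""

    def __init__(self) -> None:
        self._queue: "queue.Queue[dict]" = queue.Queue()  # in-memory FIFO

    def validate(self, event: dict) -> None:
        missing = [name for name in REQUIRED if name not in event]
        if missing:
            raise ValueError(f"missing required fields: {missing}")

    def emit(self, event: dict) -> None:
        self.validate(event)
        self._queue.put(event)

    def stream(self) -> Iterator[str]:
        """Drain queued events as SSE 'data:' frames, oldest first."""
        while not self._queue.empty():
            event = self._queue.get_nowait()
            yield f"data: {json.dumps(event)}\n\n"
```

A real bus would also fan events out to several connected clients and block while the queue is empty; the sketch drains a single queue once to keep the three stages visible.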

Event Structure

All events conform to a base schema with common fields:

{
  "type": "sae.done",               // Event type identifier
  "ts": "2026-01-23T14:30:00Z",     // ISO 8601 timestamp
  "level": "info",                  // Severity: debug | info | warn | error
  "topic": "sae",                   // Category: audit, sae, validation, etc.
  "source": "solver",               // Originating component
  "correlation_id": "a1b2c3d4-...", // UUID for operation tracing
  ...                               // Event-specific payload
}

Required Fields:

  • type: Event type identifier (e.g., sae.start, validation.done, audit.save)
  • ts: ISO 8601 timestamp; callers may omit it, in which case it is auto-generated
  • level: Severity level (debug, info, warn, or error), used for filtering and alerting
  • topic: Category used for topic-based subscriptions
  • correlation_id: UUID linking related events; callers may omit it, in which case it is auto-generated

Optional Fields:

  • source: Component that emitted the event
  • Custom payload fields specific to event type
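
As a rough illustration of how the auto-generated fields could be filled in, the sketch below builds an event dict that matches the base schema. The function build_event is hypothetical (the real helpers live in helpers.py and may differ), and the default topic of "general" is an assumption based on the topic table.

```python
import uuid
from datetime import datetime, timezone
from typing import Any, Optional


def build_event(
    event_type: str,
    *,
    level: str = "info",
    topic: str = "general",  # assumed default for uncategorized events
    source: Optional[str] = None,
    ts: Optional[str] = None,
    correlation_id: Optional[str] = None,
    **payload: Any,
) -> dict:
    """Hypothetical constructor: fills ts and correlation_id when omitted."""
    event = {
        "type": event_type,
        "ts": ts or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "level": level,
        "topic": topic,
        "correlation_id": correlation_id or str(uuid.uuid4()),
    }
    if source is not None:
        event["source"] = source  # optional field, only set when given
    event.update(payload)  # event-specific fields, e.g. labels=123
    return event
```

Calling build_event("sae.done", topic="sae", source="solver", labels=123) yields a dict with all five required fields plus source and labels.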

Event Topics

Events are categorized by topic to enable selective consumption:

| Topic | Description | Example Types |
|-------|-------------|---------------|
| audit | User actions and system changes | audit.save, audit.command |
| sae | Static analysis engine operations | sae.start, sae.done |
| validation | Validation checks and policy | validation.start, validation.done |
| cfg | Control Flow Graph extraction | cfg.start, cfg.done |
| daemon | WebSocket daemon lifecycle | daemon.started, daemon.stopped |
| compare | Run-to-run comparison operations | compare.start, compare.done |
| system | System-level messages and errors | error, info |
| stats | Performance metrics | stats.memory, stats.timing |
| lint | Linting operations | lint.start, lint.done |
| policy | Policy enforcement | policy.violated, policy.applied |
| general | Uncategorized events | message |

See Event Catalog for complete event reference.
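
On the consuming side, selective consumption might look like a small router that hands each event to the handlers registered for its topic. TopicRouter is a hypothetical name used for illustration only; treating a missing topic as "general" is likewise an assumption.

```python
from collections import defaultdict
from typing import Callable, Dict, List

Handler = Callable[[dict], None]


class TopicRouter:
    """Hypothetical consumer-side dispatcher keyed by event topic."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._handlers[topic].append(handler)

    def dispatch(self, event: dict) -> int:
        """Call every handler for the event's topic; return how many ran."""
        topic = event.get("topic", "general")  # assumed fallback
        handlers = self._handlers.get(topic, [])
        for handler in handlers:
            handler(event)
        return len(handlers)
```

Events whose topic has no subscribers are simply skipped, which keeps consumers that only care about, say, sae and validation free of unrelated traffic.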


SSE Streaming

Clients subscribe to the event stream via Server-Sent Events:

Endpoint: GET /api/v1/events/stream

Response Format:

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

data: {"type":"sae.start","ts":"2026-01-23T14:30:00Z","level":"info",...}

data: {"type":"sae.done","ts":"2026-01-23T14:30:15Z","level":"info",...}
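
The wire format above is simple enough to sketch: each event is serialized to one line of JSON, prefixed with "data: ", and terminated by a blank line. The names SSE_HEADERS, to_sse_frame, and sse_body below are hypothetical and not taken from the BranchPy server code.

```python
import json
from typing import Iterable, Iterator

# Response headers as listed in the response format above.
SSE_HEADERS = {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
    "Connection": "keep-alive",
}


def to_sse_frame(event: dict) -> str:
    """Serialize one event as a single-line SSE 'data:' frame."""
    # json.dumps without indent never emits raw newlines,
    # so the payload always fits on one data line.
    payload = json.dumps(event, separators=(",", ":"))
    return f"data: {payload}\n\n"


def sse_body(events: Iterable[dict]) -> Iterator[bytes]:
    """Yield UTF-8 encoded frames, suitable for a streaming HTTP response."""
    for event in events:
        yield to_sse_frame(event).encode("utf-8")
```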

Client Example (JavaScript):

const es = new EventSource('/api/v1/events/stream');

es.onmessage = (e) => {
  const event = JSON.parse(e.data);
  
  // Filter by topic
  if (event.topic === 'sae') {
    console.log('SAE event:', event);
  }
  
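  // Filter by level (showError is an application-defined handler;
  // message is assumed to be part of the event-specific payload)
  if (event.level === 'error') {
    showError(event.message);
  }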
};

es.onerror = () => {
  console.error('SSE connection lost, reconnecting...');
};

Features:

  • Automatic reconnection: Browsers reconnect on disconnect
  • Filtering: Clients filter events by topic, level, correlation_id
  • Low latency: Sub-second event delivery
  • Stateless: No server-side session management

See …/server/sse.md for SSE implementation details.
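
Non-browser clients (a CLI, for example) can apply the same filters while reading the stream line by line. The sketch below assumes one data line per event, as in the response format shown earlier, and uses the severity ordering debug < info < warn < error. The function iter_events and the LEVELS mapping are hypothetical.

```python
import json
from typing import Iterable, Iterator, Optional

# Severity ranks following debug < info < warn < error.
LEVELS = {"debug": 0, "info": 1, "warn": 2, "error": 3}


def iter_events(
    lines: Iterable[str],
    *,
    min_level: str = "debug",
    correlation_id: Optional[str] = None,
) -> Iterator[dict]:
    """Parse SSE lines and yield only the events that pass the filters."""
    threshold = LEVELS[min_level]
    for line in lines:
        line = line.rstrip("\r\n")
        if not line.startswith("data:"):
            continue  # blank separators, comments, other SSE fields
        event = json.loads(line[len("data:"):].strip())
        # Unknown or missing levels are treated as "info" (an assumption).
        if LEVELS.get(event.get("level", "info"), 1) < threshold:
            continue
        if correlation_id is not None and event.get("correlation_id") != correlation_id:
            continue
        yield event
```

The lines argument can be any iterable of decoded text lines, such as the lines of a streaming HTTP response body from whichever HTTP client the application already uses.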


Correlation Tracking

Correlation IDs link events across multi-step operations:

import uuid
from branchpy.server.events.helpers import emit_sae_start, emit_sae_done

# Start operation with correlation ID
corr_id = str(uuid.uuid4())
emit_sae_start(project="/path", correlation_id=corr_id)

# ... perform analysis ...

emit_sae_done(labels=123, pruned=7, correlation_id=corr_id)

Client Filtering:

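// Track specific operation (updateProgress is an application-defined handler)
const es = new EventSource('/api/v1/events/stream');
const requestId = "a1b2c3d4-e5f6-...";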

es.onmessage = (e) => {
  const event = JSON.parse(e.data);
  
  if (event.correlation_id === requestId) {
    updateProgress(event);
  }
};

Benefits:

  • Request tracing: Follow operation across components
  • Performance profiling: Measure end-to-end latency
  • Error attribution: Link errors to initiating request
  • Dashboard filtering: Show events for specific operation
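
To make the performance-profiling benefit concrete, the sketch below groups events by correlation_id and measures the time between the earliest and latest event of each operation. It relies only on the ts and correlation_id fields from the base schema; the function names are hypothetical.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, Iterable, List


def parse_ts(ts: str) -> datetime:
    """Parse an ISO 8601 timestamp such as 2026-01-23T14:30:00Z."""
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def operation_durations(events: Iterable[dict]) -> Dict[str, float]:
    """Return seconds between the first and last event per correlation ID."""
    stamps: Dict[str, List[datetime]] = defaultdict(list)
    for event in events:
        stamps[event["correlation_id"]].append(parse_ts(event["ts"]))
    return {
        corr_id: (max(times) - min(times)).total_seconds()
        for corr_id, times in stamps.items()
    }
```

For the sae.start and sae.done timestamps used in this document (14:30:00 and 14:30:15 under one correlation ID), the result for that ID is 15.0 seconds.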

Event Emission

Using Helper Functions

Always use helper functions from branchpy.server.events.helpers:

import uuid

from branchpy.server.events.helpers import (
    emit_event,
    emit_sae_start,
    emit_sae_done,
    emit_validation_start,
    emit_validation_done,
)

# Generate one correlation ID and reuse it for every event in the operation
corr_id = str(uuid.uuid4())

# Use typed helpers for common events
emit_sae_start(project="/path/to/project", correlation_id=corr_id)
emit_sae_done(labels=123, pruned=7, correlation_id=corr_id)

# Use generic emit_event for custom events
emit_event(
    "custom.event",
    level="info",
    topic="custom",
    source="my_module",
    correlation_id=corr_id,
    custom_field="value"
)

Best Practices

  1. Always use helpers: Never call current_bus.emit() directly
  2. Pass correlation_id: Enable tracing for multi-step operations
  3. Use appropriate topics: Categorize events for filtering
  4. Set correct levels: Pick the severity that matches the event; levels are ordered debug < info < warn < error
  5. Include context: Add payload fields (file paths, counts, messages)
  6. Fail silently: Don’t crash if the bus is unavailable (events are optional)
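
The "fail silently" practice can be sketched as a thin wrapper around any emit helper. Whether the helpers in helpers.py already guard against a missing bus is not stated here, so treat safe_emit as a hypothetical pattern rather than part of the BranchPy API.

```python
import logging
from typing import Any, Callable

logger = logging.getLogger("events")  # logger name is illustrative


def safe_emit(emit: Callable[..., Any], *args: Any, **kwargs: Any) -> bool:
    """Call an emit helper; swallow failures so the caller keeps running.

    Returns True if the event was emitted, False if emission failed.
    """
    try:
        emit(*args, **kwargs)
        return True
    except Exception:
        # Events are optional: record the failure at debug level and move on.
        logger.debug("event emission failed; continuing", exc_info=True)
        return False
```

Usage would look like safe_emit(emit_sae_start, project="/path", correlation_id=corr_id), leaving the analysis itself unaffected when the bus is down.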

Event Persistence

Events can be persisted to disk for audit and replay:

Storage: .branchpy/governance/events.jsonl
Format: JSON Lines (one event per line)

Example:

{"type":"sae.start","ts":"2026-01-23T14:30:00Z","level":"info","topic":"sae",...}
{"type":"sae.done","ts":"2026-01-23T14:30:15Z","level":"info","topic":"sae",...}

Retention:

  • Default: 90 days
  • Auto-rotation: 50MB per file
  • Archive: .branchpy/governance/archive/*.jsonl.gz
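
Size-based rotation into a gzip archive could work roughly as follows. This is a sketch under several assumptions: "50MB" is read as 50 MiB, the archive file name (events-<UTC timestamp>.jsonl.gz) is a made-up scheme, and no locking is done, so it is not safe against concurrent writers. The actual rotation logic in BranchPy may differ.

```python
import gzip
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

MAX_BYTES = 50 * 1024 * 1024  # assumes "50MB" means 50 MiB


def rotate_if_needed(
    log_path: Path, archive_dir: Path, max_bytes: int = MAX_BYTES
) -> Optional[Path]:
    """Compress the log into archive_dir once it reaches max_bytes.

    Returns the archive path, or None if no rotation was needed.
    """
    if not log_path.exists() or log_path.stat().st_size < max_bytes:
        return None
    archive_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    target = archive_dir / f"events-{stamp}.jsonl.gz"  # hypothetical naming
    with log_path.open("rb") as src, gzip.open(target, "wb") as dst:
        shutil.copyfileobj(src, dst)
    log_path.write_text("", encoding="utf-8")  # start a fresh, empty log
    return target
```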

Replay:

  • Load historical events from JSONL files
  • Filter by correlation_id to replay specific operations
  • See the Event Bus & Replay Guide for the replay UI
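
Outside the UI, replaying from a JSONL file amounts to reading it line by line and keeping the events of interest. The function replay below is a hypothetical helper, not part of the BranchPy API; it assumes one JSON object per line, as in the storage example above.

```python
import json
from pathlib import Path
from typing import Iterator, Optional, Union


def replay(
    path: Union[str, Path], correlation_id: Optional[str] = None
) -> Iterator[dict]:
    """Yield events from a JSONL file in stored order.

    When correlation_id is given, only events of that operation are yielded.
    """
    with Path(path).open(encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue  # tolerate blank lines
            event = json.loads(line)
            if correlation_id is None or event.get("correlation_id") == correlation_id:
                yield event
```

Pointing it at .branchpy/governance/events.jsonl with a known correlation ID would return the events of that one operation in the order they were written.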


Source References

Based on:

  • docs/v1.1.0/Events/events.schema.md
  • docs/v1.1.0/Events/event-bus-replay-guide.md
  • branchpy/server/events/bus.py
  • branchpy/server/events/helpers.py
