Event Store Persistence (ADR-007)
Complete event sourcing implementation for V3 Claude Flow with persistent storage, projections, and event replay.
Overview
The Event Store provides a robust foundation for tracking all state changes in the V3 system through domain events. This enables:
- Complete Audit Trail: Every state change is recorded
- Time Travel: Replay events to reconstruct state at any point
- Projections: Build multiple read models from the same events
- Debugging: Understand exactly what happened and when
- Event-Driven Architecture: Decouple components through events
Architecture
┌─────────────────────────────────────────────────────────────┐
│ Event Store │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ SQLite Database (sql.js for cross-platform) │ │
│ │ - events table (append-only log) │ │
│ │ - snapshots table (performance optimization) │ │
│ └───────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Agent │ │ Task │ │ Memory │
│ Projection │ │ Projection │ │ Projection │
└──────────────┘ └──────────────┘ └──────────────┘
Features
1. Event Store
- Persistent Storage: SQLite with sql.js fallback for Windows compatibility
- Versioning: Automatic version tracking per aggregate
- Filtering: Query events by type, aggregate, timestamp
- Replay: Iterate through all events for rebuilding state
- Snapshots: Performance optimization for large event streams
- Auto-Persist: Configurable auto-save to disk
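To make the versioning, filtering, and replay features concrete, here is a minimal in-memory sketch. It is not the actual EventStore: `InMemoryEventLog`, `NewEvent`, `StoredEvent`, and `EventFilter` are hypothetical names, and persistence is left out entirely. It only illustrates how an append-only log might assign a version per aggregate and answer filtered queries.

```typescript
// Illustrative in-memory model; names and shapes are assumptions, not the real EventStore.
interface NewEvent {
  type: string;          // e.g. 'agent:spawned'
  aggregateId: string;   // e.g. 'agent-1'
  aggregateType: string; // e.g. 'agent'
  payload: Record<string, unknown>;
}

interface StoredEvent extends NewEvent {
  version: number;   // per-aggregate sequence number, starting at 1
  timestamp: number; // epoch milliseconds
}

interface EventFilter {
  aggregateTypes?: string[];
  eventTypes?: string[];
  afterTimestamp?: number;
}

class InMemoryEventLog {
  private readonly events: StoredEvent[] = [];
  private readonly latestVersion = new Map<string, number>();

  // Append-only: assigns the next version for the event's aggregate.
  append(event: NewEvent, timestamp: number = Date.now()): StoredEvent {
    const version = (this.latestVersion.get(event.aggregateId) ?? 0) + 1;
    this.latestVersion.set(event.aggregateId, version);
    const stored: StoredEvent = { ...event, version, timestamp };
    this.events.push(stored);
    return stored;
  }

  // Events for one aggregate, in the order they were appended.
  getEvents(aggregateId: string): StoredEvent[] {
    return this.events.filter((e) => e.aggregateId === aggregateId);
  }

  // Filter by aggregate type, event type, and/or timestamp.
  query(filter: EventFilter): StoredEvent[] {
    return this.events.filter(
      (e) =>
        (!filter.aggregateTypes || filter.aggregateTypes.includes(e.aggregateType)) &&
        (!filter.eventTypes || filter.eventTypes.includes(e.type)) &&
        (filter.afterTimestamp === undefined || e.timestamp > filter.afterTimestamp),
    );
  }

  // Replay every event in append order.
  *replay(): Generator<StoredEvent> {
    yield* this.events;
  }
}
```

Versions here are scoped to a single aggregate, so `agent-1` and `task-1` each start at version 1. The real store is described as asynchronous; the sketch is synchronous only to keep it short.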
2. Domain Events
Comprehensive event types for all aggregates:
Agent Events:
- agent:spawned - Agent created
- agent:started - Agent began working
- agent:stopped - Agent finished
- agent:failed - Agent encountered error
- agent:status-changed - Agent status updated
- agent:task-assigned - Task assigned to agent
- agent:task-completed - Agent completed task
Task Events:
- task:created - New task created
- task:started - Task execution began
- task:completed - Task finished successfully
- task:failed - Task failed
- task:blocked - Task blocked by dependencies
- task:queued - Task added to queue
Memory Events:
- memory:stored - Memory entry saved
- memory:retrieved - Memory entry accessed
- memory:deleted - Memory entry removed
- memory:expired - Memory entry expired
Swarm Events:
- swarm:initialized - Swarm started
- swarm:scaled - Agent count changed
- swarm:terminated - Swarm shut down
- swarm:phase-changed - Execution phase changed
- swarm:milestone-reached - Milestone achieved
- swarm:error - Swarm-level error
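One way to keep these event names type-safe is a set of string literal unions. The sketch below takes the event names from the lists above; everything else is an assumption for illustration, including the `DomainEvent` shape, the payload fields, and the `makeAgentSpawned` factory (a hypothetical stand-in that follows my reading of the arguments passed to `createAgentSpawnedEvent` in the Usage section).

```typescript
// Event type names come from the lists above; all shapes and helpers are assumptions.
type AgentEventType =
  | 'agent:spawned' | 'agent:started' | 'agent:stopped' | 'agent:failed'
  | 'agent:status-changed' | 'agent:task-assigned' | 'agent:task-completed';
type TaskEventType =
  | 'task:created' | 'task:started' | 'task:completed'
  | 'task:failed' | 'task:blocked' | 'task:queued';
type MemoryEventType =
  | 'memory:stored' | 'memory:retrieved' | 'memory:deleted' | 'memory:expired';
type SwarmEventType =
  | 'swarm:initialized' | 'swarm:scaled' | 'swarm:terminated'
  | 'swarm:phase-changed' | 'swarm:milestone-reached' | 'swarm:error';

type DomainEventType = AgentEventType | TaskEventType | MemoryEventType | SwarmEventType;
type AggregateType = 'agent' | 'task' | 'memory' | 'swarm';

interface DomainEvent<T extends DomainEventType, P> {
  type: T;
  aggregateId: string;
  aggregateType: AggregateType;
  payload: P;
  timestamp: number; // epoch milliseconds
}

// The aggregate type is the prefix before the colon in the event name.
function aggregateTypeOf(type: DomainEventType): AggregateType {
  return type.slice(0, type.indexOf(':')) as AggregateType;
}

interface AgentSpawnedPayload {
  role: string;
  domain: string;
  capabilities: string[];
}

// Hypothetical factory; the real createAgentSpawnedEvent may build a different shape.
function makeAgentSpawned(
  agentId: string,
  role: string,
  domain: string,
  capabilities: string[],
  timestamp: number = Date.now(),
): DomainEvent<'agent:spawned', AgentSpawnedPayload> {
  return {
    type: 'agent:spawned',
    aggregateId: agentId,
    aggregateType: aggregateTypeOf('agent:spawned'),
    payload: { role, domain, capabilities },
    timestamp,
  };
}
```

With the unions in place, a misspelled name such as `'agent:spawn'` is rejected at compile time rather than silently stored.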
3. Projections
Build queryable read models from events:
AgentStateProjection:
- Current state of all agents
- Filter by status, domain, role
- Track task completion metrics
- Monitor agent health
TaskHistoryProjection:
- Complete task execution history
- Filter by status, agent, type
- Calculate average durations
- Track success/failure rates
MemoryIndexProjection:
- Memory access patterns
- Track usage by namespace
- Identify hot/cold data
- Monitor memory consumption
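Conceptually, a projection is a fold over the event stream: each event updates an in-memory read model that can then be queried cheaply. The sketch below shows that idea for agent state. It is not the real AgentStateProjection; the class name `AgentStateSketch`, the status values other than `'active'`, and the payload fields are assumptions.

```typescript
// Hypothetical event and state shapes; the real AgentStateProjection may differ.
type AgentStatus = 'idle' | 'active' | 'stopped' | 'failed';

interface AgentEvent {
  type: 'agent:spawned' | 'agent:started' | 'agent:stopped' | 'agent:failed' | 'agent:task-completed';
  aggregateId: string;
  payload?: { role?: string; domain?: string; taskId?: string };
}

interface AgentState {
  id: string;
  role: string;
  domain: string;
  status: AgentStatus;
  completedTasks: string[];
}

class AgentStateSketch {
  private readonly agents = new Map<string, AgentState>();

  // Apply one event to the read model. Events for agents that were
  // never spawned are ignored.
  apply(event: AgentEvent): void {
    if (event.type === 'agent:spawned') {
      this.agents.set(event.aggregateId, {
        id: event.aggregateId,
        role: event.payload?.role ?? 'unknown',
        domain: event.payload?.domain ?? 'unknown',
        status: 'idle',
        completedTasks: [],
      });
      return;
    }
    const agent = this.agents.get(event.aggregateId);
    if (!agent) return;
    switch (event.type) {
      case 'agent:started':
        agent.status = 'active';
        break;
      case 'agent:stopped':
        agent.status = 'stopped';
        break;
      case 'agent:failed':
        agent.status = 'failed';
        break;
      case 'agent:task-completed':
        if (event.payload?.taskId) agent.completedTasks.push(event.payload.taskId);
        break;
    }
  }

  getAgent(id: string): AgentState | undefined {
    return this.agents.get(id);
  }

  getAgentsByStatus(status: AgentStatus): AgentState[] {
    return Array.from(this.agents.values()).filter((a) => a.status === status);
  }
}
```

Because the read model is derived only from events, it can be discarded and rebuilt at any time by applying the full stream again.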
Usage
Basic Event Storage
import { EventStore, createAgentSpawnedEvent } from '@claude-flow/shared/events';
// Initialize
const eventStore = new EventStore({
  databasePath: './events.db',
  verbose: true,
});
await eventStore.initialize();
// Record events
const event = createAgentSpawnedEvent(
  'agent-1',
  'coder',
  'core',
  ['coding', 'testing']
);
await eventStore.append(event);
// Query events
const agentEvents = await eventStore.getEvents('agent-1');
const allTaskEvents = await eventStore.query({
  aggregateTypes: ['task']
});
// Cleanup
await eventStore.shutdown();
Using Projections
import {
  EventStore,
  AgentStateProjection,
  TaskHistoryProjection
} from '@claude-flow/shared/events';
const eventStore = new EventStore({ databasePath: './events.db' });
await eventStore.initialize();
// Build agent state projection
const agentProjection = new AgentStateProjection(eventStore);
await agentProjection.initialize();
// Query agent state
const activeAgents = agentProjection.getAgentsByStatus('active');
const agent1 = agentProjection.getAgent('agent-1');
console.log(`Active agents: ${activeAgents.length}`);
// getAgent may return undefined if 'agent-1' was never spawned, so guard the access
console.log(`Agent 1 completed ${agent1?.completedTasks.length ?? 0} tasks`);
// Build task history projection
const taskProjection = new TaskHistoryProjection(eventStore);
await taskProjection.initialize();
// Query task history
const completedTasks = taskProjection.getTasksByStatus('completed');
const avgDuration = taskProjection.getAverageTaskDuration();
console.log(`Completed: ${completedTasks.length} tasks`);
console.log(`Average duration: ${avgDuration}ms`);
Event Replay
// Replay all events
for await (const event of eventStore.replay()) {
  console.log(`${event.type} at ${new Date(event.timestamp)}`);
}
// Replay from specific version
for await (const event of eventStore.replay(100)) {
  // Process events starting from version 100
}
Snapshots
// Save snapshot for performance
await eventStore.saveSnapshot({
  aggregateId: 'agent-1',
  aggregateType: 'agent',
  version: 500,
  state: { status: 'active', tasks: ['task-1', 'task-2'] },
  timestamp: Date.now(),
});
// Load snapshot
const snapshot = await eventStore.getSnapshot('agent-1');
if (snapshot) {
  // Resume from snapshot version
  const events = await eventStore.getEvents('agent-1', snapshot.version);
}
Configuration
const eventStore = new EventStore({
  // Database path (:memory: for in-memory only)
  databasePath: './v3-events.db',
  // Enable verbose logging
  verbose: true,
  // Auto-persist interval (0 = manual only)
  autoPersistInterval: 5000, // 5 seconds
  // Recommend snapshots every N events
  snapshotThreshold: 100,
  // Custom sql.js WASM path (optional)
  wasmPath: './sql-wasm.wasm',
});
Performance
Indexing
The Event Store automatically creates indexes for:
- Aggregate ID + Version (unique)
- Aggregate Type
- Event Type
- Timestamp
- Version
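For orientation, the index list above could correspond to a schema along the following lines. This is only a guess at the layout: the table name, column names, and index names are all hypothetical, and the statements are held in a TypeScript constant (`EVENTS_SCHEMA_SQL`, also a made-up name) simply so they could be passed to whatever call executes SQL.

```typescript
// Hypothetical schema: table, column, and index names are assumptions for illustration.
const EVENTS_SCHEMA_SQL = `
CREATE TABLE IF NOT EXISTS events (
  id             TEXT PRIMARY KEY,
  type           TEXT NOT NULL,
  aggregate_id   TEXT NOT NULL,
  aggregate_type TEXT NOT NULL,
  version        INTEGER NOT NULL,
  timestamp      INTEGER NOT NULL,
  payload        TEXT NOT NULL
);
-- One row per (aggregate, version): the unique index rejects duplicate versions.
CREATE UNIQUE INDEX IF NOT EXISTS idx_events_aggregate_version ON events (aggregate_id, version);
CREATE INDEX IF NOT EXISTS idx_events_aggregate_type ON events (aggregate_type);
CREATE INDEX IF NOT EXISTS idx_events_type ON events (type);
CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events (timestamp);
CREATE INDEX IF NOT EXISTS idx_events_version ON events (version);
`;
```

The unique index on aggregate ID plus version is the one that carries a correctness guarantee; the others exist only to speed up the filtered queries.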
Snapshots
Recommended usage:
- Save snapshot every 100-500 events
- Use snapshots for long-running aggregates
- Snapshots cut replay cost from O(n) over the full stream to O(k), where k is the number of events recorded since the latest snapshot
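The saving comes from skipping every event already folded into the snapshot. A minimal sketch of that rehydration step follows; `rehydrate`, `VersionedEvent`, and `Snapshot` are hypothetical names, and the sketch assumes a snapshot at version N reflects all events up to and including N.

```typescript
// Illustrative only: assumes a snapshot at version N already includes events 1..N.
interface VersionedEvent {
  version: number;
  type: string;
}

interface Snapshot<S> {
  version: number;
  state: S;
}

// Rebuild state from an optional snapshot plus the events recorded after it.
function rehydrate<S>(
  initial: S,
  snapshot: Snapshot<S> | undefined,
  events: VersionedEvent[],
  reduce: (state: S, event: VersionedEvent) => S,
): { state: S; applied: number } {
  const fromVersion = snapshot ? snapshot.version : 0;
  let state = snapshot ? snapshot.state : initial;
  let applied = 0;
  for (const event of events) {
    if (event.version <= fromVersion) continue; // already reflected in the snapshot
    state = reduce(state, event);
    applied += 1;
  }
  return { state, applied };
}

// Example reducer: count completed tasks for one agent.
const countCompleted = (count: number, event: VersionedEvent): number =>
  event.type === 'agent:task-completed' ? count + 1 : count;
```

In a real store the skipped events would not be loaded at all (the query would start after the snapshot version), so both the read and the fold shrink to the tail of the stream.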
Auto-Persist
- Default: 5 seconds
- In-memory mode: No persistence
- Disk mode: Periodic writes to SQLite file
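The interval semantics above (a positive interval for periodic writes, 0 for manual only) can be sketched with a timer and a dirty flag. This is an assumption about how such a mechanism might work, not the store's actual code: `AutoPersister` and its `write` callback are hypothetical, and skipping writes when nothing changed is a design choice of the sketch.

```typescript
// Hypothetical helper; the real auto-persist logic may differ.
class AutoPersister {
  private readonly write: () => void; // callback that saves the database to disk
  private readonly intervalMs: number;
  private dirty = false;
  private timer: ReturnType<typeof setInterval> | undefined;

  constructor(write: () => void, intervalMs: number) {
    this.write = write;
    this.intervalMs = intervalMs;
  }

  // Call after every append so the next flush knows there is work to do.
  markDirty(): void {
    this.dirty = true;
  }

  // Write only if something changed; returns whether a write happened.
  flush(): boolean {
    if (!this.dirty) return false;
    this.write();
    this.dirty = false;
    return true;
  }

  // Start periodic flushing. An interval of 0 means manual flushes only.
  start(): void {
    if (this.intervalMs <= 0 || this.timer !== undefined) return;
    this.timer = setInterval(() => this.flush(), this.intervalMs);
  }

  // Stop the timer and perform a final flush so no changes are lost.
  stop(): void {
    if (this.timer !== undefined) {
      clearInterval(this.timer);
      this.timer = undefined;
    }
    this.flush();
  }
}
```

The final flush in `stop()` mirrors the usual reason for calling a shutdown method on a store: changes made since the last timer tick would otherwise exist only in memory.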
Testing
Run comprehensive tests:
# Run all event store tests
npm test -- event-store.test.ts
# Run specific test suite
npm test -- event-store.test.ts -t "Event Appending"
Example
See example-usage.ts for a complete demonstration:
npx tsx v3/@claude-flow/shared/src/events/example-usage.ts
Output includes:
- Event recording
- Query examples
- Projection building
- Event replay
- Snapshots
- Statistics
Integration with V3
Agent Lifecycle
// Queen coordinator spawns agents
await eventStore.append(
  createAgentSpawnedEvent('agent-2', 'security-architect', 'security', ['auditing'])
);
// Track agent execution
await eventStore.append(createAgentStartedEvent('agent-2'));
await eventStore.append(createAgentTaskAssignedEvent('agent-2', 'task-1', Date.now()));
Task Execution
// Create task
await eventStore.append(
  createTaskCreatedEvent('task-1', 'security-audit', 'CVE-1 Fix', 'Fix injection', 'critical', [])
);
// Track progress
await eventStore.append(createTaskStartedEvent('task-1', 'agent-2'));
await eventStore.append(createTaskCompletedEvent('task-1', { fixed: true }, 5000));
Memory Operations
// Track memory usage
await eventStore.append(
  createMemoryStoredEvent('mem-1', 'agent-context', 'agent-2-state', 'episodic', 2048)
);
await eventStore.append(
  createMemoryRetrievedEvent('mem-1', 'agent-context', 'agent-2-state', 1)
);
Cross-Platform Compatibility
The Event Store uses sql.js for cross-platform SQLite support:
- Windows: Pure JavaScript/WASM (no native compilation)
- macOS: Works with standard Node.js
- Linux: Full compatibility
Database files are portable across platforms.
Migration Path
To integrate Event Store into existing V3 code:
- Initialize Event Store: Add to swarm initialization
- Record Events: Emit events on state changes
- Build Projections: Replace direct state queries
- Event Replay: Use for debugging and analytics
- Snapshots: Add for performance optimization
ADR Compliance
This implementation fulfills ADR-007 requirements:
✅ Event Store with append(), getEvents(), getEventsByType(), replay()
✅ Domain events for agent, task, memory, swarm
✅ Projections for AgentState, TaskHistory, MemoryIndex
✅ SQLite persistence with cross-platform support
✅ Event versioning and snapshots
✅ Comprehensive test coverage
Contributing
When adding new domain events:
- Define event interface in domain-events.ts
- Add factory function
- Update projections to handle new event
- Add tests
- Update this README
License
Part of claude-flow V3 - See root LICENSE file.