/** * Event Store Persistence (ADR-007) * * Provides persistent storage for domain events using SQLite. * Supports event replay, snapshots, and projections. * * Key Features: * - Append-only event log * - Event versioning per aggregate * - Event filtering and queries * - Snapshot support for performance * - Event replay for projections * - Cross-platform SQLite (sql.js fallback) * * @module v3/shared/events/event-store */ import { EventEmitter } from 'node:events'; import { existsSync, readFileSync, writeFileSync } from 'node:fs'; import initSqlJs from 'sql.js'; const DEFAULT_CONFIG = { databasePath: ':memory:', verbose: false, autoPersistInterval: 5000, // 5 seconds snapshotThreshold: 100, }; // ============================================================================= // Event Store Implementation // ============================================================================= export class EventStore extends EventEmitter { config; db = null; initialized = false; persistTimer = null; SQL = null; // Version tracking per aggregate aggregateVersions = new Map(); constructor(config = {}) { super(); this.config = { ...DEFAULT_CONFIG, ...config }; } /** * Initialize the event store */ async initialize() { if (this.initialized) return; // Load sql.js WASM this.SQL = await initSqlJs({ locateFile: this.config.wasmPath ? () => this.config.wasmPath : (file) => `https://sql.js.org/dist/${file}`, }); // Load existing database if exists if (this.config.databasePath !== ':memory:' && existsSync(this.config.databasePath)) { const buffer = readFileSync(this.config.databasePath); this.db = new this.SQL.Database(new Uint8Array(buffer)); if (this.config.verbose) { console.log(`[EventStore] Loaded database from ${this.config.databasePath}`); } } else { this.db = new this.SQL.Database(); if (this.config.verbose) { console.log('[EventStore] Created new event store database'); } } // Create schema this.createSchema(); // Load aggregate versions this.loadAggregateVersions(); // Set up auto-persist if (this.config.autoPersistInterval > 0 && this.config.databasePath !== ':memory:') { this.persistTimer = setInterval(() => { this.persist().catch((err) => { this.emit('error', { operation: 'auto-persist', error: err }); }); }, this.config.autoPersistInterval); } this.initialized = true; this.emit('initialized'); } /** * Shutdown the event store */ async shutdown() { if (!this.initialized || !this.db) return; // Stop auto-persist if (this.persistTimer) { clearInterval(this.persistTimer); this.persistTimer = null; } // Final persist if (this.config.databasePath !== ':memory:') { await this.persist(); } this.db.close(); this.db = null; this.initialized = false; this.emit('shutdown'); } /** * Append a new event to the store */ async append(event) { this.ensureInitialized(); // Get next version for aggregate const currentVersion = this.aggregateVersions.get(event.aggregateId) || 0; const nextVersion = currentVersion + 1; // Set version on event event.version = nextVersion; // Insert event const stmt = ` INSERT INTO events ( id, type, aggregate_id, aggregate_type, version, timestamp, source, payload, metadata, causation_id, correlation_id ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `; this.db.run(stmt, [ event.id, event.type, event.aggregateId, event.aggregateType, event.version, event.timestamp, event.source, JSON.stringify(event.payload), JSON.stringify(event.metadata || {}), event.causationId || null, event.correlationId || null, ]); // Update version tracker this.aggregateVersions.set(event.aggregateId, nextVersion); // Emit event appended notification this.emit('event:appended', event); // Check if snapshot needed if (nextVersion % this.config.snapshotThreshold === 0) { this.emit('snapshot:recommended', { aggregateId: event.aggregateId, version: nextVersion }); } } /** * Get events for a specific aggregate */ async getEvents(aggregateId, fromVersion) { this.ensureInitialized(); let sql = 'SELECT * FROM events WHERE aggregate_id = ?'; const params = [aggregateId]; if (fromVersion !== undefined) { sql += ' AND version >= ?'; params.push(fromVersion); } sql += ' ORDER BY version ASC'; const stmt = this.db.prepare(sql); const events = []; stmt.bind(params); while (stmt.step()) { const row = stmt.getAsObject(); events.push(this.rowToEvent(row)); } stmt.free(); return events; } /** * Get events by type */ async getEventsByType(type) { this.ensureInitialized(); const stmt = this.db.prepare('SELECT * FROM events WHERE type = ? ORDER BY timestamp ASC'); const events = []; stmt.bind([type]); while (stmt.step()) { const row = stmt.getAsObject(); events.push(this.rowToEvent(row)); } stmt.free(); return events; } /** * Query events with filters */ async query(filter) { this.ensureInitialized(); let sql = 'SELECT * FROM events WHERE 1=1'; const params = []; // Aggregate ID filter if (filter.aggregateIds && filter.aggregateIds.length > 0) { sql += ` AND aggregate_id IN (${filter.aggregateIds.map(() => '?').join(',')})`; params.push(...filter.aggregateIds); } // Aggregate type filter if (filter.aggregateTypes && filter.aggregateTypes.length > 0) { sql += ` AND aggregate_type IN (${filter.aggregateTypes.map(() => '?').join(',')})`; params.push(...filter.aggregateTypes); } // Event type filter if (filter.eventTypes && filter.eventTypes.length > 0) { sql += ` AND type IN (${filter.eventTypes.map(() => '?').join(',')})`; params.push(...filter.eventTypes); } // Timestamp filters if (filter.afterTimestamp) { sql += ' AND timestamp > ?'; params.push(filter.afterTimestamp); } if (filter.beforeTimestamp) { sql += ' AND timestamp < ?'; params.push(filter.beforeTimestamp); } // Version filter if (filter.fromVersion) { sql += ' AND version >= ?'; params.push(filter.fromVersion); } // Order by timestamp sql += ' ORDER BY timestamp ASC'; // Pagination if (filter.limit) { sql += ' LIMIT ?'; params.push(filter.limit); } if (filter.offset) { sql += ' OFFSET ?'; params.push(filter.offset); } const stmt = this.db.prepare(sql); const events = []; stmt.bind(params); while (stmt.step()) { const row = stmt.getAsObject(); events.push(this.rowToEvent(row)); } stmt.free(); return events; } /** * Replay events from a specific version */ async *replay(fromVersion = 0) { this.ensureInitialized(); const stmt = this.db.prepare('SELECT * FROM events WHERE version >= ? ORDER BY version ASC'); stmt.bind([fromVersion]); while (stmt.step()) { const row = stmt.getAsObject(); yield this.rowToEvent(row); } stmt.free(); } /** * Save a snapshot for an aggregate */ async saveSnapshot(snapshot) { this.ensureInitialized(); const stmt = ` INSERT OR REPLACE INTO snapshots ( aggregate_id, aggregate_type, version, state, timestamp ) VALUES (?, ?, ?, ?, ?) `; this.db.run(stmt, [ snapshot.aggregateId, snapshot.aggregateType, snapshot.version, JSON.stringify(snapshot.state), snapshot.timestamp, ]); this.emit('snapshot:saved', snapshot); } /** * Get snapshot for an aggregate */ async getSnapshot(aggregateId) { this.ensureInitialized(); const stmt = this.db.prepare('SELECT * FROM snapshots WHERE aggregate_id = ? ORDER BY version DESC LIMIT 1'); const row = stmt.getAsObject([aggregateId]); stmt.free(); if (!row || Object.keys(row).length === 0) { return null; } return { aggregateId: row.aggregate_id, aggregateType: row.aggregate_type, version: row.version, state: JSON.parse(row.state), timestamp: row.timestamp, }; } /** * Get event store statistics */ async getStats() { this.ensureInitialized(); // Total events const totalStmt = this.db.prepare('SELECT COUNT(*) as count FROM events'); const totalRow = totalStmt.getAsObject(); totalStmt.free(); const totalEvents = totalRow.count || 0; // Events by type const typeStmt = this.db.prepare('SELECT type, COUNT(*) as count FROM events GROUP BY type'); const eventsByType = {}; while (typeStmt.step()) { const row = typeStmt.getAsObject(); eventsByType[row.type] = row.count || 0; } typeStmt.free(); // Events by aggregate const aggStmt = this.db.prepare('SELECT aggregate_id, COUNT(*) as count FROM events GROUP BY aggregate_id'); const eventsByAggregate = {}; while (aggStmt.step()) { const row = aggStmt.getAsObject(); eventsByAggregate[row.aggregate_id] = row.count || 0; } aggStmt.free(); // Timestamp range const rangeStmt = this.db.prepare('SELECT MIN(timestamp) as oldest, MAX(timestamp) as newest FROM events'); const rangeRow = rangeStmt.getAsObject(); rangeStmt.free(); // Snapshot count const snapshotStmt = this.db.prepare('SELECT COUNT(*) as count FROM snapshots'); const snapshotRow = snapshotStmt.getAsObject(); snapshotStmt.free(); return { totalEvents, eventsByType, eventsByAggregate, oldestEvent: rangeRow.oldest || null, newestEvent: rangeRow.newest || null, snapshotCount: snapshotRow.count || 0, }; } /** * Persist to disk */ async persist() { if (!this.db || this.config.databasePath === ':memory:') { return; } const data = this.db.export(); const buffer = Buffer.from(data); writeFileSync(this.config.databasePath, buffer); if (this.config.verbose) { console.log(`[EventStore] Persisted ${buffer.length} bytes to ${this.config.databasePath}`); } this.emit('persisted', { size: buffer.length, path: this.config.databasePath }); } // ===== Private Methods ===== createSchema() { if (!this.db) return; // Events table this.db.run(` CREATE TABLE IF NOT EXISTS events ( id TEXT PRIMARY KEY, type TEXT NOT NULL, aggregate_id TEXT NOT NULL, aggregate_type TEXT NOT NULL, version INTEGER NOT NULL, timestamp INTEGER NOT NULL, source TEXT NOT NULL, payload TEXT NOT NULL, metadata TEXT, causation_id TEXT, correlation_id TEXT ) `); // Indexes for performance this.db.run('CREATE INDEX IF NOT EXISTS idx_aggregate_id ON events(aggregate_id)'); this.db.run('CREATE INDEX IF NOT EXISTS idx_aggregate_type ON events(aggregate_type)'); this.db.run('CREATE INDEX IF NOT EXISTS idx_event_type ON events(type)'); this.db.run('CREATE INDEX IF NOT EXISTS idx_timestamp ON events(timestamp)'); this.db.run('CREATE INDEX IF NOT EXISTS idx_version ON events(version)'); this.db.run('CREATE UNIQUE INDEX IF NOT EXISTS idx_aggregate_version ON events(aggregate_id, version)'); // Snapshots table this.db.run(` CREATE TABLE IF NOT EXISTS snapshots ( aggregate_id TEXT PRIMARY KEY, aggregate_type TEXT NOT NULL, version INTEGER NOT NULL, state TEXT NOT NULL, timestamp INTEGER NOT NULL ) `); if (this.config.verbose) { console.log('[EventStore] Schema created successfully'); } } loadAggregateVersions() { if (!this.db) return; const stmt = this.db.prepare('SELECT aggregate_id, MAX(version) as max_version FROM events GROUP BY aggregate_id'); while (stmt.step()) { const row = stmt.getAsObject(); this.aggregateVersions.set(row.aggregate_id, row.max_version || 0); } stmt.free(); } rowToEvent(row) { return { id: row.id, type: row.type, aggregateId: row.aggregate_id, aggregateType: row.aggregate_type, version: row.version, timestamp: row.timestamp, source: row.source, payload: JSON.parse(row.payload), metadata: row.metadata ? JSON.parse(row.metadata) : undefined, causationId: row.causation_id, correlationId: row.correlation_id, }; } ensureInitialized() { if (!this.initialized || !this.db) { throw new Error('EventStore not initialized. Call initialize() first.'); } } } //# sourceMappingURL=event-store.js.map