tasq/node_modules/@claude-flow/shared/dist/resilience/bulkhead.js

206 lines
5.3 KiB
JavaScript

/**
* Bulkhead Pattern
*
* Isolates failures by limiting concurrent executions.
*
* @module v3/shared/resilience/bulkhead
*/
import { EventEmitter } from 'events';
/**
* Default options
*/
const DEFAULT_OPTIONS = {
maxConcurrent: 10,
maxQueue: 100,
queueTimeout: 30000,
};
/**
* Bulkhead
*
* Limits concurrent executions to prevent resource exhaustion.
*
* @example
* const bulkhead = new Bulkhead({
* name: 'database',
* maxConcurrent: 10,
* maxQueue: 50,
* });
*
* try {
* const result = await bulkhead.execute(() => dbQuery());
* } catch (error) {
* if (error.message.includes('Bulkhead full')) {
* // Handle capacity exceeded
* }
* }
*/
export class Bulkhead extends EventEmitter {
options;
active = 0;
queue = [];
completed = 0;
rejected = 0;
timedOut = 0;
constructor(options) {
super();
this.options = { ...DEFAULT_OPTIONS, ...options };
}
/**
* Execute a function within the bulkhead
*/
async execute(fn) {
// If there's room for execution, run immediately
if (this.active < this.options.maxConcurrent) {
return this.runNow(fn);
}
// Check if queue is full
if (this.queue.length >= this.options.maxQueue) {
this.rejected++;
this.options.onRejected?.('full');
throw new Error(`Bulkhead '${this.options.name}' is full. Max concurrent: ${this.options.maxConcurrent}, queue: ${this.options.maxQueue}`);
}
// Add to queue
return this.addToQueue(fn);
}
/**
* Get current statistics
*/
getStats() {
return {
active: this.active,
queued: this.queue.length,
maxConcurrent: this.options.maxConcurrent,
maxQueue: this.options.maxQueue,
completed: this.completed,
rejected: this.rejected,
timedOut: this.timedOut,
};
}
/**
* Check if there's capacity available
*/
hasCapacity() {
return this.active < this.options.maxConcurrent || this.queue.length < this.options.maxQueue;
}
/**
* Get available capacity (concurrent + queue)
*/
availableCapacity() {
const concurrentAvailable = this.options.maxConcurrent - this.active;
const queueAvailable = this.options.maxQueue - this.queue.length;
return concurrentAvailable + queueAvailable;
}
/**
* Reset statistics
*/
resetStats() {
this.completed = 0;
this.rejected = 0;
this.timedOut = 0;
}
/**
* Run function immediately
*/
async runNow(fn) {
this.active++;
this.emit('acquire');
try {
const result = await fn();
this.completed++;
return result;
}
finally {
this.active--;
this.emit('release');
this.processQueue();
}
}
/**
* Add function to queue
*/
addToQueue(fn) {
return new Promise((resolve, reject) => {
const item = {
fn,
resolve,
reject,
queuedAt: Date.now(),
};
// Set timeout for queued item
item.timeoutId = setTimeout(() => {
const index = this.queue.indexOf(item);
if (index !== -1) {
this.queue.splice(index, 1);
this.timedOut++;
this.options.onRejected?.('timeout');
reject(new Error(`Bulkhead '${this.options.name}' queue timeout after ${this.options.queueTimeout}ms`));
}
}, this.options.queueTimeout);
this.queue.push(item);
this.emit('queued', { queueLength: this.queue.length });
});
}
/**
* Process next item in queue
*/
processQueue() {
if (this.active >= this.options.maxConcurrent) {
return;
}
const item = this.queue.shift();
if (!item) {
return;
}
// Clear timeout
if (item.timeoutId) {
clearTimeout(item.timeoutId);
}
// Execute the queued function
this.active++;
this.emit('acquire');
item.fn()
.then((result) => {
this.completed++;
item.resolve(result);
})
.catch((error) => {
item.reject(error);
})
.finally(() => {
this.active--;
this.emit('release');
this.processQueue();
});
}
}
/**
* Create a semaphore for limiting concurrent access
*/
export function createSemaphore(maxConcurrent) {
let current = 0;
const waiting = [];
return {
async acquire() {
if (current < maxConcurrent) {
current++;
return;
}
return new Promise((resolve) => {
waiting.push(resolve);
});
},
release() {
const next = waiting.shift();
if (next) {
next();
}
else {
current = Math.max(0, current - 1);
}
},
available() {
return maxConcurrent - current;
},
};
}
//# sourceMappingURL=bulkhead.js.map