/**
 * P2P Swarm v2 - Production Grade
 *
 * Fixes from code review:
 * 1. Two-layer key scheme (swarm envelope key + per-peer session keys)
 * 2. Ed25519 identity keys + X25519 ephemeral keys
 * 3. Message replay protection (nonces, counters, timestamps)
 * 4. Gun-based WebRTC signaling (no external PeerServer)
 * 5. IPFS CID pointers for large payloads
 * 6. Ed25519 signatures on all messages
 * 7. Relay health monitoring
 * 8. Task execution envelope with budgets
 */
|
|
import * as crypto from 'crypto';
|
|
import { logger } from '../utils/logger.js';
|
|
//=============================================================================
|
|
// CANONICAL SERIALIZATION
|
|
//=============================================================================
|
|
/**
 * Deterministic ("canonical") JSON serialization.
 *
 * Unlike plain JSON.stringify, object keys are sorted alphabetically at
 * every nesting level, so two structurally equal objects always produce the
 * same string regardless of property insertion order. This makes the output
 * safe to sign and hash. `null` and `undefined` both serialize as 'null'.
 *
 * @param {*} value - Any JSON-representable value.
 * @returns {string} Canonical JSON text.
 */
function stableStringify(value) {
  if (value === null || value === undefined) {
    return 'null';
  }
  const kind = typeof value;
  // Primitives: defer to JSON.stringify (numbers, booleans, quoted strings).
  if (kind === 'boolean' || kind === 'number' || kind === 'string') {
    return JSON.stringify(value);
  }
  if (Array.isArray(value)) {
    return `[${value.map((item) => stableStringify(item)).join(',')}]`;
  }
  if (kind === 'object') {
    // Sorted keys are what make the output canonical.
    const body = Object.keys(value)
      .sort()
      .map((key) => `${JSON.stringify(key)}:${stableStringify(value[key])}`);
    return `{${body.join(',')}}`;
  }
  // Anything else (functions, symbols) falls through to JSON semantics.
  return JSON.stringify(value);
}
|
|
//=============================================================================
|
|
// CRYPTOGRAPHIC PRIMITIVES
|
|
//=============================================================================
|
|
/**
 * Identity and session key management.
 *
 * - Ed25519 identity key for signing; Node signs Ed25519 directly with
 *   crypto.sign(null, ...) (no separate hash step).
 * - X25519 key pair for ECDH, with HKDF-SHA256 session-key derivation.
 * - Replay protection: per-sender nonce maps with timestamp expiry, plus a
 *   local monotonic send counter and per-peer receive counters.
 */
class IdentityManager {
  identityKey; // Ed25519 key pair (signing)
  x25519Key; // X25519 key pair (key agreement)
  // Derived session keys, cached under `${peerId}:${swarmId}`
  peerSessionKeys = new Map();
  // Per-sender nonce tracking: Map<senderId, Map<nonce, timestamp>>
  seenNonces = new Map();
  // Separate send counter (local) from receive counters (per-peer)
  localSendCounter = 0;
  recvCounters = new Map();
  maxNonceAge = 300000; // 5 minutes
  nonceCleanupInterval = null;

  constructor() {
    // Generate Ed25519 identity key pair (for signing)
    this.identityKey = crypto.generateKeyPairSync('ed25519');
    // Generate X25519 key pair (for key exchange)
    this.x25519Key = crypto.generateKeyPairSync('x25519');
    // Start periodic nonce cleanup
    this.startNonceCleanup();
  }

  /** Public Ed25519 identity key, PEM (SPKI), for transport. */
  getPublicKey() {
    return this.identityKey.publicKey.export({ type: 'spki', format: 'pem' });
  }

  /** Public X25519 key, PEM (SPKI), for key exchange. */
  getX25519PublicKey() {
    return this.x25519Key.publicKey.export({ type: 'spki', format: 'pem' });
  }

  /**
   * Sign data with the Ed25519 identity key (direct sign, no hash algorithm).
   * @returns {string} base64-encoded signature
   */
  sign(data) {
    return crypto.sign(null, Buffer.from(data), this.identityKey.privateKey).toString('base64');
  }

  /**
   * Verify a peer's Ed25519 signature.
   * @param {string|Buffer} data - The signed bytes.
   * @param {string} signature - base64 signature.
   * @param {string} peerPublicKey - PEM (SPKI) Ed25519 public key.
   * @returns {boolean} false on a bad signature OR a malformed key (never throws).
   */
  verify(data, signature, peerPublicKey) {
    try {
      const pubKey = crypto.createPublicKey({
        key: peerPublicKey,
        format: 'pem',
        type: 'spki',
      });
      return crypto.verify(null, Buffer.from(data), pubKey, Buffer.from(signature, 'base64'));
    }
    catch {
      return false;
    }
  }

  /**
   * Derive (and cache) a pairwise session key via X25519 ECDH + HKDF-SHA256.
   *
   * Salt = sha256(min(pubA, pubB) || max(pubA, pubB)) — sorted so both sides
   * compute the same salt regardless of who initiates, binding the key to
   * this exact pair of public keys.
   *
   * Info = `p2p-swarm-v2:${swarmId}` — domain separation per swarm. The info
   * must be IDENTICAL on both sides; pair binding already comes from the
   * salt. FIX: a previous revision appended `:${peerId}`, but each side
   * passes the OTHER side's id, so the two parties derived *different* keys
   * and could never decrypt each other's traffic.
   *
   * @param {string} peerX25519PubKey - Peer's X25519 public key (PEM/SPKI).
   * @param {string} peerId - Used only for the local cache key.
   * @param {string} [swarmId] - Domain-separation context.
   * @returns {Buffer} 32-byte session key.
   */
  deriveSessionKey(peerX25519PubKey, peerId, swarmId = '') {
    // Check cache
    const cacheKey = `${peerId}:${swarmId}`;
    if (this.peerSessionKeys.has(cacheKey)) {
      return this.peerSessionKeys.get(cacheKey);
    }
    try {
      // Import peer's X25519 public key
      const peerPubKey = crypto.createPublicKey({
        key: peerX25519PubKey,
        format: 'pem',
        type: 'spki',
      });
      // Perform X25519 ECDH
      const sharedSecret = crypto.diffieHellman({
        privateKey: this.x25519Key.privateKey,
        publicKey: peerPubKey,
      });
      // Order-independent salt over both raw public keys
      const myX25519Raw = this.x25519Key.publicKey.export({ type: 'spki', format: 'der' });
      const peerX25519Raw = peerPubKey.export({ type: 'spki', format: 'der' });
      const [first, second] = Buffer.compare(myX25519Raw, peerX25519Raw) < 0
        ? [myX25519Raw, peerX25519Raw]
        : [peerX25519Raw, myX25519Raw];
      const salt = crypto.createHash('sha256')
        .update(first)
        .update(second)
        .digest();
      // Symmetric info (see doc comment above): same bytes on both ends
      const info = Buffer.from(`p2p-swarm-v2:${swarmId}`);
      const sessionKey = crypto.hkdfSync('sha256', sharedSecret, // IKM
        salt, // Salt (from both parties' public keys)
        info, // Info (context + swarm)
        32 // Key length
      );
      const keyBuffer = Buffer.from(sessionKey);
      this.peerSessionKeys.set(cacheKey, keyBuffer);
      return keyBuffer;
    }
    catch (error) {
      // SECURITY NOTE(review): this fallback derives a "key" purely from
      // public material, so it provides NO confidentiality, and it is
      // asymmetric (uses only OUR public key, so peers won't even agree).
      // It should arguably fail hard; kept for compatibility — treat any
      // traffic under it as public.
      logger.warn('X25519 ECDH failed, using fallback', { peerId, error });
      const fallbackKey = crypto.createHash('sha256')
        .update(swarmId)
        .update(peerId)
        .update(this.getPublicKey())
        .digest();
      this.peerSessionKeys.set(cacheKey, fallbackKey);
      return fallbackKey;
    }
  }

  /** 128-bit cryptographically secure random nonce, hex-encoded (32 chars). */
  generateNonce() {
    return crypto.randomBytes(16).toString('hex');
  }

  /**
   * Replay check: accept a (nonce, timestamp) from senderId at most once
   * within the freshness window.
   *
   * @param {string} nonce
   * @param {number} timestamp - Sender's claimed send time (ms epoch).
   * @param {string} senderId
   * @returns {boolean} true iff the nonce is fresh, not replayed, and the
   *   timestamp is within [now - maxNonceAge, now + 60s clock-skew tolerance].
   */
  checkNonce(nonce, timestamp, senderId) {
    const now = Date.now();
    // Reject old messages (outside the replay-tracking window)
    if (now - timestamp > this.maxNonceAge) {
      return false;
    }
    // Reject future timestamps (1 minute tolerance for clock skew)
    if (timestamp > now + 60000) {
      return false;
    }
    // Get or create sender's nonce map
    if (!this.seenNonces.has(senderId)) {
      this.seenNonces.set(senderId, new Map());
    }
    const senderNonces = this.seenNonces.get(senderId);
    // Reject replayed nonces from this sender
    if (senderNonces.has(nonce)) {
      return false;
    }
    // Record nonce with timestamp for expiry
    senderNonces.set(nonce, timestamp);
    return true;
  }

  /** Every minute, drop nonces older than maxNonceAge and empty sender maps. */
  startNonceCleanup() {
    this.nonceCleanupInterval = setInterval(() => {
      const now = Date.now();
      for (const senderId of Array.from(this.seenNonces.keys())) {
        const nonceMap = this.seenNonces.get(senderId);
        if (!nonceMap)
          continue;
        for (const [nonce, timestamp] of Array.from(nonceMap.entries())) {
          if (now - timestamp > this.maxNonceAge) {
            nonceMap.delete(nonce);
          }
        }
        // Remove empty sender maps
        if (nonceMap.size === 0) {
          this.seenNonces.delete(senderId);
        }
      }
    }, 60000);
    // FIX: don't keep the Node process alive just for cleanup
    // (optional chaining makes this a no-op outside Node).
    this.nonceCleanupInterval.unref?.();
  }

  /** Next monotonically increasing send counter (first call returns 1). */
  getNextSendCounter() {
    return ++this.localSendCounter;
  }

  /**
   * Accept a peer's counter only if strictly greater than the last value
   * seen from that peer (ordering / replay protection).
   */
  validateRecvCounter(peerId, counter) {
    const lastSeen = this.recvCounters.get(peerId) || 0;
    if (counter <= lastSeen) {
      return false;
    }
    this.recvCounters.set(peerId, counter);
    return true;
  }

  /**
   * Drop all cached session keys for a peer so the next derive re-runs ECDH.
   * FIX: cache entries are keyed `${peerId}:${swarmId}`, so the previous
   * `delete(peerId)` never matched anything and rotation was a no-op.
   */
  rotateSessionKey(peerId) {
    for (const key of Array.from(this.peerSessionKeys.keys())) {
      if (key === peerId || key.startsWith(`${peerId}:`)) {
        this.peerSessionKeys.delete(key);
      }
    }
  }

  /** Stop the cleanup timer. Call when this identity is no longer needed. */
  destroy() {
    if (this.nonceCleanupInterval) {
      clearInterval(this.nonceCleanupInterval);
      this.nonceCleanupInterval = null;
    }
  }
}
|
|
/**
 * Symmetric encryption (AES-256-GCM) and hashing helpers.
 */
class CryptoV2 {
  algorithm = 'aes-256-gcm';

  /**
   * Encrypt a UTF-8 string under a 32-byte key.
   * A fresh random IV is generated per call; the GCM auth tag is returned
   * so decrypt can verify integrity.
   * @param {string} data - Plaintext (UTF-8).
   * @param {Buffer} key - 32-byte AES key.
   * @returns {{ciphertext: string, iv: string, tag: string}} all base64.
   */
  encrypt(data, key) {
    const iv = crypto.randomBytes(16);
    const cipher = crypto.createCipheriv(this.algorithm, key, iv);
    let ciphertext = cipher.update(data, 'utf8', 'base64');
    ciphertext += cipher.final('base64');
    return {
      ciphertext,
      iv: iv.toString('base64'),
      tag: cipher.getAuthTag().toString('base64'),
    };
  }

  /**
   * Decrypt base64 ciphertext produced by encrypt().
   * @returns {string|null} Plaintext, or null when the key/IV/tag don't
   *   match (GCM auth failure) or inputs are malformed. Never throws.
   */
  decrypt(ciphertext, key, iv, tag) {
    try {
      const decipher = crypto.createDecipheriv(this.algorithm, key, Buffer.from(iv, 'base64'));
      decipher.setAuthTag(Buffer.from(tag, 'base64'));
      let plaintext = decipher.update(ciphertext, 'base64', 'utf8');
      plaintext += decipher.final('utf8');
      return plaintext;
    }
    catch {
      return null;
    }
  }

  /**
   * SHA-256 hex digest of a string or Buffer.
   * FIX: removed a no-op ternary (`typeof data === 'string' ? data : data`)
   * that passed `data` through unchanged on both branches.
   */
  hash(data) {
    return crypto.createHash('sha256')
      .update(data)
      .digest('hex');
  }

  /**
   * Simplified content id: 'Qm' + first 44 hex chars of sha256(data).
   * NOT a real IPFS CID — see ArtifactStore's class comment for the
   * interoperability limitations.
   */
  generateCID(data) {
    const hash = this.hash(data);
    return `Qm${hash.slice(0, 44)}`;
  }
}
|
|
//=============================================================================
|
|
// RELAY MANAGEMENT
|
|
//=============================================================================
|
|
/**
 * Relay health tracker for Gun bootstrap relays.
 *
 * Relays start out assumed healthy, become unhealthy after 3 consecutive
 * failures, and recover on the first subsequent success.
 */
class RelayManager {
  relays = new Map(); // url -> { url, healthy, lastCheck, latencyMs, failures }
  workingRelays = []; // insertion-ordered list of all known relay urls

  static BOOTSTRAP_RELAYS = [
    'https://gun-manhattan.herokuapp.com/gun',
    'https://gun-us.herokuapp.com/gun',
    'https://gun-eu.herokuapp.com/gun',
  ];

  /** Fresh relay record, assumed healthy until proven otherwise. */
  static #freshRecord(url) {
    return {
      url,
      healthy: true,
      lastCheck: 0,
      latencyMs: 0,
      failures: 0,
    };
  }

  /**
   * @param {string[]} [customRelays] - Overrides BOOTSTRAP_RELAYS when given.
   */
  constructor(customRelays) {
    const relayList = customRelays || RelayManager.BOOTSTRAP_RELAYS;
    for (const url of relayList) {
      this.relays.set(url, RelayManager.#freshRecord(url));
    }
    // FIX: copy instead of aliasing the caller's array (or the shared
    // BOOTSTRAP_RELAYS constant) — addRelay() pushes into workingRelays,
    // which previously mutated the caller's data.
    this.workingRelays = [...relayList];
  }

  /** Urls of relays currently believed healthy. */
  getWorkingRelays() {
    return this.workingRelays.filter(url => {
      const relay = this.relays.get(url);
      return relay && relay.healthy;
    });
  }

  /**
   * Record a failure. Three or more consecutive failures mark the relay
   * unhealthy; it stays tracked and can recover via markSuccess().
   */
  markFailed(url) {
    const relay = this.relays.get(url);
    if (relay) {
      relay.failures++;
      if (relay.failures >= 3) {
        relay.healthy = false;
        logger.warn('Relay marked unhealthy', { url, failures: relay.failures });
      }
    }
  }

  /** Record a success: restores health, resets failures, notes latency. */
  markSuccess(url, latencyMs) {
    const relay = this.relays.get(url);
    if (relay) {
      relay.healthy = true;
      relay.latencyMs = latencyMs;
      relay.failures = 0;
      relay.lastCheck = Date.now();
    }
  }

  /** Start tracking a new relay url (no-op if already known). */
  addRelay(url) {
    if (!this.relays.has(url)) {
      this.relays.set(url, RelayManager.#freshRecord(url));
      this.workingRelays.push(url);
    }
  }

  /**
   * Aggregate health metrics.
   * @returns {{total: number, healthy: number, avgLatency: number}}
   */
  getMetrics() {
    const healthyRelays = Array.from(this.relays.values()).filter(r => r.healthy);
    const avgLatency = healthyRelays.length > 0
      ? healthyRelays.reduce((sum, r) => sum + r.latencyMs, 0) / healthyRelays.length
      : 0;
    return {
      total: this.relays.size,
      healthy: healthyRelays.length,
      avgLatency,
    };
  }

  /** Snapshot of the healthy relay urls, suitable for persistence. */
  exportWorkingSet() {
    return this.getWorkingRelays();
  }

  /** Merge a previously exported working set into the known relays. */
  importWorkingSet(relays) {
    for (const url of relays) {
      this.addRelay(url);
    }
  }
}
|
|
//=============================================================================
|
|
// IPFS ARTIFACT STORAGE
|
|
//=============================================================================
|
|
/**
 * IPFS artifact manager (CID-based storage).
 *
 * IMPORTANT CID LIMITATION:
 * generateCID creates a simplified hash-based identifier ('Qm' prefix).
 * This is NOT a real IPFS CID and will NOT interop with:
 * - Real IPFS gateways
 * - Pinning services (Pinata, web3.storage, Filebase)
 * - Content routing
 *
 * For production, you need one of:
 * A) Real IPFS client: add bytes to an IPFS node, take the returned CID
 * B) Multiformats library: create a CIDv1 from raw bytes offline
 *
 * The current implementation works for local coordination and can be
 * upgraded to real IPFS when needed.
 *
 * Gateway fetch is DISABLED by default because our fake CIDs won't resolve.
 * Enable it only when using real multiformats CIDv1 generation.
 */
class ArtifactStore {
  localCache = new Map(); // cid -> Buffer
  crypto;
  maxCacheSize = 100 * 1024 * 1024; // 100MB
  currentCacheSize = 0; // bytes currently held in localCache
  maxArtifactSize = 10 * 1024 * 1024; // 10MB max per artifact
  // DISABLED: gateway fetch only works with real IPFS CIDs
  enableGatewayFetch = false;

  static CHUNK_SIZE = 256 * 1024; // 256KB chunks

  // Public IPFS gateways for fetching (when real CIDs are used)
  static IPFS_GATEWAYS = [
    'https://ipfs.io/ipfs/',
    'https://dweb.link/ipfs/',
    'https://cloudflare-ipfs.com/ipfs/',
    'https://gateway.pinata.cloud/ipfs/',
  ];

  constructor(enableGatewayFetch = false) {
    this.crypto = new CryptoV2();
    this.enableGatewayFetch = enableGatewayFetch;
  }

  /**
   * Store an artifact locally and return its CID plus sizing info.
   *
   * @param {string|Buffer} data
   * @param {boolean} [compress=true] - Reserved; compression is not yet
   *   implemented (would be zstd or lz4 in production), data is stored
   *   verbatim either way.
   * @throws {Error} when the artifact exceeds maxArtifactSize.
   * @returns {Promise<{cid: string, size: number, chunks: number}>}
   */
  async store(data, compress = true) {
    const buffer = typeof data === 'string' ? Buffer.from(data) : data;
    // Enforce size limit
    if (buffer.length > this.maxArtifactSize) {
      throw new Error(`Artifact exceeds max size: ${buffer.length} > ${this.maxArtifactSize}`);
    }
    // Compression placeholder: stored verbatim regardless of `compress`
    const toStore = buffer;
    // Generate CID (NOTE: simplified, not a real IPFS CID)
    const cid = this.crypto.generateCID(toStore);
    // Store locally
    this.cacheStore(cid, toStore);
    return {
      cid,
      size: toStore.length,
      chunks: Math.ceil(toStore.length / ArtifactStore.CHUNK_SIZE),
    };
  }

  /**
   * Retrieve an artifact by CID.
   *
   * Strategy:
   * 1. Check the local cache.
   * 2. If enableGatewayFetch=true AND the CID is real (bafy prefix), try
   *    public gateways.
   * 3. Otherwise return null (local-only mode).
   *
   * @returns {Promise<Buffer|null>}
   */
  async retrieve(cid) {
    // Check local cache
    if (this.localCache.has(cid)) {
      return this.localCache.get(cid);
    }
    // Gateway fetch disabled - local cache only
    if (!this.enableGatewayFetch) {
      return null;
    }
    // Only attempt gateway fetch for real CIDv1 (bafy prefix);
    // our fake Qm CIDs will NOT work with gateways
    if (!cid.startsWith('bafy')) {
      logger.debug('Gateway fetch skipped: not a real CIDv1', { cid });
      return null;
    }
    for (const gateway of ArtifactStore.IPFS_GATEWAYS) {
      try {
        const response = await fetch(`${gateway}${cid}`, {
          signal: AbortSignal.timeout(10000), // 10 second timeout
        });
        if (response.ok) {
          const data = Buffer.from(await response.arrayBuffer());
          // Enforce the same size cap as local stores
          if (data.length > this.maxArtifactSize) {
            logger.warn('Fetched artifact exceeds max size', { cid, size: data.length });
            continue;
          }
          // TODO: Verify CID matches content using multiformats.
          // For now we trust the gateway (gated by the real-CID requirement).
          this.cacheStore(cid, data);
          return data;
        }
      }
      catch (error) {
        // Try next gateway
        logger.debug('Gateway fetch failed', { gateway, cid, error });
      }
    }
    return null;
  }

  /** Enable gateway fetch (only safe when using real IPFS CIDs). */
  setEnableGatewayFetch(enable) {
    this.enableGatewayFetch = enable;
  }

  /**
   * Insert into the local cache, evicting oldest entries (Maps iterate in
   * insertion order) until the new entry fits.
   *
   * FIX: when a CID is re-stored, the replaced entry's size is subtracted
   * first — previously currentCacheSize grew on every duplicate store even
   * though the Map held a single entry.
   */
  cacheStore(cid, data) {
    const existing = this.localCache.get(cid);
    if (existing) {
      this.localCache.delete(cid);
      this.currentCacheSize -= existing.length;
    }
    // Evict until the new entry fits
    while (this.currentCacheSize + data.length > this.maxCacheSize) {
      const oldestCid = this.localCache.keys().next().value;
      if (!oldestCid) {
        break;
      }
      const size = this.localCache.get(oldestCid)?.length || 0;
      this.localCache.delete(oldestCid);
      this.currentCacheSize -= size;
    }
    this.localCache.set(cid, data);
    this.currentCacheSize += data.length;
  }

  /**
   * Create a signed artifact pointer suitable for publishing to Gun.
   * Uses stableStringify for deterministic signing; the checksum is the
   * first 16 hex chars of the cached content's sha256 ('' if not cached).
   */
  createPointer(type, agentId, cid, dimensions, identity) {
    const data = this.localCache.get(cid);
    const checksum = data ? this.crypto.hash(data).slice(0, 16) : '';
    const pointer = {
      type,
      agentId,
      cid,
      version: 1,
      schemaHash: this.crypto.hash(type).slice(0, 8),
      dimensions,
      checksum,
      timestamp: Date.now(),
    };
    // Use stableStringify for deterministic signing
    const signature = identity.sign(stableStringify(pointer));
    return { ...pointer, signature };
  }
}
|
|
//=============================================================================
|
|
// MAIN P2P SWARM V2
|
|
//=============================================================================
|
|
/**
 * Production-grade P2P Swarm Coordinator.
 *
 * Ties together identity (Ed25519/X25519), swarm-key encryption, Gun relay
 * management, artifact storage, and Gun-based WebRTC signaling.
 */
export class P2PSwarmV2 {
  identity; // IdentityManager: signing, ECDH, replay protection
  crypto; // CryptoV2: AES-256-GCM + SHA-256 helpers
  relayManager; // RelayManager: Gun relay health tracking
  artifactStore; // ArtifactStore: CID-addressed payload cache
  swarmKey; // Buffer: shared envelope key used for all Gun messages
  swarmId; // short digest of swarmKey; names the Gun node
  agentId; // this node's identifier within the swarm
  gun = null; // Gun instance, set in connect()
  swarmNode = null; // Gun node `swarm-v2-${swarmId}`, set in connect()
  connected = false; // set true in connect() before heartbeats start
  messageHandlers = new Map(); // topic -> subscriber callback
  pendingOffers = new Map(); // outstanding WebRTC offers
  // Member registry: source of truth for identity binding
  // NEVER trust keys from envelopes - always resolve from registry
  memberRegistry = new Map();
  // Heartbeat and task executor state
  heartbeatInterval = null;
  taskExecutorActive = false;
  claimedTasks = new Set();
  HEARTBEAT_INTERVAL_MS = 20000; // 20 seconds
  MEMBER_TIMEOUT_MS = 60000; // 60 seconds without heartbeat = offline
  // Negative cache for failed member lookups (prevents spam):
  // agentId -> timestamp of the failed lookup
  negativeMemberCache = new Map();
  NEGATIVE_CACHE_TTL_MS = 30000; // 30 seconds
  // Task claim conflict resolution
  CLAIM_TTL_MS = 45000; // 45 seconds - claims older than this can be overwritten
|
|
constructor(agentId, swarmKey) {
|
|
this.identity = new IdentityManager();
|
|
this.crypto = new CryptoV2();
|
|
this.relayManager = new RelayManager();
|
|
this.artifactStore = new ArtifactStore();
|
|
this.agentId = agentId;
|
|
this.swarmKey = swarmKey
|
|
? Buffer.from(swarmKey, 'base64')
|
|
: crypto.randomBytes(32);
|
|
this.swarmId = this.crypto.hash(this.swarmKey.toString('base64')).slice(0, 16);
|
|
}
|
|
  /**
   * Connect to Gun relays and start the swarm's background machinery.
   *
   * Startup order matters:
   *  1. register self in the member registry,
   *  2. subscribe to WebRTC signaling,
   *  3. start the membership watcher,
   *  4. set `connected = true`, THEN start heartbeats (per the note below,
   *     the first heartbeat only publishes once connected is set).
   *
   * The task executor loop is NOT started here; enable it separately via
   * startTaskExecutor().
   *
   * @returns {Promise<boolean>} true on success; false on any failure
   *   (errors are logged, never thrown).
   */
  async connect() {
    try {
      // Dynamic import keeps the Gun dependency lazy until a swarm is used.
      const Gun = (await import('gun')).default;
      const relays = this.relayManager.getWorkingRelays();
      this.gun = Gun(relays);
      this.swarmNode = this.gun.get(`swarm-v2-${this.swarmId}`);
      // Register self
      await this.registerSelf();
      // Subscribe to signaling for WebRTC
      this.subscribeToSignaling();
      // Start membership watcher
      this.startMembershipWatcher();
      // IMPORTANT: Set connected BEFORE heartbeat to ensure first heartbeat publishes
      this.connected = true;
      // Start heartbeat publishing
      this.startHeartbeat();
      logger.info('P2P Swarm V2 connected', {
        swarmId: this.swarmId,
        agentId: this.agentId,
        relays: relays.length,
      });
      return true;
    }
    catch (error) {
      logger.error('Failed to connect', { error });
      return false;
    }
  }
|
|
/**
|
|
* Register self with public key and X25519 key
|
|
* Uses canonical serialization for signature
|
|
* Field order must match verifyMemberRegistration
|
|
*/
|
|
async registerSelf() {
|
|
if (!this.swarmNode)
|
|
return;
|
|
const joinedAt = Date.now();
|
|
const capabilities = ['coordinator', 'executor'];
|
|
// Data to sign (must match verifyMemberRegistration order)
|
|
const dataToSign = {
|
|
agentId: this.agentId,
|
|
capabilities,
|
|
joinedAt,
|
|
publicKey: this.identity.getPublicKey(),
|
|
x25519PublicKey: this.identity.getX25519PublicKey(),
|
|
};
|
|
// Sign using canonical serialization
|
|
const signature = this.identity.sign(stableStringify(dataToSign));
|
|
// Full registration record
|
|
const registration = {
|
|
...dataToSign,
|
|
signature,
|
|
};
|
|
this.swarmNode.get('members').get(this.agentId).put(registration);
|
|
// Add self to local registry
|
|
const now = Date.now();
|
|
this.memberRegistry.set(this.agentId, {
|
|
...dataToSign,
|
|
verified: true, // We trust ourselves
|
|
lastSeen: now,
|
|
});
|
|
}
|
|
  /**
   * Fields a Gun member registration must carry before any further
   * validation; presence is checked (and the signature over the signed
   * subset verified) in verifyMemberRegistration.
   */
  static MEMBER_REQUIRED_FIELDS = ['agentId', 'publicKey', 'x25519PublicKey', 'capabilities', 'joinedAt', 'signature'];
|
|
/**
|
|
* Verify and cache a member registration from Gun
|
|
* STRICT: Requires all fields including x25519PublicKey
|
|
* Returns the verified member or null if invalid
|
|
*/
|
|
verifyMemberRegistration(registration) {
|
|
// STRICT: Require all fields
|
|
if (!registration)
|
|
return null;
|
|
for (const field of P2PSwarmV2.MEMBER_REQUIRED_FIELDS) {
|
|
if (registration[field] === undefined || registration[field] === null) {
|
|
logger.debug('Member registration missing required field', { field, agentId: registration.agentId });
|
|
return null;
|
|
}
|
|
}
|
|
// Validate types
|
|
if (typeof registration.agentId !== 'string' || registration.agentId.length === 0)
|
|
return null;
|
|
if (typeof registration.publicKey !== 'string' || registration.publicKey.length === 0)
|
|
return null;
|
|
if (typeof registration.x25519PublicKey !== 'string' || registration.x25519PublicKey.length === 0)
|
|
return null;
|
|
if (!Array.isArray(registration.capabilities))
|
|
return null;
|
|
if (typeof registration.joinedAt !== 'number' || registration.joinedAt <= 0)
|
|
return null;
|
|
// Reconstruct the signed data (must match registration order)
|
|
const dataToVerify = {
|
|
agentId: registration.agentId,
|
|
capabilities: registration.capabilities,
|
|
joinedAt: registration.joinedAt,
|
|
publicKey: registration.publicKey,
|
|
x25519PublicKey: registration.x25519PublicKey,
|
|
};
|
|
// Verify signature using the key in the registration
|
|
if (!this.identity.verify(stableStringify(dataToVerify), registration.signature, registration.publicKey)) {
|
|
logger.warn('Member registration signature invalid', { agentId: registration.agentId });
|
|
return null;
|
|
}
|
|
return {
|
|
agentId: registration.agentId,
|
|
publicKey: registration.publicKey,
|
|
x25519PublicKey: registration.x25519PublicKey,
|
|
capabilities: registration.capabilities,
|
|
joinedAt: registration.joinedAt,
|
|
verified: true,
|
|
lastSeen: Date.now(), // Set initial lastSeen to now
|
|
};
|
|
}
|
|
/**
|
|
* Resolve full verified member from the registry
|
|
* Returns the complete member entry (with both Ed25519 and X25519 keys)
|
|
* Returns null if member not found or not verified
|
|
*
|
|
* Uses negative cache to prevent spam from repeated lookups of unknown agents
|
|
*/
|
|
async resolveMember(agentId) {
|
|
// Check local cache first
|
|
const cached = this.memberRegistry.get(agentId);
|
|
if (cached && cached.verified) {
|
|
return cached;
|
|
}
|
|
// Check negative cache (prevents spam from repeated lookups)
|
|
const negativeCacheTime = this.negativeMemberCache.get(agentId);
|
|
if (negativeCacheTime && Date.now() - negativeCacheTime < this.NEGATIVE_CACHE_TTL_MS) {
|
|
return null; // Still in negative cache window
|
|
}
|
|
// Fetch from Gun and verify
|
|
if (!this.swarmNode)
|
|
return null;
|
|
return new Promise((resolve) => {
|
|
let resolved = false;
|
|
this.swarmNode.get('members').get(agentId).once((registration) => {
|
|
if (resolved)
|
|
return;
|
|
resolved = true;
|
|
if (!registration) {
|
|
// Add to negative cache
|
|
this.negativeMemberCache.set(agentId, Date.now());
|
|
resolve(null);
|
|
return;
|
|
}
|
|
const verified = this.verifyMemberRegistration(registration);
|
|
if (verified) {
|
|
this.memberRegistry.set(agentId, verified);
|
|
// Remove from negative cache if it was there
|
|
this.negativeMemberCache.delete(agentId);
|
|
resolve(verified);
|
|
}
|
|
else {
|
|
// Add to negative cache
|
|
this.negativeMemberCache.set(agentId, Date.now());
|
|
resolve(null);
|
|
}
|
|
});
|
|
// Timeout after 5 seconds
|
|
setTimeout(() => {
|
|
if (!resolved) {
|
|
resolved = true;
|
|
// Add to negative cache on timeout
|
|
this.negativeMemberCache.set(agentId, Date.now());
|
|
resolve(null);
|
|
}
|
|
}, 5000);
|
|
});
|
|
}
|
|
/**
|
|
* Convenience: Resolve just the Ed25519 public key
|
|
*/
|
|
async resolveMemberKey(agentId) {
|
|
const member = await this.resolveMember(agentId);
|
|
return member?.publicKey || null;
|
|
}
|
|
  /**
   * Subscribe to WebRTC signaling addressed to this agent (Gun-based, no
   * external PeerServer).
   *
   * SECURITY: each signal's signature is verified against the sender's key
   * resolved from the verified member registry — the `senderPubKey` field
   * carried inside the signal itself is deliberately ignored.
   *
   * Check order (cheapest first): self/empty -> expiry -> age (>2 min) ->
   * registry lookup -> signature -> payload hash -> dispatch by type.
   */
  subscribeToSignaling() {
    if (!this.swarmNode)
      return;
    // Listen for signals addressed to us
    this.swarmNode.get('signaling').get(this.agentId).map().on(async (signal, key) => {
      if (!signal || signal.from === this.agentId)
        return;
      // Filter expired signals
      if (signal.expiresAt && Date.now() > signal.expiresAt) {
        return;
      }
      // Filter old signals (more than 2 minutes)
      if (signal.timestamp && Date.now() - signal.timestamp > 120000) {
        return;
      }
      // STRICT IDENTITY: Resolve sender's key from registry
      const registeredKey = await this.resolveMemberKey(signal.from);
      if (!registeredKey) {
        logger.debug('Rejected signaling: sender not in registry', { from: signal.from });
        return;
      }
      // Verify using REGISTRY key, not signal.senderPubKey.
      // Field set/order mirrors createCanonicalSignal (sorted keys).
      const canonical = stableStringify({
        expiresAt: signal.expiresAt,
        from: signal.from,
        payloadHash: signal.payloadHash,
        timestamp: signal.timestamp,
        to: signal.to,
        type: signal.type,
      });
      if (!this.identity.verify(canonical, signal.signature, registeredKey)) {
        logger.debug('Rejected signaling: invalid signature', { from: signal.from, type: signal.type });
        return;
      }
      // The payload is unsigned; it is bound to the signed header via hash
      if (signal.payload && this.crypto.hash(signal.payload) !== signal.payloadHash) {
        logger.debug('Rejected signaling: payload hash mismatch', { from: signal.from });
        return;
      }
      // Dispatch by signal type
      if (signal.type === 'offer') {
        this.handleOffer(signal);
      }
      else if (signal.type === 'answer') {
        this.handleAnswer(signal);
      }
      else if (signal.type === 'ice') {
        this.handleICE(signal);
      }
    });
  }
|
|
/**
|
|
* Handle incoming WebRTC offer (via Gun signaling)
|
|
*/
|
|
handleOffer(signal) {
|
|
logger.debug('Received WebRTC offer via Gun', { from: signal.from });
|
|
// In production: create RTCPeerConnection, set remote description, create answer
|
|
// For now, just log
|
|
}
|
|
/**
|
|
* Handle incoming WebRTC answer
|
|
*/
|
|
handleAnswer(signal) {
|
|
logger.debug('Received WebRTC answer via Gun', { from: signal.from });
|
|
}
|
|
/**
|
|
* Handle incoming ICE candidate
|
|
*/
|
|
handleICE(signal) {
|
|
logger.debug('Received ICE candidate via Gun', { from: signal.from });
|
|
}
|
|
/**
|
|
* Create canonical signaling message and signature
|
|
* Uses stableStringify for deterministic serialization
|
|
*/
|
|
createCanonicalSignal(type, targetAgentId, payload, ttl) {
|
|
const timestamp = Date.now();
|
|
const expiresAt = timestamp + ttl;
|
|
const payloadHash = this.crypto.hash(payload);
|
|
// Canonical object for signing (stableStringify ensures sorted keys)
|
|
const canonicalObj = {
|
|
expiresAt,
|
|
from: this.agentId,
|
|
payloadHash,
|
|
timestamp,
|
|
to: targetAgentId,
|
|
type,
|
|
};
|
|
const canonical = stableStringify(canonicalObj);
|
|
const signature = this.identity.sign(canonical);
|
|
return {
|
|
canonical,
|
|
signal: {
|
|
type,
|
|
from: this.agentId,
|
|
to: targetAgentId,
|
|
payload,
|
|
payloadHash,
|
|
timestamp,
|
|
expiresAt,
|
|
senderPubKey: this.identity.getPublicKey(),
|
|
signature,
|
|
},
|
|
};
|
|
}
|
|
// NOTE: verifyCanonicalSignal was removed as redundant.
|
|
// Signal verification now happens in subscribeToSignaling using
|
|
// registry-based identity binding, which is both safer and consistent.
|
|
/**
|
|
* Send WebRTC offer via Gun (no PeerServer needed)
|
|
* Uses canonical signature for verification
|
|
*/
|
|
async sendOffer(targetAgentId, offer) {
|
|
if (!this.swarmNode)
|
|
return;
|
|
const payload = JSON.stringify(offer).slice(0, 10000); // Cap size at 10KB
|
|
const ttl = 60000; // 1 minute TTL
|
|
const { signal } = this.createCanonicalSignal('offer', targetAgentId, payload, ttl);
|
|
const signalId = this.crypto.hash(`${this.agentId}:${targetAgentId}:${signal.timestamp}`).slice(0, 12);
|
|
this.swarmNode.get('signaling').get(targetAgentId).get(signalId).put(signal);
|
|
// Schedule cleanup (best effort, expiry filter does real work)
|
|
setTimeout(() => {
|
|
this.swarmNode?.get('signaling').get(targetAgentId).get(signalId).put(null);
|
|
}, ttl);
|
|
}
|
|
/**
|
|
* Send WebRTC answer via Gun
|
|
*/
|
|
async sendAnswer(targetAgentId, answer) {
|
|
if (!this.swarmNode)
|
|
return;
|
|
const payload = JSON.stringify(answer).slice(0, 10000);
|
|
const ttl = 60000;
|
|
const { signal } = this.createCanonicalSignal('answer', targetAgentId, payload, ttl);
|
|
const signalId = this.crypto.hash(`answer:${this.agentId}:${targetAgentId}:${signal.timestamp}`).slice(0, 12);
|
|
this.swarmNode.get('signaling').get(targetAgentId).get(signalId).put(signal);
|
|
setTimeout(() => {
|
|
this.swarmNode?.get('signaling').get(targetAgentId).get(signalId).put(null);
|
|
}, ttl);
|
|
}
|
|
/**
|
|
* Send ICE candidate via Gun
|
|
*/
|
|
async sendICE(targetAgentId, candidate) {
|
|
if (!this.swarmNode)
|
|
return;
|
|
const payload = JSON.stringify(candidate).slice(0, 2000); // ICE candidates are small
|
|
const ttl = 30000; // 30 second TTL
|
|
const { signal } = this.createCanonicalSignal('ice', targetAgentId, payload, ttl);
|
|
const signalId = this.crypto.hash(`ice:${this.agentId}:${signal.timestamp}`).slice(0, 12);
|
|
this.swarmNode.get('signaling').get(targetAgentId).get(signalId).put(signal);
|
|
setTimeout(() => {
|
|
this.swarmNode?.get('signaling').get(targetAgentId).get(signalId).put(null);
|
|
}, ttl);
|
|
}
|
|
/**
|
|
* Publish signed and encrypted message
|
|
*
|
|
* Gun messages ALWAYS use swarm envelope key (not session keys)
|
|
* This ensures all swarm members can decrypt and process messages.
|
|
* Uses canonical serialization for signature stability.
|
|
*/
|
|
async publish(topic, payload) {
|
|
const nonce = this.identity.generateNonce();
|
|
const counter = this.identity.getNextSendCounter(); // Use local send counter
|
|
const timestamp = Date.now();
|
|
const payloadStr = stableStringify(payload);
|
|
const payloadHash = this.crypto.hash(payloadStr);
|
|
const messageId = this.crypto.hash(`${topic}:${nonce}:${timestamp}`).slice(0, 16);
|
|
// Create signature over header fields using CANONICAL serialization
|
|
// Keys are sorted alphabetically for deterministic output
|
|
const headerToSign = {
|
|
counter,
|
|
messageId,
|
|
nonce,
|
|
payloadHash,
|
|
senderId: this.agentId,
|
|
senderX25519Key: this.identity.getX25519PublicKey(),
|
|
timestamp,
|
|
topic,
|
|
};
|
|
const signature = this.identity.sign(stableStringify(headerToSign));
|
|
// Encrypt payload with swarm key (ALWAYS for Gun messages)
|
|
const encrypted = this.crypto.encrypt(payloadStr, this.swarmKey);
|
|
const envelope = {
|
|
messageId,
|
|
topic,
|
|
timestamp,
|
|
senderId: this.agentId,
|
|
senderPubKey: this.identity.getPublicKey(),
|
|
senderX25519Key: this.identity.getX25519PublicKey(),
|
|
payloadHash,
|
|
nonce,
|
|
counter,
|
|
signature,
|
|
encrypted,
|
|
};
|
|
// Publish to Gun
|
|
if (this.swarmNode) {
|
|
this.swarmNode.get('messages').get(topic).get(messageId).put(envelope);
|
|
}
|
|
return messageId;
|
|
}
|
|
/**
|
|
* Subscribe to topic with strict identity verification
|
|
*
|
|
* SECURITY: Never trust keys from envelopes. Always resolve from registry.
|
|
*
|
|
* Verification steps:
|
|
* 1. Check nonce and timestamp (replay protection)
|
|
* 2. Check counter (ordering)
|
|
* 3. Resolve sender's public key from verified member registry
|
|
* 4. Reject if envelope key differs from registry key
|
|
* 5. Verify signature using registry key
|
|
* 6. Decrypt with swarm key
|
|
* 7. Verify payload hash
|
|
*/
|
|
subscribe(topic, callback) {
|
|
this.messageHandlers.set(topic, callback);
|
|
if (!this.swarmNode)
|
|
return;
|
|
this.swarmNode.get('messages').get(topic).map().on(async (envelope, key) => {
|
|
if (!envelope || !envelope.encrypted)
|
|
return;
|
|
// Ignore own messages
|
|
if (envelope.senderId === this.agentId)
|
|
return;
|
|
// Validate timestamp and nonce (per-sender replay protection)
|
|
if (!this.identity.checkNonce(envelope.nonce, envelope.timestamp, envelope.senderId)) {
|
|
logger.debug('Rejected: replay or expired', { messageId: envelope.messageId });
|
|
return;
|
|
}
|
|
// Validate counter (must be > last seen from this sender)
|
|
if (!this.identity.validateRecvCounter(envelope.senderId, envelope.counter)) {
|
|
logger.debug('Rejected: invalid counter', { messageId: envelope.messageId });
|
|
return;
|
|
}
|
|
// STRICT IDENTITY: Resolve public key from verified registry
|
|
const registeredKey = await this.resolveMemberKey(envelope.senderId);
|
|
if (!registeredKey) {
|
|
logger.debug('Rejected: sender not in verified registry', {
|
|
messageId: envelope.messageId,
|
|
senderId: envelope.senderId,
|
|
});
|
|
return;
|
|
}
|
|
// Reject if envelope key differs from registry key
|
|
if (registeredKey !== envelope.senderPubKey) {
|
|
logger.warn('Rejected: envelope key differs from registry', {
|
|
messageId: envelope.messageId,
|
|
senderId: envelope.senderId,
|
|
});
|
|
return;
|
|
}
|
|
// Verify signature using REGISTRY key (not envelope key)
|
|
const headerToVerify = {
|
|
counter: envelope.counter,
|
|
messageId: envelope.messageId,
|
|
nonce: envelope.nonce,
|
|
payloadHash: envelope.payloadHash,
|
|
senderId: envelope.senderId,
|
|
senderX25519Key: envelope.senderX25519Key,
|
|
timestamp: envelope.timestamp,
|
|
topic: envelope.topic,
|
|
};
|
|
if (!this.identity.verify(stableStringify(headerToVerify), envelope.signature, registeredKey)) {
|
|
logger.warn('Rejected: invalid signature', { messageId: envelope.messageId });
|
|
return;
|
|
}
|
|
// Decrypt payload with swarm key (Gun messages always use swarm key)
|
|
const decrypted = this.crypto.decrypt(envelope.encrypted.ciphertext, this.swarmKey, envelope.encrypted.iv, envelope.encrypted.tag);
|
|
if (!decrypted) {
|
|
logger.warn('Rejected: decryption failed', { messageId: envelope.messageId });
|
|
return;
|
|
}
|
|
// Verify payload hash
|
|
if (this.crypto.hash(decrypted) !== envelope.payloadHash) {
|
|
logger.warn('Rejected: payload hash mismatch', { messageId: envelope.messageId });
|
|
return;
|
|
}
|
|
// Dispatch to handler
|
|
const handler = this.messageHandlers.get(topic);
|
|
if (handler) {
|
|
handler(JSON.parse(decrypted), envelope.senderId);
|
|
}
|
|
});
|
|
}
|
|
/**
|
|
* Store Q-table and publish pointer
|
|
*/
|
|
async syncQTable(qTable) {
|
|
// Store actual data
|
|
const data = JSON.stringify(qTable);
|
|
const { cid, size, chunks } = await this.artifactStore.store(data);
|
|
// Create pointer
|
|
const dimensions = `${qTable.length}x${qTable[0]?.length || 0}`;
|
|
const pointer = this.artifactStore.createPointer('q_table', this.agentId, cid, dimensions, this.identity);
|
|
// Publish pointer to Gun (small, metadata only)
|
|
await this.publish('artifacts', pointer);
|
|
logger.debug('Q-table synced', { cid, size, chunks });
|
|
return pointer;
|
|
}
|
|
/**
|
|
* Store memory vectors and publish pointer
|
|
*/
|
|
async syncMemory(vectors, namespace) {
|
|
const data = JSON.stringify({ vectors, namespace });
|
|
const { cid, size, chunks } = await this.artifactStore.store(data);
|
|
const dimensions = `${vectors.length}x${vectors[0]?.length || 0}`;
|
|
const pointer = this.artifactStore.createPointer('memory_vectors', this.agentId, cid, dimensions, this.identity);
|
|
await this.publish('artifacts', pointer);
|
|
logger.debug('Memory synced', { cid, size, chunks, namespace });
|
|
return pointer;
|
|
}
|
|
/**
|
|
* Submit task for execution
|
|
*/
|
|
async submitTask(task) {
|
|
const envelope = {
|
|
...task,
|
|
requester: this.agentId,
|
|
deadline: Date.now() + task.budgets.timeoutMs,
|
|
priority: 1,
|
|
};
|
|
return this.publish('tasks', envelope);
|
|
}
|
|
/**
|
|
* Submit task result with full execution binding
|
|
* Signs ALL fields including TaskEnvelope binding for complete traceability
|
|
*/
|
|
async submitResult(receipt) {
|
|
// Create receipt with executor's public key
|
|
const fullReceipt = {
|
|
...receipt,
|
|
executorPubKey: this.identity.getPublicKey(),
|
|
};
|
|
// Sign ALL fields using canonical serialization for full execution binding
|
|
// This includes the new TaskEnvelope binding fields
|
|
const dataToSign = {
|
|
endTimestamp: fullReceipt.endTimestamp,
|
|
entrypoint: fullReceipt.entrypoint,
|
|
executionMs: fullReceipt.executionMs,
|
|
executor: fullReceipt.executor,
|
|
executorPubKey: fullReceipt.executorPubKey,
|
|
fuelUsed: fullReceipt.fuelUsed,
|
|
inputCID: fullReceipt.inputCID,
|
|
inputHash: fullReceipt.inputHash,
|
|
memoryPeakMB: fullReceipt.memoryPeakMB,
|
|
moduleCID: fullReceipt.moduleCID,
|
|
moduleHash: fullReceipt.moduleHash,
|
|
outputHash: fullReceipt.outputHash,
|
|
outputSchemaHash: fullReceipt.outputSchemaHash,
|
|
resultCID: fullReceipt.resultCID,
|
|
startTimestamp: fullReceipt.startTimestamp,
|
|
status: fullReceipt.status,
|
|
taskEnvelopeHash: fullReceipt.taskEnvelopeHash,
|
|
taskId: fullReceipt.taskId,
|
|
};
|
|
const signature = this.identity.sign(stableStringify(dataToSign));
|
|
const signedReceipt = { ...fullReceipt, signature };
|
|
return this.publish('results', signedReceipt);
|
|
}
|
|
/**
|
|
* Get status
|
|
*/
|
|
getStatus() {
|
|
return {
|
|
connected: this.connected,
|
|
swarmId: this.swarmId,
|
|
agentId: this.agentId,
|
|
publicKey: this.identity.getPublicKey().slice(0, 50) + '...',
|
|
relays: this.relayManager.getMetrics(),
|
|
};
|
|
}
|
|
/**
|
|
* Get swarm key for sharing
|
|
*/
|
|
getSwarmKey() {
|
|
return this.swarmKey.toString('base64');
|
|
}
|
|
//===========================================================================
|
|
// AUTONOMY: Membership Watcher, Heartbeats, Task Executor
|
|
//===========================================================================
|
|
/**
|
|
* Start membership watcher
|
|
* Continuously monitors the members map, verifies registrations, tracks liveness
|
|
*/
|
|
startMembershipWatcher() {
|
|
if (!this.swarmNode)
|
|
return;
|
|
// Subscribe to all member registrations
|
|
this.swarmNode.get('members').map().on((registration, agentId) => {
|
|
if (!registration || !agentId)
|
|
return;
|
|
if (agentId === this.agentId)
|
|
return; // Skip self
|
|
// Verify the registration
|
|
const verified = this.verifyMemberRegistration(registration);
|
|
if (verified) {
|
|
// Check if this is an update or new member
|
|
const existing = this.memberRegistry.get(agentId);
|
|
if (existing) {
|
|
// Keep the higher lastSeen (heartbeat updates this separately)
|
|
verified.lastSeen = Math.max(existing.lastSeen, verified.lastSeen);
|
|
}
|
|
this.memberRegistry.set(agentId, verified);
|
|
// Remove from negative cache
|
|
this.negativeMemberCache.delete(agentId);
|
|
logger.debug('Member verified via watcher', { agentId, capabilities: verified.capabilities });
|
|
}
|
|
});
|
|
// Subscribe to heartbeats
|
|
this.subscribeToHeartbeats();
|
|
logger.debug('Membership watcher started');
|
|
}
|
|
/**
|
|
* Subscribe to heartbeat messages to track liveness
|
|
* Explicitly rejects agentId mismatches to prevent confusion attacks
|
|
*/
|
|
subscribeToHeartbeats() {
|
|
this.subscribe('heartbeat', (data, from) => {
|
|
// Reject if payload agentId doesn't match envelope sender
|
|
if (data?.agentId && data.agentId !== from) {
|
|
logger.warn('Heartbeat agentId mismatch', { from, claimed: data.agentId });
|
|
return;
|
|
}
|
|
const member = this.memberRegistry.get(from);
|
|
if (member) {
|
|
member.lastSeen = Date.now();
|
|
logger.debug('Heartbeat received', { from, lastSeen: member.lastSeen });
|
|
}
|
|
});
|
|
}
|
|
/**
|
|
* Start publishing heartbeats
|
|
* Every HEARTBEAT_INTERVAL_MS, publish a signed heartbeat message
|
|
*/
|
|
startHeartbeat() {
|
|
// Publish initial heartbeat
|
|
this.publishHeartbeat();
|
|
// Start interval
|
|
this.heartbeatInterval = setInterval(() => {
|
|
this.publishHeartbeat();
|
|
}, this.HEARTBEAT_INTERVAL_MS);
|
|
logger.debug('Heartbeat publishing started', { intervalMs: this.HEARTBEAT_INTERVAL_MS });
|
|
}
|
|
/**
|
|
* Publish a single heartbeat
|
|
* Belt-and-suspenders: check both connected and swarmNode
|
|
*/
|
|
async publishHeartbeat() {
|
|
if (!this.connected || !this.swarmNode)
|
|
return;
|
|
await this.publish('heartbeat', {
|
|
agentId: this.agentId,
|
|
timestamp: Date.now(),
|
|
status: 'alive',
|
|
capabilities: ['coordinator', 'executor'],
|
|
claimedTasks: this.claimedTasks.size,
|
|
});
|
|
// Update own lastSeen
|
|
const self = this.memberRegistry.get(this.agentId);
|
|
if (self) {
|
|
self.lastSeen = Date.now();
|
|
}
|
|
}
|
|
/**
|
|
* Get list of live members (heartbeat within MEMBER_TIMEOUT_MS)
|
|
*/
|
|
getLiveMembers() {
|
|
const now = Date.now();
|
|
const members = [];
|
|
const entries = Array.from(this.memberRegistry.entries());
|
|
for (const [agentId, member] of entries) {
|
|
if (!member.verified)
|
|
continue;
|
|
const isAlive = (now - member.lastSeen) < this.MEMBER_TIMEOUT_MS;
|
|
members.push({
|
|
agentId,
|
|
capabilities: member.capabilities,
|
|
lastSeen: member.lastSeen,
|
|
isAlive,
|
|
});
|
|
}
|
|
return members;
|
|
}
|
|
/**
|
|
* Get count of live members
|
|
*/
|
|
getLiveMemberCount() {
|
|
return this.getLiveMembers().filter(m => m.isAlive).length;
|
|
}
|
|
//===========================================================================
|
|
// RECEIPT VERIFICATION
|
|
//===========================================================================
|
|
/**
|
|
* Verify a task receipt
|
|
*
|
|
* Checks:
|
|
* 1. Executor is in verified member registry
|
|
* 2. Executor public key matches registry
|
|
* 3. Signature is valid over the canonical receipt fields
|
|
* 4. Optionally verifies taskEnvelopeHash if original task is provided
|
|
*/
|
|
async verifyReceipt(receipt, originalTask) {
|
|
// 1. Resolve executor from registry
|
|
const executor = await this.resolveMember(receipt.executor);
|
|
if (!executor) {
|
|
return { valid: false, reason: 'Executor not in verified registry' };
|
|
}
|
|
// 2. Verify executor public key matches
|
|
if (executor.publicKey !== receipt.executorPubKey) {
|
|
return { valid: false, reason: 'Executor public key mismatch with registry' };
|
|
}
|
|
// 3. Reconstruct the signed data (must EXACTLY match submitResult signing set)
|
|
// All fields in alphabetical order for stableStringify consistency
|
|
const dataToVerify = {
|
|
endTimestamp: receipt.endTimestamp,
|
|
entrypoint: receipt.entrypoint,
|
|
executionMs: receipt.executionMs,
|
|
executor: receipt.executor,
|
|
executorPubKey: receipt.executorPubKey,
|
|
fuelUsed: receipt.fuelUsed,
|
|
inputCID: receipt.inputCID,
|
|
inputHash: receipt.inputHash,
|
|
memoryPeakMB: receipt.memoryPeakMB,
|
|
moduleCID: receipt.moduleCID,
|
|
moduleHash: receipt.moduleHash,
|
|
outputHash: receipt.outputHash,
|
|
outputSchemaHash: receipt.outputSchemaHash,
|
|
resultCID: receipt.resultCID,
|
|
startTimestamp: receipt.startTimestamp,
|
|
status: receipt.status,
|
|
taskEnvelopeHash: receipt.taskEnvelopeHash,
|
|
taskId: receipt.taskId,
|
|
};
|
|
// 4. Verify signature using registry key
|
|
if (!this.identity.verify(stableStringify(dataToVerify), receipt.signature, executor.publicKey)) {
|
|
return { valid: false, reason: 'Invalid receipt signature' };
|
|
}
|
|
// 5. Optionally verify taskEnvelopeHash
|
|
if (originalTask) {
|
|
const expectedHash = this.crypto.hash(stableStringify(originalTask));
|
|
if (receipt.taskEnvelopeHash !== expectedHash) {
|
|
return { valid: false, reason: 'Task envelope hash mismatch' };
|
|
}
|
|
}
|
|
return {
|
|
valid: true,
|
|
executor: {
|
|
agentId: executor.agentId,
|
|
capabilities: executor.capabilities,
|
|
verified: executor.verified,
|
|
},
|
|
};
|
|
}
|
|
//===========================================================================
|
|
// TASK EXECUTOR LOOP (optional, enable with startTaskExecutor)
|
|
//===========================================================================
|
|
/**
|
|
* Start the task executor loop
|
|
* Subscribes to tasks, claims them, and (in production) executes in Wasmtime
|
|
*
|
|
* Security checks:
|
|
* - Requester must be verified member
|
|
* - Requester in envelope must match sender
|
|
* - Executor must have 'executor' capability
|
|
*/
|
|
startTaskExecutor() {
|
|
if (this.taskExecutorActive)
|
|
return;
|
|
this.taskExecutorActive = true;
|
|
this.subscribe('tasks', async (task, from) => {
|
|
// Skip if not from a verified member
|
|
const requester = await this.resolveMember(from);
|
|
if (!requester) {
|
|
logger.debug('Task from unverified requester, skipping', { from });
|
|
return;
|
|
}
|
|
// Reject if task.requester doesn't match envelope sender (prevents replay/confusion)
|
|
if (task.requester && task.requester !== from) {
|
|
logger.warn('Task requester mismatch', {
|
|
from,
|
|
requester: task.requester,
|
|
taskId: task.taskId,
|
|
});
|
|
return;
|
|
}
|
|
// Check if we have executor capability
|
|
const self = this.memberRegistry.get(this.agentId);
|
|
if (!self || !self.capabilities.includes('executor')) {
|
|
logger.debug('Not an executor, skipping task', { taskId: task.taskId });
|
|
return;
|
|
}
|
|
// Skip if already claimed
|
|
if (this.claimedTasks.has(task.taskId)) {
|
|
return;
|
|
}
|
|
// Skip if past deadline
|
|
if (Date.now() > task.deadline) {
|
|
logger.debug('Task past deadline, skipping', { taskId: task.taskId });
|
|
return;
|
|
}
|
|
// Attempt to claim the task
|
|
const claimed = await this.claimTask(task);
|
|
if (!claimed) {
|
|
return;
|
|
}
|
|
// Execute the task
|
|
try {
|
|
const receipt = await this.executeTask(task);
|
|
await this.submitResult(receipt);
|
|
logger.info('Task completed', { taskId: task.taskId, status: receipt.status });
|
|
}
|
|
catch (error) {
|
|
logger.error('Task execution failed', { taskId: task.taskId, error });
|
|
// Submit error receipt (async for consistent artifact hashing)
|
|
const errorReceipt = await this.createErrorReceipt(task, error);
|
|
await this.submitResult(errorReceipt);
|
|
}
|
|
finally {
|
|
this.claimedTasks.delete(task.taskId);
|
|
}
|
|
});
|
|
logger.info('Task executor started');
|
|
}
|
|
/**
 * Claim a task with conflict resolution.
 *
 * Before claiming:
 * 1. Read any existing claim for the task (raced against a 1.5 s timeout)
 * 2. Verify the existing claim's signature against the member registry;
 *    spoofed or unverifiable claims are treated as non-existent
 * 3. If a fresh (< CLAIM_TTL_MS), verified claim from another agent
 *    exists, back off
 * 4. Otherwise write our own signed claim
 *
 * The claim is signed over a canonical (key-sorted) serialization of
 * { claimedAt, executor, taskEnvelopeHash, taskId } using a single
 * claimedAt timestamp, so the stored claim matches the signed bytes.
 *
 * NOTE(review): this is last-writer-wins, not a true lock — two agents
 * reading simultaneously can both claim; the TTL check only narrows the
 * race window.
 *
 * @param {object} task - Task envelope to claim.
 * @returns {Promise<boolean>} true if we claimed the task.
 */
async claimTask(task) {
    if (!this.swarmNode)
        return false;
    // Check for existing claim (conflict resolution).
    // The .once() read is raced against a 1.5 s timeout, with a `done`
    // latch so the promise resolves exactly once whichever fires first.
    let existing = await new Promise((resolve) => {
        let done = false;
        this.swarmNode.get('claims').get(task.taskId).once((c) => {
            if (done)
                return;
            done = true;
            resolve(c);
        });
        setTimeout(() => {
            if (!done) {
                done = true;
                resolve(null);
            }
        }, 1500);
    });
    // Verify existing claim signature against registry to prevent spoofing
    if (existing?.executor && existing?.claimedAt && existing?.signature) {
        const execKey = await this.resolveMemberKey(existing.executor);
        if (execKey) {
            // Signed field set must mirror the one written below.
            const claimValid = this.identity.verify(stableStringify({
                claimedAt: existing.claimedAt,
                executor: existing.executor,
                taskEnvelopeHash: existing.taskEnvelopeHash,
                taskId: task.taskId,
            }), existing.signature, execKey);
            if (!claimValid) {
                // Treat invalid claim as non-existent (spoofed)
                logger.debug('Ignoring invalid claim signature', {
                    taskId: task.taskId,
                    claimedBy: existing.executor,
                });
                existing = null;
            }
        }
        else {
            // Executor not in registry - ignore claim
            existing = null;
        }
    }
    // If fresh VERIFIED claim exists from another agent, don't compete
    if (existing?.claimedAt &&
        (Date.now() - existing.claimedAt) < this.CLAIM_TTL_MS &&
        existing.executor !== this.agentId) {
        logger.debug('Task already claimed by another agent', {
            taskId: task.taskId,
            claimedBy: existing.executor,
        });
        return false;
    }
    // Create claim with single timestamp for consistency — the stored
    // claimedAt must equal the signed claimedAt.
    const claimedAt = Date.now();
    const taskEnvelopeHash = this.crypto.hash(stableStringify(task));
    const claim = {
        taskId: task.taskId,
        executor: this.agentId,
        claimedAt,
        taskEnvelopeHash,
        signature: this.identity.sign(stableStringify({
            claimedAt,
            executor: this.agentId,
            taskEnvelopeHash,
            taskId: task.taskId,
        })),
    };
    // Publish claim to Gun
    this.swarmNode.get('claims').get(task.taskId).put(claim);
    // Add to local claimed set
    this.claimedTasks.add(task.taskId);
    logger.debug('Task claimed', { taskId: task.taskId });
    return true;
}
|
|
/**
|
|
* Execute a task
|
|
*
|
|
* NOTE: This is a stub. In production, this would:
|
|
* 1. Fetch WASM module from IPFS using moduleCID
|
|
* 2. Fetch input data from IPFS using inputCID
|
|
* 3. Execute in Wasmtime with fuel, memory, and timeout limits
|
|
* 4. Store output to IPFS, get resultCID
|
|
* 5. Return signed receipt
|
|
*
|
|
* Hashes actual bytes when available for meaningful receipts
|
|
*/
|
|
async executeTask(task) {
|
|
const startTimestamp = Date.now();
|
|
// Fetch artifacts (may be null if not in local cache)
|
|
const inputBytes = await this.artifactStore.retrieve(task.inputCID);
|
|
const moduleBytes = await this.artifactStore.retrieve(task.moduleCID);
|
|
// Hash actual bytes when available, fall back to CID string if not
|
|
const inputHash = inputBytes
|
|
? this.crypto.hash(inputBytes)
|
|
: this.crypto.hash(task.inputCID);
|
|
const moduleHash = moduleBytes
|
|
? this.crypto.hash(moduleBytes)
|
|
: this.crypto.hash(task.moduleCID);
|
|
// Stub execution - in production use Wasmtime with moduleBytes
|
|
// const result = await wasmtime.execute(moduleBytes, inputBytes, task.budgets);
|
|
const endTimestamp = Date.now();
|
|
const taskEnvelopeHash = this.crypto.hash(stableStringify(task));
|
|
// Store result artifact
|
|
const resultData = JSON.stringify({
|
|
taskId: task.taskId,
|
|
status: 'success',
|
|
output: 'Stub execution completed',
|
|
timestamp: endTimestamp,
|
|
});
|
|
// Hash actual output bytes
|
|
const outputBytes = Buffer.from(resultData);
|
|
const outputHash = this.crypto.hash(outputBytes);
|
|
const { cid: resultCID } = await this.artifactStore.store(resultData);
|
|
return {
|
|
taskId: task.taskId,
|
|
executor: this.agentId,
|
|
resultCID,
|
|
status: 'success',
|
|
fuelUsed: 1000,
|
|
memoryPeakMB: 1,
|
|
executionMs: endTimestamp - startTimestamp,
|
|
inputHash,
|
|
outputHash,
|
|
moduleHash,
|
|
startTimestamp,
|
|
endTimestamp,
|
|
moduleCID: task.moduleCID,
|
|
inputCID: task.inputCID,
|
|
entrypoint: task.entrypoint,
|
|
outputSchemaHash: task.outputSchemaHash,
|
|
taskEnvelopeHash,
|
|
};
|
|
}
|
|
/**
|
|
* Create error receipt for failed task
|
|
* Uses same hashing semantics as executeTask for consistency
|
|
*/
|
|
async createErrorReceipt(task, error) {
|
|
const now = Date.now();
|
|
// Hash actual bytes when available for consistency with executeTask
|
|
const inputBytes = await this.artifactStore.retrieve(task.inputCID);
|
|
const moduleBytes = await this.artifactStore.retrieve(task.moduleCID);
|
|
const inputHash = inputBytes
|
|
? this.crypto.hash(inputBytes)
|
|
: this.crypto.hash(task.inputCID);
|
|
const moduleHash = moduleBytes
|
|
? this.crypto.hash(moduleBytes)
|
|
: this.crypto.hash(task.moduleCID);
|
|
// Hash error message as output
|
|
const outputHash = this.crypto.hash(Buffer.from(error.message));
|
|
return {
|
|
taskId: task.taskId,
|
|
executor: this.agentId,
|
|
resultCID: '',
|
|
status: 'error',
|
|
fuelUsed: 0,
|
|
memoryPeakMB: 0,
|
|
executionMs: 0,
|
|
inputHash,
|
|
outputHash,
|
|
moduleHash,
|
|
startTimestamp: now,
|
|
endTimestamp: now,
|
|
moduleCID: task.moduleCID,
|
|
inputCID: task.inputCID,
|
|
entrypoint: task.entrypoint,
|
|
outputSchemaHash: task.outputSchemaHash,
|
|
taskEnvelopeHash: this.crypto.hash(stableStringify(task)),
|
|
};
|
|
}
|
|
/**
 * Stop the task executor.
 *
 * Only clears the active flag. NOTE(review): the 'tasks' subscription
 * registered by startTaskExecutor remains in messageHandlers — whether
 * in-flight task processing actually halts depends on that handler
 * consulting taskExecutorActive; verify the handler does so.
 */
stopTaskExecutor() {
    this.taskExecutorActive = false;
    logger.info('Task executor stopped');
}
|
|
/**
|
|
* Disconnect and cleanup resources
|
|
*/
|
|
disconnect() {
|
|
this.connected = false;
|
|
// Stop heartbeat
|
|
if (this.heartbeatInterval) {
|
|
clearInterval(this.heartbeatInterval);
|
|
this.heartbeatInterval = null;
|
|
}
|
|
// Stop task executor
|
|
this.taskExecutorActive = false;
|
|
this.identity.destroy(); // Stop nonce cleanup interval
|
|
this.messageHandlers.clear();
|
|
this.pendingOffers.clear();
|
|
this.claimedTasks.clear();
|
|
this.negativeMemberCache.clear();
|
|
// Gun doesn't have explicit disconnect
|
|
}
|
|
}
|
|
//=============================================================================
|
|
// RUVECTOR OPTIMIZATION LAYER
|
|
//=============================================================================
|
|
/**
 * RuVector optimization for Gun message handling
 *
 * Provides:
 * - HNSW indexing for O(log n) message lookup by topic similarity
 * - Batch message operations for reduced overhead
 * - PQ quantization for compressed message metadata storage
 * - Vector-based topic routing for intelligent distribution
 *
 * FIX: cosineSimilarity now guards against zero-magnitude vectors,
 * which previously produced NaN via division by zero.
 */
class RuVectorGunOptimizer {
    // topic -> Float32Array(128) embedding
    topicVectors = new Map();
    // topic -> messageIds
    messageIndex = new Map();
    // Pending { topic, payload } entries awaiting flush
    batchQueue = [];
    batchFlushInterval = null;
    batchSize = 10;
    flushIntervalMs = 100;
    constructor() {
        this.startBatchFlusher();
    }
    /**
     * Generate topic embedding using simple hash-based vectors.
     * In production: use actual embedding model via ruvector.
     *
     * @param {string} topic
     * @returns {Float32Array} 128-dim vector with components in [0, 1].
     */
    generateTopicVector(topic) {
        // Deterministic scalar seed: sum of char codes.
        const hash = topic.split('').reduce((acc, char) => acc + char.charCodeAt(0), 0);
        const vector = new Float32Array(128);
        for (let i = 0; i < 128; i++) {
            vector[i] = Math.sin(hash * (i + 1) * 0.1) * 0.5 + 0.5;
        }
        return vector;
    }
    /**
     * Find similar topics using cosine similarity.
     * In production: use HNSW index via ruvector for O(log n) lookup.
     *
     * @param {string} topic - Query topic.
     * @param {number} [topK=5] - Maximum results.
     * @returns {Array<{topic: string, similarity: number}>} Best matches,
     *          most similar first.
     */
    findSimilarTopics(topic, topK = 5) {
        const queryVector = this.generateTopicVector(topic);
        const similarities = [];
        const entries = Array.from(this.topicVectors.entries());
        for (const [existingTopic, vector] of entries) {
            const similarity = this.cosineSimilarity(queryVector, vector);
            similarities.push({ topic: existingTopic, similarity });
        }
        // Sort by similarity and return top K
        return similarities
            .sort((a, b) => b.similarity - a.similarity)
            .slice(0, topK);
    }
    /**
     * Cosine similarity between two vectors.
     *
     * @returns {number} Similarity in [-1, 1]; 0 if either vector has
     *          zero magnitude (avoids NaN from division by zero).
     */
    cosineSimilarity(a, b) {
        let dotProduct = 0;
        let normA = 0;
        let normB = 0;
        for (let i = 0; i < a.length; i++) {
            dotProduct += a[i] * b[i];
            normA += a[i] * a[i];
            normB += b[i] * b[i];
        }
        const denom = Math.sqrt(normA) * Math.sqrt(normB);
        // FIX: zero-magnitude input previously yielded NaN.
        return denom === 0 ? 0 : dotProduct / denom;
    }
    /**
     * Index a topic for similarity search (idempotent).
     */
    indexTopic(topic) {
        if (!this.topicVectors.has(topic)) {
            this.topicVectors.set(topic, this.generateTopicVector(topic));
            this.messageIndex.set(topic, []);
        }
    }
    /**
     * Add message to batch queue for efficient Gun writes.
     * Flushes immediately once the batch reaches batchSize.
     */
    queueMessage(topic, payload) {
        this.indexTopic(topic);
        this.batchQueue.push({ topic, payload });
        // Flush immediately if batch is full
        if (this.batchQueue.length >= this.batchSize) {
            this.flushBatch();
        }
    }
    /**
     * Drain and return the queued batch for an efficient Gun write.
     *
     * @returns {Array<{topic: string, payload: *}>} The drained entries.
     */
    flushBatch() {
        const batch = [...this.batchQueue];
        this.batchQueue = [];
        return batch;
    }
    /**
     * Start periodic batch flushing every flushIntervalMs.
     */
    startBatchFlusher() {
        this.batchFlushInterval = setInterval(() => {
            if (this.batchQueue.length > 0) {
                this.flushBatch();
            }
        }, this.flushIntervalMs);
    }
    /**
     * Get optimization metrics.
     *
     * @returns {{indexedTopics: number, batchQueueSize: number, totalMessages: number}}
     */
    getMetrics() {
        let totalMessages = 0;
        const values = Array.from(this.messageIndex.values());
        for (const messageIds of values) {
            totalMessages += messageIds.length;
        }
        return {
            indexedTopics: this.topicVectors.size,
            batchQueueSize: this.batchQueue.length,
            totalMessages,
        };
    }
    /**
     * Cleanup resources (stops the batch-flush interval).
     */
    destroy() {
        if (this.batchFlushInterval) {
            clearInterval(this.batchFlushInterval);
            this.batchFlushInterval = null;
        }
    }
}
|
|
/**
 * Factory: create a P2P swarm node and connect it before returning.
 *
 * @param {string} agentId - Identifier for this agent.
 * @param {*} swarmKey - Shared swarm envelope key.
 * @returns {Promise<P2PSwarmV2>} A connected swarm instance.
 */
export async function createP2PSwarmV2(agentId, swarmKey) {
    const node = new P2PSwarmV2(agentId, swarmKey);
    await node.connect();
    return node;
}
|
|
export { stableStringify, IdentityManager, CryptoV2, RelayManager, ArtifactStore, RuVectorGunOptimizer, };
|
|
//# sourceMappingURL=p2p-swarm-v2.js.map
|