import 'dart:async';

import 'package:flutter/foundation.dart';

/// Connection status for a single stream subscription.
enum StreamConnectionStatus {
  /// Connected and receiving live updates.
  connected,

  /// Attempting to recover the connection; data may be stale.
  recovering,

  /// Realtime recovery attempts are exhausted; data is being served by the
  /// polling fallback instead.
  polling,

  /// Connection and polling both failed; data is stale.
  stale,

  /// Fatal error; stream will not recover without manual intervention.
  failed,
}

/// Represents the result of a polling attempt.
///
/// [T] is the element type of [data]. Using the class without a type argument
/// keeps the untyped (`dynamic`) behaviour.
class PollResult<T> {
  /// Rows returned by the poll.
  final List<T> data;

  /// Whether the polling attempt succeeded.
  final bool success;

  /// Error description supplied by the caller, if any.
  final String? error;

  const PollResult({required this.data, required this.success, this.error});
}

/// Configuration for stream recovery behavior.
///
/// All durations are expressed in milliseconds. Invalid combinations are
/// rejected by assertions in debug builds.
class StreamRecoveryConfig {
  /// Maximum number of automatic recovery attempts before giving up.
  final int maxRecoveryAttempts;

  /// Initial delay (in milliseconds) before first recovery attempt.
  final int initialDelayMs;

  /// Maximum delay (in milliseconds) for exponential backoff.
  final int maxDelayMs;

  /// Multiplier for exponential backoff (e.g., 2.0 = double each attempt).
  final double backoffMultiplier;

  /// Enable polling fallback when realtime fails.
  final bool enablePollingFallback;

  /// Polling interval (in milliseconds) when realtime is unavailable.
  final int pollingIntervalMs;

  const StreamRecoveryConfig({
    this.maxRecoveryAttempts = 4,
    this.initialDelayMs = 1000,
    this.maxDelayMs = 32000, // 32 seconds max
    this.backoffMultiplier = 2.0,
    this.enablePollingFallback = true,
    this.pollingIntervalMs = 5000, // Poll every 5 seconds
  })  : assert(maxRecoveryAttempts >= 0, 'maxRecoveryAttempts must be >= 0'),
        assert(initialDelayMs >= 0, 'initialDelayMs must be >= 0'),
        assert(maxDelayMs >= 0, 'maxDelayMs must be >= 0'),
        // A multiplier below 1 would shrink the delay on every attempt.
        assert(backoffMultiplier >= 1.0, 'backoffMultiplier must be >= 1.0'),
        // A non-positive interval would turn the polling loop into a hot loop.
        assert(pollingIntervalMs > 0, 'pollingIntervalMs must be > 0');
}

/// Wraps a Supabase realtime stream with automatic recovery, polling fallback,
/// and connection status tracking. Provides graceful degradation when the
/// realtime connection fails.
///
/// Errors on the realtime stream trigger a re-subscription with exponential
/// backoff. Once the configured number of recovery attempts is used up, the
/// wrapper either switches to periodic polling through `onPollData` or emits a
/// single `failed` result, depending on the configuration.
///
/// Usage:
/// ```dart
/// final wrappedStream = StreamRecoveryWrapper<Task>(
///   stream: client.from('tasks').stream(primaryKey: ['id']),
///   onPollData: () => fetchTasksViaRest(),
///   fromMap: (row) => Task.fromMap(row),
/// );
/// // wrappedStream.stream emits data with connection status in metadata
/// ```
class StreamRecoveryWrapper<T> {
  final Stream<List<Map<String, dynamic>>> _realtimeStream;
  final Future<List<T>> Function() _onPollData;
  final T Function(Map<String, dynamic>) _fromMap;
  final StreamRecoveryConfig _config;

  StreamConnectionStatus _connectionStatus = StreamConnectionStatus.connected;
  int _recoveryAttempts = 0;

  // Never assigned inside this class; only cancelled in [dispose].
  Timer? _pollingTimer;

  // Created lazily by [statusChanges]; stays null if nobody asks for it.
  StreamController<StreamConnectionStatus>? _statusController;
  Stream<StreamRecoveryResult<T>>? _cachedStream;

  /// Creates a wrapper around [stream].
  ///
  /// [onPollData] is called to fetch rows when the wrapper falls back to
  /// polling, [fromMap] converts each realtime row into a [T], and [config]
  /// tunes the retry/backoff/polling behaviour.
  StreamRecoveryWrapper({
    required Stream<List<Map<String, dynamic>>> stream,
    required Future<List<T>> Function() onPollData,
    required T Function(Map<String, dynamic>) fromMap,
    StreamRecoveryConfig config = const StreamRecoveryConfig(),
  })  : _realtimeStream = stream,
        _onPollData = onPollData,
        _fromMap = fromMap,
        _config = config;

  /// The wrapped stream that emits recovery results with metadata.
  ///
  /// Built on first access and cached, so every access returns the same
  /// instance. It is produced by an `async*` generator and is therefore a
  /// single-subscription stream.
  Stream<StreamRecoveryResult<T>> get stream =>
      _cachedStream ??= _buildStream();

  /// Current connection status of this stream.
  StreamConnectionStatus get connectionStatus => _connectionStatus;

  /// Notifies listeners when connection status changes.
  ///
  /// Backed by a broadcast controller that is created on first access; status
  /// changes that happened before that are not replayed.
  Stream<StreamConnectionStatus> get statusChanges {
    final controller = _statusController ??=
        StreamController<StreamConnectionStatus>.broadcast();
    return controller.stream;
  }

  /// Builds the wrapped stream with recovery and polling logic.
  ///
  /// Each loop iteration is one subscription to the realtime stream. A failure
  /// is retried after an exponentially growing delay until
  /// `maxRecoveryAttempts` is reached, after which the wrapper polls (if
  /// enabled) or emits a single `failed` result and ends.
  ///
  /// NOTE(review): every retry listens to the same stream object again. If the
  /// supplied stream is single-subscription, re-listening throws a
  /// `StateError`, which is handled like any other failure — confirm the
  /// stream passed in supports repeated listening.
  Stream<StreamRecoveryResult<T>> _buildStream() async* {
    var delayMs = _config.initialDelayMs;

    while (true) {
      try {
        _setStatus(StreamConnectionStatus.connected);

        // `await for` instead of `yield*`: `yield*` forwards error events
        // straight to the consumer without throwing here, which would make
        // the catch block below (and with it all recovery logic) unreachable.
        await for (final rows in _realtimeStream) {
          yield StreamRecoveryResult<T>(
            data: rows.map(_fromMap).toList(),
            connectionStatus: StreamConnectionStatus.connected,
            isStale: false,
          );
        }

        // If we get here, stream completed normally (shouldn't happen)
        break;
      } catch (e) {
        debugPrint(
          'StreamRecoveryWrapper: realtime stream error: $e',
        );
        _setStatus(StreamConnectionStatus.recovering);
        debugPrint(
          'StreamRecoveryWrapper: realtime failed, error=$e, '
          'attempts=$_recoveryAttempts/${_config.maxRecoveryAttempts}',
        );

        // Exceeded max recovery attempts?
        if (_recoveryAttempts >= _config.maxRecoveryAttempts) {
          if (!_config.enablePollingFallback) {
            _setStatus(StreamConnectionStatus.failed);
            yield StreamRecoveryResult<T>(
              data: const [],
              connectionStatus: StreamConnectionStatus.failed,
              isStale: true,
              error: e.toString(),
            );
            break;
          }

          _setStatus(StreamConnectionStatus.polling);
          yield* _pollingFallback();

          // Polling only stops on its own after a poll error (status `stale`).
          // If it stopped because retry() moved the status to `recovering`,
          // start over with realtime instead of ending the stream.
          if (_connectionStatus == StreamConnectionStatus.recovering) {
            delayMs = _config.initialDelayMs;
            continue;
          }
          break;
        }

        // Exponential backoff before retry
        _recoveryAttempts++;
        await Future<void>.delayed(Duration(milliseconds: delayMs));
        delayMs = (delayMs * _config.backoffMultiplier).toInt();
        if (delayMs > _config.maxDelayMs) {
          delayMs = _config.maxDelayMs;
        }
      }
    }
  }

  /// Fallback to periodic REST polling when realtime is unavailable.
  ///
  /// Polls for as long as the status stays `polling`, waiting
  /// `pollingIntervalMs` between polls. The first poll error switches the
  /// status to `stale`, emits an empty result carrying the error text, and
  /// ends the polling stream.
  Stream<StreamRecoveryResult<T>> _pollingFallback() async* {
    while (_connectionStatus == StreamConnectionStatus.polling) {
      try {
        final data = await _onPollData();
        yield StreamRecoveryResult<T>(
          data: data,
          connectionStatus: StreamConnectionStatus.polling,
          isStale: true, // Mark as stale since it's no longer live
        );
        await Future<void>.delayed(
          Duration(milliseconds: _config.pollingIntervalMs),
        );
      } catch (e) {
        debugPrint('StreamRecoveryWrapper: polling error: $e');
        _setStatus(StreamConnectionStatus.stale);
        yield StreamRecoveryResult<T>(
          data: const [],
          connectionStatus: StreamConnectionStatus.stale,
          isStale: true,
          error: e.toString(),
        );
        break;
      }
    }
  }

  /// Update connection status and notify listeners.
  ///
  /// Does nothing when [status] equals the current status. Listeners are only
  /// notified while the status controller exists and is still open.
  void _setStatus(StreamConnectionStatus status) {
    if (_connectionStatus == status) return;
    _connectionStatus = status;

    // Adding to a closed controller throws a StateError; the generator may
    // still be running after dispose() has closed it.
    final controller = _statusController;
    if (controller != null && !controller.isClosed) {
      controller.add(status);
    }
  }

  /// Manually trigger a recovery attempt.
  ///
  /// Resets the recovery attempt counter and sets the status to `recovering`.
  /// While the wrapper is polling, this makes the polling loop stop after its
  /// current wait and the realtime stream is tried again. It does not restart
  /// a stream that has already ended (`failed` or `stale`).
  void retry() {
    _recoveryAttempts = 0;
    _setStatus(StreamConnectionStatus.recovering);
  }

  /// Clean up resources.
  ///
  /// Cancels the polling timer (if any) and closes the status controller.
  /// Subscriptions to [stream] are not cancelled here.
  void dispose() {
    _pollingTimer?.cancel();
    _statusController?.close();
  }
}

/// Result of a stream emission, including metadata about connection status.
class StreamRecoveryResult<T> {
  /// The data emitted by the stream.
  final List<T> data;

  /// Current connection status.
  final StreamConnectionStatus connectionStatus;

  /// Whether the data is stale (not live from realtime).
  final bool isStale;

  /// Error message, if any.
  final String? error;

  const StreamRecoveryResult({
    required this.data,
    required this.connectionStatus,
    required this.isStale,
    this.error,
  });

  /// True if data is live and reliable.
  bool get isLive => connectionStatus == StreamConnectionStatus.connected;

  /// True if we should show a "data may be stale" indicator.
  bool get shouldIndicateStale =>
      isStale || connectionStatus == StreamConnectionStatus.polling;
}