import 'dart:async';
import 'dart:math' as math;

import 'package:flutter/foundation.dart';

/// Connection status for a single stream subscription.
enum StreamConnectionStatus {
  /// Connected and receiving live updates.
  connected,

  /// Attempting to recover the connection; data may be stale.
  recovering,

  /// Connection failed; attempting to fallback to polling.
  polling,

  /// Connection and polling both failed; data is stale.
  stale,

  /// Fatal error; stream will not recover without manual intervention.
  failed,
}

/// Represents the result of a polling attempt.
///
/// Generic over the row type [T] so poll results can carry typed rows;
/// existing untyped call sites infer `PollResult<dynamic>` unchanged.
class PollResult<T> {
  /// Rows returned by the poll attempt.
  final List<T> data;

  /// Whether the poll completed without error.
  final bool success;

  /// Error message when the poll failed, if available.
  final String? error;

  PollResult({required this.data, required this.success, this.error});
}

/// Configuration for stream recovery behavior.
class StreamRecoveryConfig {
  /// Maximum number of automatic recovery attempts before giving up.
  final int maxRecoveryAttempts;

  /// Initial delay (in milliseconds) before first recovery attempt.
  final int initialDelayMs;

  /// Maximum delay (in milliseconds) for exponential backoff.
  final int maxDelayMs;

  /// Multiplier for exponential backoff (e.g., 2.0 = double each attempt).
  final double backoffMultiplier;

  /// Enable polling fallback when realtime fails.
  final bool enablePollingFallback;

  /// Polling interval (in milliseconds) when realtime is unavailable.
  final int pollingIntervalMs;

  const StreamRecoveryConfig({
    this.maxRecoveryAttempts = 4,
    this.initialDelayMs = 1000,
    this.maxDelayMs = 32000, // 32 seconds max
    this.backoffMultiplier = 2.0,
    this.enablePollingFallback = true,
    this.pollingIntervalMs = 5000, // Poll every 5 seconds
  });
}

/// Callback type for per-channel status change notifications.
///
/// Used by [StreamRecoveryWrapper] to notify [RealtimeController] about
/// individual channel recovery state so the UI can show per-channel skeletons.
typedef ChannelStatusCallback = void Function(
    String channel, StreamConnectionStatus status);

/// Wraps a Supabase realtime stream with automatic recovery, polling fallback,
/// and connection status tracking. Provides graceful degradation when the
/// realtime connection fails.
///
/// Error handling:
/// - **Timeout**: detected and handled internally with exponential backoff.
/// - **ChannelRateLimitReached**: detected and handled with a longer minimum
///   delay (5 s) before retrying. During recovery, a REST poll keeps data
///   fresh so the UI shows shimmer/skeleton instead of an error.
/// - **Generic errors**: same recovery flow with standard backoff.
///
/// Errors are **never** forwarded to consumers; instead the wrapper emits
/// [StreamRecoveryResult] events with appropriate [connectionStatus] so the
/// UI can react per-channel (e.g. show skeleton shimmer).
///
/// Usage:
/// ```dart
/// final wrapper = StreamRecoveryWrapper(
///   stream: client.from('tasks').stream(primaryKey: ['id']),
///   onPollData: () => fetchTasksViaRest(),
///   fromMap: Task.fromMap,
///   channelName: 'tasks',
///   onStatusChanged: (channel, status) { ... },
/// );
/// ```
class StreamRecoveryWrapper<T> {
  final Stream<List<Map<String, dynamic>>> _realtimeStream;
  final Future<List<T>> Function() _onPollData;
  final T Function(Map<String, dynamic>) _fromMap;
  final StreamRecoveryConfig _config;

  /// Human-readable channel name for logging and per-channel status tracking.
  final String channelName;

  /// Optional callback invoked whenever this channel's connection status
  /// changes. Used to integrate with [RealtimeController] for per-channel
  /// skeleton indicators in the UI.
  final ChannelStatusCallback? _onStatusChanged;

  StreamConnectionStatus _connectionStatus = StreamConnectionStatus.connected;
  int _recoveryAttempts = 0;
  Timer? _pollingTimer;
  Timer? _recoveryTimer;
  Timer? _stabilityTimer;
  StreamSubscription<List<Map<String, dynamic>>>? _realtimeSub;
  StreamController<StreamRecoveryResult<T>>? _controller;
  bool _disposed = false;
  bool _listening = false;

  StreamRecoveryWrapper({
    required Stream<List<Map<String, dynamic>>> stream,
    required Future<List<T>> Function() onPollData,
    required T Function(Map<String, dynamic>) fromMap,
    StreamRecoveryConfig config = const StreamRecoveryConfig(),
    this.channelName = 'unknown',
    ChannelStatusCallback? onStatusChanged,
  })  : _realtimeStream = stream,
        _onPollData = onPollData,
        _fromMap = fromMap,
        _config = config,
        _onStatusChanged = onStatusChanged;

  /// The wrapped stream that emits recovery results with metadata.
  ///
  /// Lazily initializes the internal controller and starts listening to the
  /// realtime stream on first access. Errors from the realtime channel are
  /// handled internally — consumers only see [StreamRecoveryResult] events.
  Stream<StreamRecoveryResult<T>> get stream {
    if (_controller == null) {
      _controller = StreamController<StreamRecoveryResult<T>>.broadcast();
      _startRealtimeSubscription();
    }
    return _controller!.stream;
  }

  /// Current connection status of this stream.
  StreamConnectionStatus get connectionStatus => _connectionStatus;

  // ── Realtime subscription ───────────────────────────────────────────────

  void _startRealtimeSubscription() {
    if (_disposed || _listening) return;
    _listening = true;

    _realtimeSub?.cancel();
    _realtimeSub = _realtimeStream.listen(
      _onRealtimeData,
      onError: _onRealtimeError,
      onDone: _onRealtimeDone,
      cancelOnError: false, // keep listening even after transient errors
    );
  }

  void _onRealtimeData(List<Map<String, dynamic>> rows) {
    if (_disposed) return;

    // When recovering, don't reset _recoveryAttempts immediately.
    // Supabase streams emit an initial REST fetch before the realtime
    // channel is established. If the channel keeps failing, resetting
    // on that REST data creates an infinite loop (data → reset → subscribe
    // → fail → data → reset …). Instead, start a stability timer — only
    // reset after staying connected without errors for 30 seconds.
    if (_recoveryAttempts > 0) {
      _stabilityTimer?.cancel();
      _stabilityTimer = Timer(const Duration(seconds: 30), () {
        if (!_disposed) {
          _recoveryAttempts = 0;
          debugPrint(
            'StreamRecoveryWrapper[$channelName]: '
            'connection stable for 30s — recovery counter reset',
          );
        }
      });
    } else {
      _recoveryAttempts = 0;
    }

    _setStatus(StreamConnectionStatus.connected);
    _emit(
      StreamRecoveryResult(
        data: rows.map(_fromMap).toList(),
        connectionStatus: StreamConnectionStatus.connected,
        isStale: false,
      ),
    );
  }

  void _onRealtimeError(Object error, [StackTrace? stack]) {
    if (_disposed) return;

    // Cancel any stability timer — the connection is not stable.
    _stabilityTimer?.cancel();

    final isRateLimit = _isRateLimitError(error);
    final isTimeout = _isTimeoutError(error);
    final tag = isRateLimit
        ? ' (rate-limited)'
        : isTimeout
            ? ' (timeout)'
            : '';
    debugPrint('StreamRecoveryWrapper[$channelName]: stream error$tag: $error');

    _setStatus(StreamConnectionStatus.recovering);

    if (_recoveryAttempts >= _config.maxRecoveryAttempts) {
      if (_config.enablePollingFallback) {
        _startPollingFallback();
      } else {
        _setStatus(StreamConnectionStatus.failed);
        _emit(
          StreamRecoveryResult(
            data: const [],
            connectionStatus: StreamConnectionStatus.failed,
            isStale: true,
            error: error.toString(),
          ),
        );
      }
      return;
    }

    _recoveryAttempts++;

    // Compute backoff delay. Rate-limit errors get a longer floor (5 s).
    final baseDelay = _config.initialDelayMs *
        math.pow(_config.backoffMultiplier, _recoveryAttempts - 1);
    final effectiveDelay =
        isRateLimit ? math.max(baseDelay.toInt(), 5000) : baseDelay.toInt();
    final cappedDelay = math.min(effectiveDelay, _config.maxDelayMs);

    debugPrint(
      'StreamRecoveryWrapper[$channelName]: recovery attempt '
      '$_recoveryAttempts/${_config.maxRecoveryAttempts}, '
      'delay=${cappedDelay}ms$tag',
    );

    // Fire a single REST poll immediately so the UI can show fresh data
    // under the skeleton shimmer while waiting for realtime to reconnect.
    _pollOnce();

    // Schedule re-subscription after backoff.
    _recoveryTimer?.cancel();
    _recoveryTimer = Timer(Duration(milliseconds: cappedDelay), () {
      if (_disposed) return;
      _listening = false;
      _startRealtimeSubscription();
    });
  }

  void _onRealtimeDone() {
    if (_disposed) return;
    debugPrint('StreamRecoveryWrapper[$channelName]: stream completed');
    // Attempt to reconnect once if the stream closes unexpectedly.
    if (_recoveryAttempts < _config.maxRecoveryAttempts) {
      _onRealtimeError(StateError('realtime stream completed unexpectedly'));
    }
  }

  // ── Polling fallback ──────────────────────────────────────────────────

  void _startPollingFallback() {
    _realtimeSub?.cancel();
    _listening = false;
    _setStatus(StreamConnectionStatus.polling);

    _pollingTimer?.cancel();
    _pollOnce(); // Immediate first poll
    _pollingTimer = Timer.periodic(
      Duration(milliseconds: _config.pollingIntervalMs),
      (_) => _pollOnce(),
    );
  }

  Future<void> _pollOnce() async {
    if (_disposed) return;
    try {
      final data = await _onPollData();
      _emit(
        StreamRecoveryResult(
          data: data,
          connectionStatus: _connectionStatus,
          isStale: true,
        ),
      );
    } catch (e) {
      debugPrint('StreamRecoveryWrapper[$channelName]: poll error: $e');
      if (_connectionStatus == StreamConnectionStatus.polling) {
        _setStatus(StreamConnectionStatus.stale);
        _emit(
          StreamRecoveryResult(
            data: const [],
            connectionStatus: StreamConnectionStatus.stale,
            isStale: true,
            error: e.toString(),
          ),
        );
      }
    }
  }

  // ── Error classification ──────────────────────────────────────────────

  /// Whether [error] indicates a Supabase channel rate limit.
  static bool _isRateLimitError(Object error) {
    final msg = error.toString().toLowerCase();
    return msg.contains('rate limit') ||
        msg.contains('rate_limit') ||
        msg.contains('channelratelimitreached') ||
        msg.contains('too many') ||
        msg.contains('429');
  }

  /// Whether [error] indicates a subscription timeout.
  static bool _isTimeoutError(Object error) {
    if (error is TimeoutException) return true;
    final msg = error.toString().toLowerCase();
    return msg.contains('timeout') ||
        msg.contains('timed out') ||
        msg.contains('timed_out');
  }

  // ── Helpers ───────────────────────────────────────────────────────────

  void _emit(StreamRecoveryResult<T> result) {
    if (!_disposed && _controller != null && !_controller!.isClosed) {
      _controller!.add(result);
    }
  }

  /// Update connection status and notify the per-channel callback.
  void _setStatus(StreamConnectionStatus status) {
    if (_connectionStatus != status) {
      _connectionStatus = status;
      _onStatusChanged?.call(channelName, status);
    }
  }

  /// Immediately fetch fresh data via REST without restarting the realtime
  /// subscription. Use this as a periodic safety net for missed realtime events
  /// (e.g., when the table is not yet in the supabase_realtime publication).
  Future<void> pollNow() async => _pollOnce();

  /// Manually trigger a recovery attempt.
  void retry() {
    _recoveryAttempts = 0;
    _pollingTimer?.cancel();
    _recoveryTimer?.cancel();
    _listening = false;
    _startRealtimeSubscription();
  }

  /// Clean up all resources and notify the status callback that this
  /// channel is no longer active, preventing ghost entries in the
  /// [RealtimeController]'s recovering-channels set.
  void dispose() {
    if (_disposed) return;
    _disposed = true;
    _pollingTimer?.cancel();
    _recoveryTimer?.cancel();
    _stabilityTimer?.cancel();
    _realtimeSub?.cancel();
    // Ensure the channel is removed from the recovering set when the
    // wrapper is torn down (e.g. provider disposed during navigation).
    // Without this, disposed wrappers that were mid-recovery leave
    // orphaned entries that keep the reconnection indicator spinning.
    if (_connectionStatus != StreamConnectionStatus.connected) {
      _onStatusChanged?.call(channelName, StreamConnectionStatus.connected);
    }
    _controller?.close();
  }
}

/// Result of a stream emission, including metadata about connection status.
class StreamRecoveryResult<T> {
  /// The data emitted by the stream.
  final List<T> data;

  /// Current connection status.
  final StreamConnectionStatus connectionStatus;

  /// Whether the data is stale (not live from realtime).
  final bool isStale;

  /// Error message, if any.
  final String? error;

  StreamRecoveryResult({
    required this.data,
    required this.connectionStatus,
    required this.isStale,
    this.error,
  });

  /// True if data is live and reliable.
  bool get isLive => connectionStatus == StreamConnectionStatus.connected;

  /// True if we should show a "data may be stale" indicator.
  bool get shouldIndicateStale =>
      isStale || connectionStatus == StreamConnectionStatus.polling;
}