tasq/lib/providers/stream_recovery.dart

244 lines
7.5 KiB
Dart

import 'dart:async';
import 'package:flutter/foundation.dart';
/// Connection status for a single stream subscription.
enum StreamConnectionStatus {
/// Connected and receiving live updates.
connected,
/// Attempting to recover the connection; data may be stale.
recovering,
/// Connection failed; attempting to fallback to polling.
polling,
/// Connection and polling both failed; data is stale.
stale,
/// Fatal error; stream will not recover without manual intervention.
failed,
}
/// Represents the result of a polling attempt.
class PollResult<T> {
final List<T> data;
final bool success;
final String? error;
PollResult({required this.data, required this.success, this.error});
}
/// Configuration for stream recovery behavior.
class StreamRecoveryConfig {
/// Maximum number of automatic recovery attempts before giving up.
final int maxRecoveryAttempts;
/// Initial delay (in milliseconds) before first recovery attempt.
final int initialDelayMs;
/// Maximum delay (in milliseconds) for exponential backoff.
final int maxDelayMs;
/// Multiplier for exponential backoff (e.g., 2.0 = double each attempt).
final double backoffMultiplier;
/// Enable polling fallback when realtime fails.
final bool enablePollingFallback;
/// Polling interval (in milliseconds) when realtime is unavailable.
final int pollingIntervalMs;
const StreamRecoveryConfig({
this.maxRecoveryAttempts = 4,
this.initialDelayMs = 1000,
this.maxDelayMs = 32000, // 32 seconds max
this.backoffMultiplier = 2.0,
this.enablePollingFallback = true,
this.pollingIntervalMs = 5000, // Poll every 5 seconds
});
}
/// Wraps a Supabase realtime stream with automatic recovery, polling fallback,
/// and connection status tracking. Provides graceful degradation when the
/// realtime connection fails.
///
/// Usage:
/// ```dart
/// final wrappedStream = StreamRecoveryWrapper(
/// stream: client.from('tasks').stream(primaryKey: ['id']),
/// onPollData: () => fetchTasksViaRest(),
/// );
/// // wrappedStream.stream emits data with connection status in metadata
/// ```
class StreamRecoveryWrapper<T> {
final Stream<List<Map<String, dynamic>>> _realtimeStream;
final Future<List<T>> Function() _onPollData;
final T Function(Map<String, dynamic>) _fromMap;
final StreamRecoveryConfig _config;
StreamConnectionStatus _connectionStatus = StreamConnectionStatus.connected;
int _recoveryAttempts = 0;
Timer? _pollingTimer;
StreamController<StreamConnectionStatus>? _statusController;
Stream<StreamRecoveryResult<T>>? _cachedStream;
StreamRecoveryWrapper({
required Stream<List<Map<String, dynamic>>> stream,
required Future<List<T>> Function() onPollData,
required T Function(Map<String, dynamic>) fromMap,
StreamRecoveryConfig config = const StreamRecoveryConfig(),
}) : _realtimeStream = stream,
_onPollData = onPollData,
_fromMap = fromMap,
_config = config;
/// The wrapped stream that emits recovery results with metadata.
Stream<StreamRecoveryResult<T>> get stream =>
_cachedStream ??= _buildStream();
/// Current connection status of this stream.
StreamConnectionStatus get connectionStatus => _connectionStatus;
/// Notifies listeners when connection status changes.
Stream<StreamConnectionStatus> get statusChanges {
_statusController ??= StreamController<StreamConnectionStatus>.broadcast();
return _statusController!.stream;
}
/// Builds the wrapped stream with recovery and polling logic.
Stream<StreamRecoveryResult<T>> _buildStream() async* {
int delayMs = _config.initialDelayMs;
while (true) {
try {
_setStatus(StreamConnectionStatus.connected);
// Try realtime stream first
yield* _realtimeStream
.map(
(rows) => StreamRecoveryResult<T>(
data: rows.map(_fromMap).toList(),
connectionStatus: StreamConnectionStatus.connected,
isStale: false,
),
)
.handleError((error) {
debugPrint(
'StreamRecoveryWrapper: realtime stream error: $error',
);
_setStatus(StreamConnectionStatus.recovering);
throw error; // Propagate to outer handler
});
// If we get here, stream completed normally (shouldn't happen)
break;
} catch (e) {
debugPrint(
'StreamRecoveryWrapper: realtime failed, error=$e, '
'attempts=$_recoveryAttempts/${_config.maxRecoveryAttempts}',
);
// Exceeded max recovery attempts?
if (_recoveryAttempts >= _config.maxRecoveryAttempts) {
if (_config.enablePollingFallback) {
_setStatus(StreamConnectionStatus.polling);
yield* _pollingFallback();
break;
} else {
_setStatus(StreamConnectionStatus.failed);
yield StreamRecoveryResult<T>(
data: const [],
connectionStatus: StreamConnectionStatus.failed,
isStale: true,
error: e.toString(),
);
break;
}
}
// Exponential backoff before retry
_recoveryAttempts++;
await Future.delayed(Duration(milliseconds: delayMs));
delayMs = (delayMs * _config.backoffMultiplier).toInt();
if (delayMs > _config.maxDelayMs) {
delayMs = _config.maxDelayMs;
}
}
}
}
/// Fallback to periodic REST polling when realtime is unavailable.
Stream<StreamRecoveryResult<T>> _pollingFallback() async* {
while (_connectionStatus == StreamConnectionStatus.polling) {
try {
final data = await _onPollData();
yield StreamRecoveryResult<T>(
data: data,
connectionStatus: StreamConnectionStatus.polling,
isStale: true, // Mark as stale since it's no longer live
);
await Future.delayed(Duration(milliseconds: _config.pollingIntervalMs));
} catch (e) {
debugPrint('StreamRecoveryWrapper: polling error: $e');
_setStatus(StreamConnectionStatus.stale);
yield StreamRecoveryResult<T>(
data: const [],
connectionStatus: StreamConnectionStatus.stale,
isStale: true,
error: e.toString(),
);
break;
}
}
}
/// Update connection status and notify listeners.
void _setStatus(StreamConnectionStatus status) {
if (_connectionStatus != status) {
_connectionStatus = status;
_statusController?.add(status);
}
}
/// Manually trigger a recovery attempt.
void retry() {
_recoveryAttempts = 0;
_setStatus(StreamConnectionStatus.recovering);
}
/// Clean up resources.
void dispose() {
_pollingTimer?.cancel();
_statusController?.close();
}
}
/// Result of a stream emission, including metadata about connection status.
class StreamRecoveryResult<T> {
/// The data emitted by the stream.
final List<T> data;
/// Current connection status.
final StreamConnectionStatus connectionStatus;
/// Whether the data is stale (not live from realtime).
final bool isStale;
/// Error message, if any.
final String? error;
StreamRecoveryResult({
required this.data,
required this.connectionStatus,
required this.isStale,
this.error,
});
/// True if data is live and reliable.
bool get isLive => connectionStatus == StreamConnectionStatus.connected;
/// True if we should show a "data may be stale" indicator.
bool get shouldIndicateStale =>
isStale || connectionStatus == StreamConnectionStatus.polling;
}