244 lines
7.5 KiB
Dart
244 lines
7.5 KiB
Dart
import 'dart:async';
|
|
import 'package:flutter/foundation.dart';
|
|
|
|
/// Connection status for a single stream subscription.
|
|
enum StreamConnectionStatus {
|
|
/// Connected and receiving live updates.
|
|
connected,
|
|
|
|
/// Attempting to recover the connection; data may be stale.
|
|
recovering,
|
|
|
|
/// Connection failed; attempting to fallback to polling.
|
|
polling,
|
|
|
|
/// Connection and polling both failed; data is stale.
|
|
stale,
|
|
|
|
/// Fatal error; stream will not recover without manual intervention.
|
|
failed,
|
|
}
|
|
|
|
/// Represents the result of a polling attempt.
|
|
class PollResult<T> {
|
|
final List<T> data;
|
|
final bool success;
|
|
final String? error;
|
|
|
|
PollResult({required this.data, required this.success, this.error});
|
|
}
|
|
|
|
/// Configuration for stream recovery behavior.
|
|
class StreamRecoveryConfig {
|
|
/// Maximum number of automatic recovery attempts before giving up.
|
|
final int maxRecoveryAttempts;
|
|
|
|
/// Initial delay (in milliseconds) before first recovery attempt.
|
|
final int initialDelayMs;
|
|
|
|
/// Maximum delay (in milliseconds) for exponential backoff.
|
|
final int maxDelayMs;
|
|
|
|
/// Multiplier for exponential backoff (e.g., 2.0 = double each attempt).
|
|
final double backoffMultiplier;
|
|
|
|
/// Enable polling fallback when realtime fails.
|
|
final bool enablePollingFallback;
|
|
|
|
/// Polling interval (in milliseconds) when realtime is unavailable.
|
|
final int pollingIntervalMs;
|
|
|
|
const StreamRecoveryConfig({
|
|
this.maxRecoveryAttempts = 4,
|
|
this.initialDelayMs = 1000,
|
|
this.maxDelayMs = 32000, // 32 seconds max
|
|
this.backoffMultiplier = 2.0,
|
|
this.enablePollingFallback = true,
|
|
this.pollingIntervalMs = 5000, // Poll every 5 seconds
|
|
});
|
|
}
|
|
|
|
/// Wraps a Supabase realtime stream with automatic recovery, polling fallback,
|
|
/// and connection status tracking. Provides graceful degradation when the
|
|
/// realtime connection fails.
|
|
///
|
|
/// Usage:
|
|
/// ```dart
|
|
/// final wrappedStream = StreamRecoveryWrapper(
|
|
/// stream: client.from('tasks').stream(primaryKey: ['id']),
|
|
/// onPollData: () => fetchTasksViaRest(),
|
|
/// );
|
|
/// // wrappedStream.stream emits data with connection status in metadata
|
|
/// ```
|
|
class StreamRecoveryWrapper<T> {
|
|
final Stream<List<Map<String, dynamic>>> _realtimeStream;
|
|
final Future<List<T>> Function() _onPollData;
|
|
final T Function(Map<String, dynamic>) _fromMap;
|
|
final StreamRecoveryConfig _config;
|
|
|
|
StreamConnectionStatus _connectionStatus = StreamConnectionStatus.connected;
|
|
int _recoveryAttempts = 0;
|
|
Timer? _pollingTimer;
|
|
StreamController<StreamConnectionStatus>? _statusController;
|
|
Stream<StreamRecoveryResult<T>>? _cachedStream;
|
|
|
|
StreamRecoveryWrapper({
|
|
required Stream<List<Map<String, dynamic>>> stream,
|
|
required Future<List<T>> Function() onPollData,
|
|
required T Function(Map<String, dynamic>) fromMap,
|
|
StreamRecoveryConfig config = const StreamRecoveryConfig(),
|
|
}) : _realtimeStream = stream,
|
|
_onPollData = onPollData,
|
|
_fromMap = fromMap,
|
|
_config = config;
|
|
|
|
/// The wrapped stream that emits recovery results with metadata.
|
|
Stream<StreamRecoveryResult<T>> get stream =>
|
|
_cachedStream ??= _buildStream();
|
|
|
|
/// Current connection status of this stream.
|
|
StreamConnectionStatus get connectionStatus => _connectionStatus;
|
|
|
|
/// Notifies listeners when connection status changes.
|
|
Stream<StreamConnectionStatus> get statusChanges {
|
|
_statusController ??= StreamController<StreamConnectionStatus>.broadcast();
|
|
return _statusController!.stream;
|
|
}
|
|
|
|
/// Builds the wrapped stream with recovery and polling logic.
|
|
Stream<StreamRecoveryResult<T>> _buildStream() async* {
|
|
int delayMs = _config.initialDelayMs;
|
|
|
|
while (true) {
|
|
try {
|
|
_setStatus(StreamConnectionStatus.connected);
|
|
|
|
// Try realtime stream first
|
|
yield* _realtimeStream
|
|
.map(
|
|
(rows) => StreamRecoveryResult<T>(
|
|
data: rows.map(_fromMap).toList(),
|
|
connectionStatus: StreamConnectionStatus.connected,
|
|
isStale: false,
|
|
),
|
|
)
|
|
.handleError((error) {
|
|
debugPrint(
|
|
'StreamRecoveryWrapper: realtime stream error: $error',
|
|
);
|
|
_setStatus(StreamConnectionStatus.recovering);
|
|
throw error; // Propagate to outer handler
|
|
});
|
|
|
|
// If we get here, stream completed normally (shouldn't happen)
|
|
break;
|
|
} catch (e) {
|
|
debugPrint(
|
|
'StreamRecoveryWrapper: realtime failed, error=$e, '
|
|
'attempts=$_recoveryAttempts/${_config.maxRecoveryAttempts}',
|
|
);
|
|
|
|
// Exceeded max recovery attempts?
|
|
if (_recoveryAttempts >= _config.maxRecoveryAttempts) {
|
|
if (_config.enablePollingFallback) {
|
|
_setStatus(StreamConnectionStatus.polling);
|
|
yield* _pollingFallback();
|
|
break;
|
|
} else {
|
|
_setStatus(StreamConnectionStatus.failed);
|
|
yield StreamRecoveryResult<T>(
|
|
data: const [],
|
|
connectionStatus: StreamConnectionStatus.failed,
|
|
isStale: true,
|
|
error: e.toString(),
|
|
);
|
|
break;
|
|
}
|
|
}
|
|
|
|
// Exponential backoff before retry
|
|
_recoveryAttempts++;
|
|
await Future.delayed(Duration(milliseconds: delayMs));
|
|
delayMs = (delayMs * _config.backoffMultiplier).toInt();
|
|
if (delayMs > _config.maxDelayMs) {
|
|
delayMs = _config.maxDelayMs;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Fallback to periodic REST polling when realtime is unavailable.
|
|
Stream<StreamRecoveryResult<T>> _pollingFallback() async* {
|
|
while (_connectionStatus == StreamConnectionStatus.polling) {
|
|
try {
|
|
final data = await _onPollData();
|
|
yield StreamRecoveryResult<T>(
|
|
data: data,
|
|
connectionStatus: StreamConnectionStatus.polling,
|
|
isStale: true, // Mark as stale since it's no longer live
|
|
);
|
|
await Future.delayed(Duration(milliseconds: _config.pollingIntervalMs));
|
|
} catch (e) {
|
|
debugPrint('StreamRecoveryWrapper: polling error: $e');
|
|
_setStatus(StreamConnectionStatus.stale);
|
|
yield StreamRecoveryResult<T>(
|
|
data: const [],
|
|
connectionStatus: StreamConnectionStatus.stale,
|
|
isStale: true,
|
|
error: e.toString(),
|
|
);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Update connection status and notify listeners.
|
|
void _setStatus(StreamConnectionStatus status) {
|
|
if (_connectionStatus != status) {
|
|
_connectionStatus = status;
|
|
_statusController?.add(status);
|
|
}
|
|
}
|
|
|
|
/// Manually trigger a recovery attempt.
|
|
void retry() {
|
|
_recoveryAttempts = 0;
|
|
_setStatus(StreamConnectionStatus.recovering);
|
|
}
|
|
|
|
/// Clean up resources.
|
|
void dispose() {
|
|
_pollingTimer?.cancel();
|
|
_statusController?.close();
|
|
}
|
|
}
|
|
|
|
/// Result of a stream emission, including metadata about connection status.
|
|
class StreamRecoveryResult<T> {
|
|
/// The data emitted by the stream.
|
|
final List<T> data;
|
|
|
|
/// Current connection status.
|
|
final StreamConnectionStatus connectionStatus;
|
|
|
|
/// Whether the data is stale (not live from realtime).
|
|
final bool isStale;
|
|
|
|
/// Error message, if any.
|
|
final String? error;
|
|
|
|
StreamRecoveryResult({
|
|
required this.data,
|
|
required this.connectionStatus,
|
|
required this.isStale,
|
|
this.error,
|
|
});
|
|
|
|
/// True if data is live and reliable.
|
|
bool get isLive => connectionStatus == StreamConnectionStatus.connected;
|
|
|
|
/// True if we should show a "data may be stale" indicator.
|
|
bool get shouldIndicateStale =>
|
|
isStale || connectionStatus == StreamConnectionStatus.polling;
|
|
}
|