Per channel skeleton
This commit is contained in:
parent
b9153a070f
commit
029e671367
|
|
@ -7,6 +7,7 @@ import '../models/notification_item.dart';
|
|||
import 'profile_provider.dart';
|
||||
import 'supabase_provider.dart';
|
||||
import 'stream_recovery.dart';
|
||||
import 'realtime_controller.dart';
|
||||
import '../utils/app_time.dart';
|
||||
|
||||
final notificationsProvider = StreamProvider<List<NotificationItem>>((ref) {
|
||||
|
|
@ -31,6 +32,8 @@ final notificationsProvider = StreamProvider<List<NotificationItem>>((ref) {
|
|||
return data.map(NotificationItem.fromMap).toList();
|
||||
},
|
||||
fromMap: NotificationItem.fromMap,
|
||||
channelName: 'notifications',
|
||||
onStatusChanged: ref.read(realtimeControllerProvider).handleChannelStatus,
|
||||
);
|
||||
|
||||
ref.onDispose(wrapper.dispose);
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import 'package:flutter/foundation.dart';
|
|||
import 'package:flutter_riverpod/flutter_riverpod.dart';
|
||||
import 'package:supabase_flutter/supabase_flutter.dart';
|
||||
|
||||
import 'stream_recovery.dart';
|
||||
import 'supabase_provider.dart';
|
||||
|
||||
final realtimeControllerProvider = ChangeNotifierProvider<RealtimeController>((
|
||||
|
|
@ -15,18 +16,22 @@ final realtimeControllerProvider = ChangeNotifierProvider<RealtimeController>((
|
|||
return controller;
|
||||
});
|
||||
|
||||
/// Simplified realtime controller for app-lifecycle awareness.
|
||||
/// Individual streams now handle their own recovery via [StreamRecoveryWrapper].
|
||||
/// This controller only coordinates:
|
||||
/// - App lifecycle transitions (background/foreground)
|
||||
/// - Auth token refreshes
|
||||
/// - Global connection state notification (for UI indicators)
|
||||
/// Per-channel realtime controller for UI skeleton indicators.
|
||||
///
|
||||
/// Individual streams handle their own recovery via [StreamRecoveryWrapper].
|
||||
/// This controller aggregates channel-level status so the UI can show
|
||||
/// per-channel skeleton shimmers (e.g. only the tasks list shimmers when
|
||||
/// the `tasks` channel is recovering, not the whole app).
|
||||
///
|
||||
/// Coordinates:
|
||||
/// - Per-channel recovering state for pinpoint skeleton indicators
|
||||
/// - Auth token refreshes for realtime connections
|
||||
class RealtimeController extends ChangeNotifier {
|
||||
final SupabaseClient _client;
|
||||
bool _disposed = false;
|
||||
|
||||
/// Global flag: true if any stream is recovering; used for subtle UI indicator.
|
||||
bool isAnyStreamRecovering = false;
|
||||
/// Channels currently in a recovering/polling/stale state.
|
||||
final Set<String> _recoveringChannels = {};
|
||||
|
||||
RealtimeController(this._client) {
|
||||
_init();
|
||||
|
|
@ -34,12 +39,8 @@ class RealtimeController extends ChangeNotifier {
|
|||
|
||||
void _init() {
|
||||
try {
|
||||
// Listen for auth changes; ensure tokens are fresh for realtime.
|
||||
// Individual streams will handle their own reconnection.
|
||||
_client.auth.onAuthStateChange.listen((data) {
|
||||
final event = data.event;
|
||||
// Only refresh token on existing session refreshes, not immediately after sign-in
|
||||
// (sign-in already provides a fresh token)
|
||||
if (event == AuthChangeEvent.tokenRefreshed) {
|
||||
_ensureTokenFresh();
|
||||
}
|
||||
|
|
@ -49,12 +50,9 @@ class RealtimeController extends ChangeNotifier {
|
|||
}
|
||||
}
|
||||
|
||||
/// Ensure auth token is fresh for upcoming realtime operations.
|
||||
/// This is called after token refresh events, not immediately after sign-in.
|
||||
Future<void> _ensureTokenFresh() async {
|
||||
if (_disposed) return;
|
||||
try {
|
||||
// Defensive: only refresh if the method exists (SDK version compatibility)
|
||||
final authDynamic = _client.auth as dynamic;
|
||||
if (authDynamic.refreshSession != null) {
|
||||
await authDynamic.refreshSession?.call();
|
||||
|
|
@ -64,24 +62,59 @@ class RealtimeController extends ChangeNotifier {
|
|||
}
|
||||
}
|
||||
|
||||
/// Notify that a stream is starting recovery. Used for global UI indicator.
|
||||
void markStreamRecovering() {
|
||||
if (!isAnyStreamRecovering) {
|
||||
isAnyStreamRecovering = true;
|
||||
// ── Per-channel status ─────────────────────────────────────────────────
|
||||
|
||||
/// Whether a specific channel is currently recovering.
|
||||
bool isChannelRecovering(String channel) =>
|
||||
_recoveringChannels.contains(channel);
|
||||
|
||||
/// Global flag: true if **any** channel is recovering. Useful for global
|
||||
/// indicators (e.g. dashboard) where per-channel granularity isn't needed.
|
||||
bool get isAnyStreamRecovering => _recoveringChannels.isNotEmpty;
|
||||
|
||||
/// The set of channels currently recovering, for UI display.
|
||||
Set<String> get recoveringChannels => Set.unmodifiable(_recoveringChannels);
|
||||
|
||||
/// Mark a channel as recovering. Called by [StreamRecoveryWrapper] via its
|
||||
/// [ChannelStatusCallback].
|
||||
void markChannelRecovering(String channel) {
|
||||
if (_recoveringChannels.add(channel)) {
|
||||
notifyListeners();
|
||||
}
|
||||
}
|
||||
|
||||
/// Notify that stream recovery completed. If all streams recovered, update state.
|
||||
void markStreamRecovered() {
|
||||
// In practice, individual streams notify their own status via statusChanges.
|
||||
// This is kept for potential future global coordination.
|
||||
if (isAnyStreamRecovering) {
|
||||
isAnyStreamRecovering = false;
|
||||
/// Mark a channel as recovered. Called when realtime reconnects
|
||||
/// successfully.
|
||||
void markChannelRecovered(String channel) {
|
||||
if (_recoveringChannels.remove(channel)) {
|
||||
notifyListeners();
|
||||
}
|
||||
}
|
||||
|
||||
/// Convenience callback suitable for [StreamRecoveryWrapper.onStatusChanged].
|
||||
///
|
||||
/// Routes [StreamConnectionStatus] to the appropriate mark method.
|
||||
void handleChannelStatus(String channel, StreamConnectionStatus status) {
|
||||
if (status == StreamConnectionStatus.connected) {
|
||||
markChannelRecovered(channel);
|
||||
} else {
|
||||
markChannelRecovering(channel);
|
||||
}
|
||||
}
|
||||
|
||||
// ── Legacy compat ─────────────────────────────────────────────────────
|
||||
|
||||
/// @deprecated Use [markChannelRecovering] instead.
|
||||
void markStreamRecovering() {
|
||||
// Kept for backward compatibility; maps to a synthetic channel.
|
||||
markChannelRecovering('_global');
|
||||
}
|
||||
|
||||
/// @deprecated Use [markChannelRecovered] instead.
|
||||
void markStreamRecovered() {
|
||||
markChannelRecovered('_global');
|
||||
}
|
||||
|
||||
@override
|
||||
void dispose() {
|
||||
_disposed = true;
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import 'package:flutter_riverpod/flutter_riverpod.dart';
|
|||
import '../models/service.dart';
|
||||
import 'supabase_provider.dart';
|
||||
import 'stream_recovery.dart';
|
||||
import 'realtime_controller.dart';
|
||||
|
||||
final servicesProvider = StreamProvider<List<Service>>((ref) {
|
||||
final client = ref.watch(supabaseClientProvider);
|
||||
|
|
@ -14,6 +15,8 @@ final servicesProvider = StreamProvider<List<Service>>((ref) {
|
|||
return data.map(Service.fromMap).toList();
|
||||
},
|
||||
fromMap: Service.fromMap,
|
||||
channelName: 'services',
|
||||
onStatusChanged: ref.read(realtimeControllerProvider).handleChannelStatus,
|
||||
);
|
||||
|
||||
ref.onDispose(wrapper.dispose);
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
import 'dart:async';
|
||||
import 'dart:math' as math;
|
||||
import 'package:flutter/foundation.dart';
|
||||
|
||||
/// Connection status for a single stream subscription.
|
||||
|
|
@ -58,17 +59,37 @@ class StreamRecoveryConfig {
|
|||
});
|
||||
}
|
||||
|
||||
/// Callback type for per-channel status change notifications.
|
||||
///
|
||||
/// Used by [StreamRecoveryWrapper] to notify [RealtimeController] about
|
||||
/// individual channel recovery state so the UI can show per-channel skeletons.
|
||||
typedef ChannelStatusCallback =
|
||||
void Function(String channel, StreamConnectionStatus status);
|
||||
|
||||
/// Wraps a Supabase realtime stream with automatic recovery, polling fallback,
|
||||
/// and connection status tracking. Provides graceful degradation when the
|
||||
/// realtime connection fails.
|
||||
///
|
||||
/// Error handling:
|
||||
/// - **Timeout**: detected and handled internally with exponential backoff.
|
||||
/// - **ChannelRateLimitReached**: detected and handled with a longer minimum
|
||||
/// delay (5 s) before retrying. During recovery, a REST poll keeps data
|
||||
/// fresh so the UI shows shimmer/skeleton instead of an error.
|
||||
/// - **Generic errors**: same recovery flow with standard backoff.
|
||||
///
|
||||
/// Errors are **never** forwarded to consumers; instead the wrapper emits
|
||||
/// [StreamRecoveryResult] events with appropriate [connectionStatus] so the
|
||||
/// UI can react per-channel (e.g. show skeleton shimmer).
|
||||
///
|
||||
/// Usage:
|
||||
/// ```dart
|
||||
/// final wrappedStream = StreamRecoveryWrapper(
|
||||
/// final wrapper = StreamRecoveryWrapper<Task>(
|
||||
/// stream: client.from('tasks').stream(primaryKey: ['id']),
|
||||
/// onPollData: () => fetchTasksViaRest(),
|
||||
/// fromMap: Task.fromMap,
|
||||
/// channelName: 'tasks',
|
||||
/// onStatusChanged: (channel, status) { ... },
|
||||
/// );
|
||||
/// // wrappedStream.stream emits data with connection status in metadata
|
||||
/// ```
|
||||
class StreamRecoveryWrapper<T> {
|
||||
final Stream<List<Map<String, dynamic>>> _realtimeStream;
|
||||
|
|
@ -76,140 +97,245 @@ class StreamRecoveryWrapper<T> {
|
|||
final T Function(Map<String, dynamic>) _fromMap;
|
||||
final StreamRecoveryConfig _config;
|
||||
|
||||
/// Human-readable channel name for logging and per-channel status tracking.
|
||||
final String channelName;
|
||||
|
||||
/// Optional callback invoked whenever this channel's connection status
|
||||
/// changes. Used to integrate with [RealtimeController] for per-channel
|
||||
/// skeleton indicators in the UI.
|
||||
final ChannelStatusCallback? _onStatusChanged;
|
||||
|
||||
StreamConnectionStatus _connectionStatus = StreamConnectionStatus.connected;
|
||||
int _recoveryAttempts = 0;
|
||||
Timer? _pollingTimer;
|
||||
StreamController<StreamConnectionStatus>? _statusController;
|
||||
Stream<StreamRecoveryResult<T>>? _cachedStream;
|
||||
Timer? _recoveryTimer;
|
||||
StreamSubscription<List<Map<String, dynamic>>>? _realtimeSub;
|
||||
StreamController<StreamRecoveryResult<T>>? _controller;
|
||||
bool _disposed = false;
|
||||
bool _listening = false;
|
||||
|
||||
StreamRecoveryWrapper({
|
||||
required Stream<List<Map<String, dynamic>>> stream,
|
||||
required Future<List<T>> Function() onPollData,
|
||||
required T Function(Map<String, dynamic>) fromMap,
|
||||
StreamRecoveryConfig config = const StreamRecoveryConfig(),
|
||||
this.channelName = 'unknown',
|
||||
ChannelStatusCallback? onStatusChanged,
|
||||
}) : _realtimeStream = stream,
|
||||
_onPollData = onPollData,
|
||||
_fromMap = fromMap,
|
||||
_config = config;
|
||||
_config = config,
|
||||
_onStatusChanged = onStatusChanged;
|
||||
|
||||
/// The wrapped stream that emits recovery results with metadata.
|
||||
Stream<StreamRecoveryResult<T>> get stream =>
|
||||
_cachedStream ??= _buildStream();
|
||||
///
|
||||
/// Lazily initializes the internal controller and starts listening to the
|
||||
/// realtime stream on first access. Errors from the realtime channel are
|
||||
/// handled internally — consumers only see [StreamRecoveryResult] events.
|
||||
Stream<StreamRecoveryResult<T>> get stream {
|
||||
if (_controller == null) {
|
||||
_controller = StreamController<StreamRecoveryResult<T>>.broadcast();
|
||||
_startRealtimeSubscription();
|
||||
}
|
||||
return _controller!.stream;
|
||||
}
|
||||
|
||||
/// Current connection status of this stream.
|
||||
StreamConnectionStatus get connectionStatus => _connectionStatus;
|
||||
|
||||
/// Notifies listeners when connection status changes.
|
||||
Stream<StreamConnectionStatus> get statusChanges {
|
||||
_statusController ??= StreamController<StreamConnectionStatus>.broadcast();
|
||||
return _statusController!.stream;
|
||||
// ── Realtime subscription ───────────────────────────────────────────────
|
||||
|
||||
void _startRealtimeSubscription() {
|
||||
if (_disposed || _listening) return;
|
||||
_listening = true;
|
||||
_realtimeSub?.cancel();
|
||||
_realtimeSub = _realtimeStream.listen(
|
||||
_onRealtimeData,
|
||||
onError: _onRealtimeError,
|
||||
onDone: _onRealtimeDone,
|
||||
cancelOnError: false, // keep listening even after transient errors
|
||||
);
|
||||
}
|
||||
|
||||
/// Builds the wrapped stream with recovery and polling logic.
|
||||
Stream<StreamRecoveryResult<T>> _buildStream() async* {
|
||||
int delayMs = _config.initialDelayMs;
|
||||
void _onRealtimeData(List<Map<String, dynamic>> rows) {
|
||||
if (_disposed) return;
|
||||
// Successful data — reset recovery counters.
|
||||
_recoveryAttempts = 0;
|
||||
_setStatus(StreamConnectionStatus.connected);
|
||||
_emit(
|
||||
StreamRecoveryResult<T>(
|
||||
data: rows.map(_fromMap).toList(),
|
||||
connectionStatus: StreamConnectionStatus.connected,
|
||||
isStale: false,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
_setStatus(StreamConnectionStatus.connected);
|
||||
void _onRealtimeError(Object error, [StackTrace? stack]) {
|
||||
if (_disposed) return;
|
||||
|
||||
// Try realtime stream first
|
||||
yield* _realtimeStream
|
||||
.map(
|
||||
(rows) => StreamRecoveryResult<T>(
|
||||
data: rows.map(_fromMap).toList(),
|
||||
connectionStatus: StreamConnectionStatus.connected,
|
||||
isStale: false,
|
||||
),
|
||||
)
|
||||
.handleError((error) {
|
||||
debugPrint(
|
||||
'StreamRecoveryWrapper: realtime stream error: $error',
|
||||
);
|
||||
_setStatus(StreamConnectionStatus.recovering);
|
||||
throw error; // Propagate to outer handler
|
||||
});
|
||||
final isRateLimit = _isRateLimitError(error);
|
||||
final isTimeout = _isTimeoutError(error);
|
||||
final tag = isRateLimit
|
||||
? ' (rate-limited)'
|
||||
: isTimeout
|
||||
? ' (timeout)'
|
||||
: '';
|
||||
|
||||
// If we get here, stream completed normally (shouldn't happen)
|
||||
break;
|
||||
} catch (e) {
|
||||
debugPrint(
|
||||
'StreamRecoveryWrapper: realtime failed, error=$e, '
|
||||
'attempts=$_recoveryAttempts/${_config.maxRecoveryAttempts}',
|
||||
debugPrint('StreamRecoveryWrapper[$channelName]: stream error$tag: $error');
|
||||
|
||||
_setStatus(StreamConnectionStatus.recovering);
|
||||
|
||||
if (_recoveryAttempts >= _config.maxRecoveryAttempts) {
|
||||
if (_config.enablePollingFallback) {
|
||||
_startPollingFallback();
|
||||
} else {
|
||||
_setStatus(StreamConnectionStatus.failed);
|
||||
_emit(
|
||||
StreamRecoveryResult<T>(
|
||||
data: const [],
|
||||
connectionStatus: StreamConnectionStatus.failed,
|
||||
isStale: true,
|
||||
error: error.toString(),
|
||||
),
|
||||
);
|
||||
|
||||
// Exceeded max recovery attempts?
|
||||
if (_recoveryAttempts >= _config.maxRecoveryAttempts) {
|
||||
if (_config.enablePollingFallback) {
|
||||
_setStatus(StreamConnectionStatus.polling);
|
||||
yield* _pollingFallback();
|
||||
break;
|
||||
} else {
|
||||
_setStatus(StreamConnectionStatus.failed);
|
||||
yield StreamRecoveryResult<T>(
|
||||
data: const [],
|
||||
connectionStatus: StreamConnectionStatus.failed,
|
||||
isStale: true,
|
||||
error: e.toString(),
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Exponential backoff before retry
|
||||
_recoveryAttempts++;
|
||||
await Future.delayed(Duration(milliseconds: delayMs));
|
||||
delayMs = (delayMs * _config.backoffMultiplier).toInt();
|
||||
if (delayMs > _config.maxDelayMs) {
|
||||
delayMs = _config.maxDelayMs;
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
_recoveryAttempts++;
|
||||
|
||||
// Compute backoff delay. Rate-limit errors get a longer floor (5 s).
|
||||
final baseDelay =
|
||||
_config.initialDelayMs *
|
||||
math.pow(_config.backoffMultiplier, _recoveryAttempts - 1);
|
||||
final effectiveDelay = isRateLimit
|
||||
? math.max(baseDelay.toInt(), 5000)
|
||||
: baseDelay.toInt();
|
||||
final cappedDelay = math.min(effectiveDelay, _config.maxDelayMs);
|
||||
|
||||
debugPrint(
|
||||
'StreamRecoveryWrapper[$channelName]: recovery attempt '
|
||||
'$_recoveryAttempts/${_config.maxRecoveryAttempts}, '
|
||||
'delay=${cappedDelay}ms$tag',
|
||||
);
|
||||
|
||||
// Fire a single REST poll immediately so the UI can show fresh data
|
||||
// under the skeleton shimmer while waiting for realtime to reconnect.
|
||||
_pollOnce();
|
||||
|
||||
// Schedule re-subscription after backoff.
|
||||
_recoveryTimer?.cancel();
|
||||
_recoveryTimer = Timer(Duration(milliseconds: cappedDelay), () {
|
||||
if (_disposed) return;
|
||||
_listening = false;
|
||||
_startRealtimeSubscription();
|
||||
});
|
||||
}
|
||||
|
||||
void _onRealtimeDone() {
|
||||
if (_disposed) return;
|
||||
debugPrint('StreamRecoveryWrapper[$channelName]: stream completed');
|
||||
// Attempt to reconnect once if the stream closes unexpectedly.
|
||||
if (_recoveryAttempts < _config.maxRecoveryAttempts) {
|
||||
_onRealtimeError(StateError('realtime stream completed unexpectedly'));
|
||||
}
|
||||
}
|
||||
|
||||
/// Fallback to periodic REST polling when realtime is unavailable.
|
||||
Stream<StreamRecoveryResult<T>> _pollingFallback() async* {
|
||||
while (_connectionStatus == StreamConnectionStatus.polling) {
|
||||
try {
|
||||
final data = await _onPollData();
|
||||
yield StreamRecoveryResult<T>(
|
||||
// ── Polling fallback ──────────────────────────────────────────────────
|
||||
|
||||
void _startPollingFallback() {
|
||||
_realtimeSub?.cancel();
|
||||
_listening = false;
|
||||
_setStatus(StreamConnectionStatus.polling);
|
||||
_pollingTimer?.cancel();
|
||||
_pollOnce(); // Immediate first poll
|
||||
_pollingTimer = Timer.periodic(
|
||||
Duration(milliseconds: _config.pollingIntervalMs),
|
||||
(_) => _pollOnce(),
|
||||
);
|
||||
}
|
||||
|
||||
Future<void> _pollOnce() async {
|
||||
if (_disposed) return;
|
||||
try {
|
||||
final data = await _onPollData();
|
||||
_emit(
|
||||
StreamRecoveryResult<T>(
|
||||
data: data,
|
||||
connectionStatus: StreamConnectionStatus.polling,
|
||||
isStale: true, // Mark as stale since it's no longer live
|
||||
);
|
||||
await Future.delayed(Duration(milliseconds: _config.pollingIntervalMs));
|
||||
} catch (e) {
|
||||
debugPrint('StreamRecoveryWrapper: polling error: $e');
|
||||
_setStatus(StreamConnectionStatus.stale);
|
||||
yield StreamRecoveryResult<T>(
|
||||
data: const [],
|
||||
connectionStatus: StreamConnectionStatus.stale,
|
||||
connectionStatus: _connectionStatus,
|
||||
isStale: true,
|
||||
error: e.toString(),
|
||||
),
|
||||
);
|
||||
} catch (e) {
|
||||
debugPrint('StreamRecoveryWrapper[$channelName]: poll error: $e');
|
||||
if (_connectionStatus == StreamConnectionStatus.polling) {
|
||||
_setStatus(StreamConnectionStatus.stale);
|
||||
_emit(
|
||||
StreamRecoveryResult<T>(
|
||||
data: const [],
|
||||
connectionStatus: StreamConnectionStatus.stale,
|
||||
isStale: true,
|
||||
error: e.toString(),
|
||||
),
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Update connection status and notify listeners.
|
||||
// ── Error classification ──────────────────────────────────────────────
|
||||
|
||||
/// Whether [error] indicates a Supabase channel rate limit.
|
||||
static bool _isRateLimitError(Object error) {
|
||||
final msg = error.toString().toLowerCase();
|
||||
return msg.contains('rate limit') ||
|
||||
msg.contains('rate_limit') ||
|
||||
msg.contains('channelratelimitreached') ||
|
||||
msg.contains('too many') ||
|
||||
msg.contains('429');
|
||||
}
|
||||
|
||||
/// Whether [error] indicates a subscription timeout.
|
||||
static bool _isTimeoutError(Object error) {
|
||||
if (error is TimeoutException) return true;
|
||||
final msg = error.toString().toLowerCase();
|
||||
return msg.contains('timeout') ||
|
||||
msg.contains('timed out') ||
|
||||
msg.contains('timed_out');
|
||||
}
|
||||
|
||||
// ── Helpers ───────────────────────────────────────────────────────────
|
||||
|
||||
void _emit(StreamRecoveryResult<T> result) {
|
||||
if (!_disposed && _controller != null && !_controller!.isClosed) {
|
||||
_controller!.add(result);
|
||||
}
|
||||
}
|
||||
|
||||
/// Update connection status and notify the per-channel callback.
|
||||
void _setStatus(StreamConnectionStatus status) {
|
||||
if (_connectionStatus != status) {
|
||||
_connectionStatus = status;
|
||||
_statusController?.add(status);
|
||||
_onStatusChanged?.call(channelName, status);
|
||||
}
|
||||
}
|
||||
|
||||
/// Manually trigger a recovery attempt.
|
||||
void retry() {
|
||||
_recoveryAttempts = 0;
|
||||
_setStatus(StreamConnectionStatus.recovering);
|
||||
_pollingTimer?.cancel();
|
||||
_recoveryTimer?.cancel();
|
||||
_listening = false;
|
||||
_startRealtimeSubscription();
|
||||
}
|
||||
|
||||
/// Clean up resources.
|
||||
/// Clean up all resources.
|
||||
void dispose() {
|
||||
_disposed = true;
|
||||
_pollingTimer?.cancel();
|
||||
_statusController?.close();
|
||||
_recoveryTimer?.cancel();
|
||||
_realtimeSub?.cancel();
|
||||
_controller?.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ import 'supabase_provider.dart';
|
|||
import 'tickets_provider.dart';
|
||||
import 'user_offices_provider.dart';
|
||||
import 'stream_recovery.dart';
|
||||
import 'realtime_controller.dart';
|
||||
import '../utils/app_time.dart';
|
||||
|
||||
// Helper to insert activity log rows while sanitizing nulls and
|
||||
|
|
@ -198,6 +199,8 @@ final tasksProvider = StreamProvider<List<Task>>((ref) {
|
|||
return data.cast<Map<String, dynamic>>().map(Task.fromMap).toList();
|
||||
},
|
||||
fromMap: Task.fromMap,
|
||||
channelName: 'tasks',
|
||||
onStatusChanged: ref.read(realtimeControllerProvider).handleChannelStatus,
|
||||
);
|
||||
|
||||
ref.onDispose(wrapper.dispose);
|
||||
|
|
@ -446,6 +449,8 @@ final taskAssignmentsProvider = StreamProvider<List<TaskAssignment>>((ref) {
|
|||
return data.map(TaskAssignment.fromMap).toList();
|
||||
},
|
||||
fromMap: TaskAssignment.fromMap,
|
||||
channelName: 'task_assignments',
|
||||
onStatusChanged: ref.read(realtimeControllerProvider).handleChannelStatus,
|
||||
);
|
||||
|
||||
ref.onDispose(wrapper.dispose);
|
||||
|
|
@ -472,6 +477,10 @@ final taskActivityLogsProvider =
|
|||
return data.map((r) => TaskActivityLog.fromMap(r)).toList();
|
||||
},
|
||||
fromMap: TaskActivityLog.fromMap,
|
||||
channelName: 'task_activity_logs:$taskId',
|
||||
onStatusChanged: ref
|
||||
.read(realtimeControllerProvider)
|
||||
.handleChannelStatus,
|
||||
);
|
||||
|
||||
ref.onDispose(wrapper.dispose);
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ import 'package:flutter_riverpod/flutter_riverpod.dart';
|
|||
|
||||
import 'supabase_provider.dart';
|
||||
import 'stream_recovery.dart';
|
||||
import 'realtime_controller.dart';
|
||||
import '../models/team.dart';
|
||||
import '../models/team_member.dart';
|
||||
|
||||
|
|
@ -16,6 +17,8 @@ final teamsProvider = StreamProvider<List<Team>>((ref) {
|
|||
return data.map(Team.fromMap).toList();
|
||||
},
|
||||
fromMap: Team.fromMap,
|
||||
channelName: 'teams',
|
||||
onStatusChanged: ref.read(realtimeControllerProvider).handleChannelStatus,
|
||||
);
|
||||
|
||||
ref.onDispose(wrapper.dispose);
|
||||
|
|
@ -35,6 +38,8 @@ final teamMembersProvider = StreamProvider<List<TeamMember>>((ref) {
|
|||
return data.map(TeamMember.fromMap).toList();
|
||||
},
|
||||
fromMap: TeamMember.fromMap,
|
||||
channelName: 'team_members',
|
||||
onStatusChanged: ref.read(realtimeControllerProvider).handleChannelStatus,
|
||||
);
|
||||
|
||||
ref.onDispose(wrapper.dispose);
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import 'supabase_provider.dart';
|
|||
import 'user_offices_provider.dart';
|
||||
import 'tasks_provider.dart';
|
||||
import 'stream_recovery.dart';
|
||||
import 'realtime_controller.dart';
|
||||
|
||||
final officesProvider = StreamProvider<List<Office>>((ref) {
|
||||
final client = ref.watch(supabaseClientProvider);
|
||||
|
|
@ -24,6 +25,8 @@ final officesProvider = StreamProvider<List<Office>>((ref) {
|
|||
return data.map(Office.fromMap).toList();
|
||||
},
|
||||
fromMap: Office.fromMap,
|
||||
channelName: 'offices',
|
||||
onStatusChanged: ref.read(realtimeControllerProvider).handleChannelStatus,
|
||||
);
|
||||
|
||||
ref.onDispose(wrapper.dispose);
|
||||
|
|
@ -191,6 +194,8 @@ final ticketsProvider = StreamProvider<List<Ticket>>((ref) {
|
|||
return data.cast<Map<String, dynamic>>().map(Ticket.fromMap).toList();
|
||||
},
|
||||
fromMap: Ticket.fromMap,
|
||||
channelName: 'tickets',
|
||||
onStatusChanged: ref.read(realtimeControllerProvider).handleChannelStatus,
|
||||
);
|
||||
|
||||
ref.onDispose(wrapper.dispose);
|
||||
|
|
@ -373,6 +378,10 @@ final ticketMessagesProvider =
|
|||
return data.map(TicketMessage.fromMap).toList();
|
||||
},
|
||||
fromMap: TicketMessage.fromMap,
|
||||
channelName: 'ticket_messages:$ticketId',
|
||||
onStatusChanged: ref
|
||||
.read(realtimeControllerProvider)
|
||||
.handleChannelStatus,
|
||||
);
|
||||
|
||||
ref.onDispose(wrapper.dispose);
|
||||
|
|
@ -395,6 +404,8 @@ final ticketMessagesAllProvider = StreamProvider<List<TicketMessage>>((ref) {
|
|||
return data.map(TicketMessage.fromMap).toList();
|
||||
},
|
||||
fromMap: TicketMessage.fromMap,
|
||||
channelName: 'ticket_messages_all',
|
||||
onStatusChanged: ref.read(realtimeControllerProvider).handleChannelStatus,
|
||||
);
|
||||
|
||||
ref.onDispose(wrapper.dispose);
|
||||
|
|
@ -420,6 +431,8 @@ final taskMessagesProvider = StreamProvider.family<List<TicketMessage>, String>(
|
|||
return data.map(TicketMessage.fromMap).toList();
|
||||
},
|
||||
fromMap: TicketMessage.fromMap,
|
||||
channelName: 'task_messages:$taskId',
|
||||
onStatusChanged: ref.read(realtimeControllerProvider).handleChannelStatus,
|
||||
);
|
||||
|
||||
ref.onDispose(wrapper.dispose);
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import 'package:supabase_flutter/supabase_flutter.dart';
|
|||
import '../models/user_office.dart';
|
||||
import 'supabase_provider.dart';
|
||||
import 'stream_recovery.dart';
|
||||
import 'realtime_controller.dart';
|
||||
|
||||
final userOfficesProvider = StreamProvider<List<UserOffice>>((ref) {
|
||||
final client = ref.watch(supabaseClientProvider);
|
||||
|
|
@ -21,6 +22,8 @@ final userOfficesProvider = StreamProvider<List<UserOffice>>((ref) {
|
|||
return data.map(UserOffice.fromMap).toList();
|
||||
},
|
||||
fromMap: UserOffice.fromMap,
|
||||
channelName: 'user_offices',
|
||||
onStatusChanged: ref.read(realtimeControllerProvider).handleChannelStatus,
|
||||
);
|
||||
|
||||
ref.onDispose(wrapper.dispose);
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import '../models/swap_request.dart';
|
|||
import 'profile_provider.dart';
|
||||
import 'supabase_provider.dart';
|
||||
import 'stream_recovery.dart';
|
||||
import 'realtime_controller.dart';
|
||||
|
||||
final geofenceProvider = FutureProvider<GeofenceConfig?>((ref) async {
|
||||
final client = ref.watch(supabaseClientProvider);
|
||||
|
|
@ -49,6 +50,8 @@ final dutySchedulesProvider = StreamProvider<List<DutySchedule>>((ref) {
|
|||
return data.map(DutySchedule.fromMap).toList();
|
||||
},
|
||||
fromMap: DutySchedule.fromMap,
|
||||
channelName: 'duty_schedules',
|
||||
onStatusChanged: ref.read(realtimeControllerProvider).handleChannelStatus,
|
||||
);
|
||||
|
||||
ref.onDispose(wrapper.dispose);
|
||||
|
|
@ -122,6 +125,8 @@ final swapRequestsProvider = StreamProvider<List<SwapRequest>>((ref) {
|
|||
return data.map(SwapRequest.fromMap).toList();
|
||||
},
|
||||
fromMap: SwapRequest.fromMap,
|
||||
channelName: 'swap_requests',
|
||||
onStatusChanged: ref.read(realtimeControllerProvider).handleChannelStatus,
|
||||
);
|
||||
|
||||
ref.onDispose(wrapper.dispose);
|
||||
|
|
|
|||
|
|
@ -239,13 +239,14 @@ class _TaskDetailScreenState extends ConsumerState<TaskDetailScreen>
|
|||
|
||||
final realtime = ref.watch(realtimeControllerProvider);
|
||||
final isRetrieving =
|
||||
realtime.isAnyStreamRecovering ||
|
||||
tasksAsync.isLoading ||
|
||||
ticketsAsync.isLoading ||
|
||||
officesAsync.isLoading ||
|
||||
profileAsync.isLoading ||
|
||||
assignmentsAsync.isLoading ||
|
||||
taskMessagesAsync.isLoading;
|
||||
realtime.isChannelRecovering('tasks') ||
|
||||
realtime.isChannelRecovering('task_assignments') ||
|
||||
(!tasksAsync.hasValue && tasksAsync.isLoading) ||
|
||||
(!ticketsAsync.hasValue && ticketsAsync.isLoading) ||
|
||||
(!officesAsync.hasValue && officesAsync.isLoading) ||
|
||||
(!profileAsync.hasValue && profileAsync.isLoading) ||
|
||||
(!assignmentsAsync.hasValue && assignmentsAsync.isLoading) ||
|
||||
(!taskMessagesAsync.hasValue && taskMessagesAsync.isLoading);
|
||||
|
||||
return Skeletonizer(
|
||||
enabled: isRetrieving,
|
||||
|
|
|
|||
|
|
@ -55,7 +55,6 @@ class _TasksListScreenState extends ConsumerState<TasksListScreen>
|
|||
String? _selectedAssigneeId;
|
||||
DateTimeRange? _selectedDateRange;
|
||||
late final TabController _tabController;
|
||||
bool _isSwitchingTab = false;
|
||||
|
||||
@override
|
||||
void dispose() {
|
||||
|
|
@ -69,16 +68,10 @@ class _TasksListScreenState extends ConsumerState<TasksListScreen>
|
|||
void initState() {
|
||||
super.initState();
|
||||
_tabController = TabController(length: 2, vsync: this);
|
||||
// Rebuild when tab changes so filter header can show/hide the
|
||||
// "Assigned staff" dropdown for the All Tasks tab.
|
||||
_tabController.addListener(() {
|
||||
// briefly show a skeleton when switching tabs so the UI can
|
||||
// navigate ahead and avoid a janky synchronous rebuild.
|
||||
if (!_isSwitchingTab) {
|
||||
setState(() => _isSwitchingTab = true);
|
||||
Future.delayed(const Duration(milliseconds: 150), () {
|
||||
if (!mounted) return;
|
||||
setState(() => _isSwitchingTab = false);
|
||||
});
|
||||
}
|
||||
if (mounted) setState(() {});
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -103,15 +96,19 @@ class _TasksListScreenState extends ConsumerState<TasksListScreen>
|
|||
final assignmentsAsync = ref.watch(taskAssignmentsProvider);
|
||||
final realtime = ref.watch(realtimeControllerProvider);
|
||||
|
||||
// Show skeleton only on initial load (no previous data) or when the
|
||||
// tasks channel is recovering. Use per-channel check so unrelated
|
||||
// channel issues (e.g. notifications) don't skeleton the tasks list.
|
||||
final showSkeleton =
|
||||
realtime.isAnyStreamRecovering ||
|
||||
tasksAsync.maybeWhen(loading: () => true, orElse: () => false) ||
|
||||
ticketsAsync.maybeWhen(loading: () => true, orElse: () => false) ||
|
||||
officesAsync.maybeWhen(loading: () => true, orElse: () => false) ||
|
||||
profilesAsync.maybeWhen(loading: () => true, orElse: () => false) ||
|
||||
assignmentsAsync.maybeWhen(loading: () => true, orElse: () => false) ||
|
||||
profileAsync.maybeWhen(loading: () => true, orElse: () => false);
|
||||
final effectiveShowSkeleton = showSkeleton || _isSwitchingTab;
|
||||
realtime.isChannelRecovering('tasks') ||
|
||||
realtime.isChannelRecovering('task_assignments') ||
|
||||
(!tasksAsync.hasValue && tasksAsync.isLoading) ||
|
||||
(!ticketsAsync.hasValue && ticketsAsync.isLoading) ||
|
||||
(!officesAsync.hasValue && officesAsync.isLoading) ||
|
||||
(!profilesAsync.hasValue && profilesAsync.isLoading) ||
|
||||
(!assignmentsAsync.hasValue && assignmentsAsync.isLoading) ||
|
||||
(!profileAsync.hasValue && profileAsync.isLoading);
|
||||
final effectiveShowSkeleton = showSkeleton;
|
||||
|
||||
final canCreate = profileAsync.maybeWhen(
|
||||
data: (profile) =>
|
||||
|
|
@ -141,9 +138,24 @@ class _TasksListScreenState extends ConsumerState<TasksListScreen>
|
|||
maxWidth: double.infinity,
|
||||
child: Skeletonizer(
|
||||
enabled: effectiveShowSkeleton,
|
||||
child: tasksAsync.when(
|
||||
data: (tasks) {
|
||||
if (tasks.isEmpty) {
|
||||
// Always render the full layout structure using valueOrNull so
|
||||
// that Skeletonizer has real widget shapes to shimmer. This
|
||||
// avoids the blank flash caused by the old `.when(loading:
|
||||
// () => SizedBox.shrink())` approach and keeps previous data
|
||||
// visible during provider refreshes.
|
||||
child: Builder(
|
||||
builder: (context) {
|
||||
// Show error only when there is genuinely no data.
|
||||
if (tasksAsync.hasError && !tasksAsync.hasValue) {
|
||||
return Center(
|
||||
child: Text('Failed to load tasks: ${tasksAsync.error}'),
|
||||
);
|
||||
}
|
||||
|
||||
final tasks = tasksAsync.valueOrNull ?? <Task>[];
|
||||
|
||||
// True empty state — data loaded but nothing returned.
|
||||
if (tasks.isEmpty && !effectiveShowSkeleton) {
|
||||
return const Center(child: Text('No tasks yet.'));
|
||||
}
|
||||
final offices = officesAsync.valueOrNull ?? <Office>[];
|
||||
|
|
@ -527,9 +539,6 @@ class _TasksListScreenState extends ConsumerState<TasksListScreen>
|
|||
],
|
||||
);
|
||||
},
|
||||
loading: () => const SizedBox.shrink(),
|
||||
error: (error, _) =>
|
||||
Center(child: Text('Failed to load tasks: $error')),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
|
|
|||
|
|
@ -64,11 +64,12 @@ class _TicketsListScreenState extends ConsumerState<TicketsListScreen> {
|
|||
final profilesAsync = ref.watch(profilesProvider);
|
||||
|
||||
final showSkeleton =
|
||||
realtime.isAnyStreamRecovering ||
|
||||
ticketsAsync.maybeWhen(loading: () => true, orElse: () => false) ||
|
||||
officesAsync.maybeWhen(loading: () => true, orElse: () => false) ||
|
||||
profilesAsync.maybeWhen(loading: () => true, orElse: () => false) ||
|
||||
notificationsAsync.maybeWhen(loading: () => true, orElse: () => false);
|
||||
realtime.isChannelRecovering('tickets') ||
|
||||
realtime.isChannelRecovering('offices') ||
|
||||
(!ticketsAsync.hasValue && ticketsAsync.isLoading) ||
|
||||
(!officesAsync.hasValue && officesAsync.isLoading) ||
|
||||
(!profilesAsync.hasValue && profilesAsync.isLoading) ||
|
||||
(!notificationsAsync.hasValue && notificationsAsync.isLoading);
|
||||
|
||||
if (_isInitial) {
|
||||
WidgetsBinding.instance.addPostFrameCallback((_) {
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ import '../providers/realtime_controller.dart';
|
|||
|
||||
/// Subtle, non-blocking connection status indicator.
|
||||
/// Shows in the bottom-right corner when streams are recovering/stale.
|
||||
/// Unlike the old blocking overlay, this does NOT prevent user interaction.
|
||||
/// Displays which channels are reconnecting so the user knows what to expect.
|
||||
class ReconnectIndicator extends ConsumerWidget {
|
||||
const ReconnectIndicator({super.key});
|
||||
|
||||
|
|
@ -18,6 +18,12 @@ class ReconnectIndicator extends ConsumerWidget {
|
|||
return const SizedBox.shrink();
|
||||
}
|
||||
|
||||
// Build a human-readable label for recovering channels.
|
||||
final channels = ctrl.recoveringChannels;
|
||||
final label = channels.length <= 2
|
||||
? channels.map(_humanize).join(', ')
|
||||
: '${channels.length} channels';
|
||||
|
||||
return Positioned(
|
||||
bottom: 16,
|
||||
right: 16,
|
||||
|
|
@ -52,13 +58,23 @@ class ReconnectIndicator extends ConsumerWidget {
|
|||
),
|
||||
),
|
||||
const SizedBox(width: 8),
|
||||
Text(
|
||||
'Reconnecting...',
|
||||
style: Theme.of(context).textTheme.labelSmall,
|
||||
Flexible(
|
||||
child: Text(
|
||||
'Reconnecting $label…',
|
||||
style: Theme.of(context).textTheme.labelSmall,
|
||||
overflow: TextOverflow.ellipsis,
|
||||
),
|
||||
),
|
||||
],
|
||||
),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
/// Converts a channel name like 'task_assignments' to 'task assignments'.
|
||||
static String _humanize(String channel) {
|
||||
// Strip instance suffixes like 'ticket_messages:abc123'
|
||||
final base = channel.contains(':') ? channel.split(':').first : channel;
|
||||
return base.replaceAll('_', ' ');
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user