From c9479f01f05a2d8987b975f85dcb2665a528c13f Mon Sep 17 00:00:00 2001 From: Marc Rejohn Castillano Date: Sun, 1 Mar 2026 17:24:04 +0800 Subject: [PATCH] Implemented per stream subscription recovery with polling fallback --- lib/providers/notifications_provider.dart | 27 +- lib/providers/profile_provider.dart | 44 +++- lib/providers/realtime_controller.dart | 121 +++------ lib/providers/services_provider.dart | 18 +- lib/providers/stream_recovery.dart | 243 ++++++++++++++++++ lib/providers/tasks_provider.dart | 91 +++++-- lib/providers/teams_provider.dart | 40 ++- lib/providers/tickets_provider.dart | 132 +++++++--- lib/providers/typing_provider.dart | 222 ++++++++-------- lib/providers/user_offices_provider.dart | 24 +- lib/providers/workforce_provider.dart | 83 ++++-- lib/routing/app_router.dart | 31 ++- lib/screens/dashboard/dashboard_screen.dart | 45 +--- lib/screens/tasks/task_detail_screen.dart | 55 ++-- lib/screens/tasks/tasks_list_screen.dart | 4 +- lib/screens/tickets/ticket_detail_screen.dart | 40 ++- lib/screens/tickets/tickets_list_screen.dart | 4 +- lib/services/notification_bridge.dart | 9 +- lib/widgets/reconnect_overlay.dart | 155 ++++------- 19 files changed, 894 insertions(+), 494 deletions(-) create mode 100644 lib/providers/stream_recovery.dart diff --git a/lib/providers/notifications_provider.dart b/lib/providers/notifications_provider.dart index 8eb22dd3..27d061bf 100644 --- a/lib/providers/notifications_provider.dart +++ b/lib/providers/notifications_provider.dart @@ -6,6 +6,7 @@ import '../utils/device_id.dart'; import '../models/notification_item.dart'; import 'profile_provider.dart'; import 'supabase_provider.dart'; +import 'stream_recovery.dart'; import '../utils/app_time.dart'; final notificationsProvider = StreamProvider>((ref) { @@ -14,12 +15,26 @@ final notificationsProvider = StreamProvider>((ref) { return const Stream.empty(); } final client = ref.watch(supabaseClientProvider); - return client - .from('notifications') - .stream(primaryKey: ['id']) - .eq('user_id', userId) - .order('created_at', ascending: false) - .map((rows) => rows.map(NotificationItem.fromMap).toList()); + + final wrapper = StreamRecoveryWrapper( + stream: client + .from('notifications') + .stream(primaryKey: ['id']) + .eq('user_id', userId) + .order('created_at', ascending: false), + onPollData: () async { + final data = await client + .from('notifications') + .select() + .eq('user_id', userId) + .order('created_at', ascending: false); + return data.map(NotificationItem.fromMap).toList(); + }, + fromMap: NotificationItem.fromMap, + ); + + ref.onDispose(wrapper.dispose); + return wrapper.stream.map((result) => result.data); }); final unreadNotificationsCountProvider = Provider((ref) { diff --git a/lib/providers/profile_provider.dart b/lib/providers/profile_provider.dart index 3847012e..63ff2909 100644 --- a/lib/providers/profile_provider.dart +++ b/lib/providers/profile_provider.dart @@ -6,6 +6,7 @@ import 'package:supabase_flutter/supabase_flutter.dart'; import '../models/profile.dart'; import 'auth_provider.dart'; import 'supabase_provider.dart'; +import 'stream_recovery.dart'; final currentUserIdProvider = Provider((ref) { final authState = ref.watch(authStateChangesProvider); @@ -23,20 +24,43 @@ final currentProfileProvider = StreamProvider((ref) { return const Stream.empty(); } final client = ref.watch(supabaseClientProvider); - return client - .from('profiles') - .stream(primaryKey: ['id']) - .eq('id', userId) - .map((rows) => rows.isEmpty ? null : Profile.fromMap(rows.first)); + + final wrapper = StreamRecoveryWrapper( + stream: client.from('profiles').stream(primaryKey: ['id']).eq('id', userId), + onPollData: () async { + final data = await client + .from('profiles') + .select() + .eq('id', userId) + .maybeSingle(); + return data == null ? [] : [Profile.fromMap(data)]; + }, + fromMap: Profile.fromMap, + ); + + ref.onDispose(wrapper.dispose); + return wrapper.stream.map((result) { + return result.data.isEmpty ? null : result.data.first; + }); }); final profilesProvider = StreamProvider>((ref) { final client = ref.watch(supabaseClientProvider); - return client - .from('profiles') - .stream(primaryKey: ['id']) - .order('full_name') - .map((rows) => rows.map(Profile.fromMap).toList()); + + final wrapper = StreamRecoveryWrapper( + stream: client + .from('profiles') + .stream(primaryKey: ['id']) + .order('full_name'), + onPollData: () async { + final data = await client.from('profiles').select().order('full_name'); + return data.map(Profile.fromMap).toList(); + }, + fromMap: Profile.fromMap, + ); + + ref.onDispose(wrapper.dispose); + return wrapper.stream.map((result) => result.data); }); /// Controller for the current user's profile (update full name / password). diff --git a/lib/providers/realtime_controller.dart b/lib/providers/realtime_controller.dart index 0938041c..d09ab280 100644 --- a/lib/providers/realtime_controller.dart +++ b/lib/providers/realtime_controller.dart @@ -1,7 +1,6 @@ import 'dart:async'; import 'package:flutter/foundation.dart'; -import 'package:flutter/material.dart'; import 'package:flutter_riverpod/flutter_riverpod.dart'; import 'package:supabase_flutter/supabase_flutter.dart'; @@ -16,31 +15,33 @@ final realtimeControllerProvider = ChangeNotifierProvider(( return controller; }); -/// A lightweight controller that attempts to recover the Supabase Realtime -/// connection when the app returns to the foreground or when auth tokens -/// are refreshed. +/// Simplified realtime controller for app-lifecycle awareness. +/// Individual streams now handle their own recovery via [StreamRecoveryWrapper]. +/// This controller only coordinates: +/// - App lifecycle transitions (background/foreground) +/// - Auth token refreshes +/// - Global connection state notification (for UI indicators) class RealtimeController extends ChangeNotifier { final SupabaseClient _client; - - bool isConnecting = false; - bool isFailed = false; - String? lastError; - int attempts = 0; - final int maxAttempts; bool _disposed = false; - RealtimeController(this._client, {this.maxAttempts = 4}) { + /// Global flag: true if any stream is recovering; used for subtle UI indicator. + bool isAnyStreamRecovering = false; + + RealtimeController(this._client) { _init(); } void _init() { try { - // Listen for auth changes and try to recover the realtime connection + // Listen for auth changes; ensure tokens are fresh for realtime. + // Individual streams will handle their own reconnection. _client.auth.onAuthStateChange.listen((data) { final event = data.event; - if (event == AuthChangeEvent.tokenRefreshed || - event == AuthChangeEvent.signedIn) { - recoverConnection(); + // Only refresh token on existing session refreshes, not immediately after sign-in + // (sign-in already provides a fresh token) + if (event == AuthChangeEvent.tokenRefreshed) { + _ensureTokenFresh(); } }); } catch (e) { @@ -48,77 +49,37 @@ class RealtimeController extends ChangeNotifier { } } - /// Try to reconnect the realtime client using a small exponential backoff. - Future recoverConnection() async { + /// Ensure auth token is fresh for upcoming realtime operations. + /// This is called after token refresh events, not immediately after sign-in. + Future _ensureTokenFresh() async { if (_disposed) return; - if (isConnecting) return; - - isFailed = false; - lastError = null; - isConnecting = true; - notifyListeners(); - try { - int delaySeconds = 1; - while (attempts < maxAttempts && !_disposed) { - attempts++; - try { - // Best-effort disconnect then connect so the realtime client picks - // up any refreshed tokens. - try { - // Try to refresh session/token if the SDK supports it. Use dynamic - // to avoid depending on a specific SDK version symbol. - try { - await (_client.auth as dynamic).refreshSession?.call(); - } catch (_) {} - - // Best-effort disconnect then connect so the realtime client picks - // up any refreshed tokens. The realtime connect/disconnect are - // marked internal by the SDK; suppress the lint here since this - // is a deliberate best-effort recovery. - // ignore: invalid_use_of_internal_member - _client.realtime.disconnect(); - } catch (_) {} - await Future.delayed(const Duration(milliseconds: 300)); - try { - // ignore: invalid_use_of_internal_member - _client.realtime.connect(); - } catch (_) {} - - // Give the socket a moment to stabilise. - await Future.delayed(const Duration(seconds: 1)); - - // Success (best-effort). Reset attempt counter and clear failure. - attempts = 0; - isFailed = false; - lastError = null; - break; - } catch (e) { - lastError = e.toString(); - if (attempts >= maxAttempts) { - isFailed = true; - break; - } - await Future.delayed(Duration(seconds: delaySeconds)); - delaySeconds = delaySeconds * 2; - } - } - } finally { - if (!_disposed) { - isConnecting = false; - notifyListeners(); + // Defensive: only refresh if the method exists (SDK version compatibility) + final authDynamic = _client.auth as dynamic; + if (authDynamic.refreshSession != null) { + await authDynamic.refreshSession?.call(); } + } catch (e) { + debugPrint('RealtimeController: token refresh failed: $e'); } } - /// Retry a failed recovery attempt. - Future retry() async { - if (_disposed) return; - attempts = 0; - isFailed = false; - lastError = null; - notifyListeners(); - await recoverConnection(); + /// Notify that a stream is starting recovery. Used for global UI indicator. + void markStreamRecovering() { + if (!isAnyStreamRecovering) { + isAnyStreamRecovering = true; + notifyListeners(); + } + } + + /// Notify that stream recovery completed. If all streams recovered, update state. + void markStreamRecovered() { + // In practice, individual streams notify their own status via statusChanges. + // This is kept for potential future global coordination. + if (isAnyStreamRecovering) { + isAnyStreamRecovering = false; + notifyListeners(); + } } @override diff --git a/lib/providers/services_provider.dart b/lib/providers/services_provider.dart index 9c932fe6..d2430fca 100644 --- a/lib/providers/services_provider.dart +++ b/lib/providers/services_provider.dart @@ -2,14 +2,22 @@ import 'package:flutter_riverpod/flutter_riverpod.dart'; import '../models/service.dart'; import 'supabase_provider.dart'; +import 'stream_recovery.dart'; final servicesProvider = StreamProvider>((ref) { final client = ref.watch(supabaseClientProvider); - return client - .from('services') - .stream(primaryKey: ['id']) - .order('name') - .map((rows) => rows.map((r) => Service.fromMap(r)).toList()); + + final wrapper = StreamRecoveryWrapper( + stream: client.from('services').stream(primaryKey: ['id']).order('name'), + onPollData: () async { + final data = await client.from('services').select().order('name'); + return data.map(Service.fromMap).toList(); + }, + fromMap: Service.fromMap, + ); + + ref.onDispose(wrapper.dispose); + return wrapper.stream.map((result) => result.data); }); final servicesOnceProvider = FutureProvider>((ref) async { diff --git a/lib/providers/stream_recovery.dart b/lib/providers/stream_recovery.dart new file mode 100644 index 00000000..e22a97f4 --- /dev/null +++ b/lib/providers/stream_recovery.dart @@ -0,0 +1,243 @@ +import 'dart:async'; +import 'package:flutter/foundation.dart'; + +/// Connection status for a single stream subscription. +enum StreamConnectionStatus { + /// Connected and receiving live updates. + connected, + + /// Attempting to recover the connection; data may be stale. + recovering, + + /// Connection failed; attempting to fallback to polling. + polling, + + /// Connection and polling both failed; data is stale. + stale, + + /// Fatal error; stream will not recover without manual intervention. + failed, +} + +/// Represents the result of a polling attempt. +class PollResult { + final List data; + final bool success; + final String? error; + + PollResult({required this.data, required this.success, this.error}); +} + +/// Configuration for stream recovery behavior. +class StreamRecoveryConfig { + /// Maximum number of automatic recovery attempts before giving up. + final int maxRecoveryAttempts; + + /// Initial delay (in milliseconds) before first recovery attempt. + final int initialDelayMs; + + /// Maximum delay (in milliseconds) for exponential backoff. + final int maxDelayMs; + + /// Multiplier for exponential backoff (e.g., 2.0 = double each attempt). + final double backoffMultiplier; + + /// Enable polling fallback when realtime fails. + final bool enablePollingFallback; + + /// Polling interval (in milliseconds) when realtime is unavailable. + final int pollingIntervalMs; + + const StreamRecoveryConfig({ + this.maxRecoveryAttempts = 4, + this.initialDelayMs = 1000, + this.maxDelayMs = 32000, // 32 seconds max + this.backoffMultiplier = 2.0, + this.enablePollingFallback = true, + this.pollingIntervalMs = 5000, // Poll every 5 seconds + }); +} + +/// Wraps a Supabase realtime stream with automatic recovery, polling fallback, +/// and connection status tracking. Provides graceful degradation when the +/// realtime connection fails. +/// +/// Usage: +/// ```dart +/// final wrappedStream = StreamRecoveryWrapper( +/// stream: client.from('tasks').stream(primaryKey: ['id']), +/// onPollData: () => fetchTasksViaRest(), +/// ); +/// // wrappedStream.stream emits data with connection status in metadata +/// ``` +class StreamRecoveryWrapper { + final Stream>> _realtimeStream; + final Future> Function() _onPollData; + final T Function(Map) _fromMap; + final StreamRecoveryConfig _config; + + StreamConnectionStatus _connectionStatus = StreamConnectionStatus.connected; + int _recoveryAttempts = 0; + Timer? _pollingTimer; + StreamController? _statusController; + Stream>? _cachedStream; + + StreamRecoveryWrapper({ + required Stream>> stream, + required Future> Function() onPollData, + required T Function(Map) fromMap, + StreamRecoveryConfig config = const StreamRecoveryConfig(), + }) : _realtimeStream = stream, + _onPollData = onPollData, + _fromMap = fromMap, + _config = config; + + /// The wrapped stream that emits recovery results with metadata. + Stream> get stream => + _cachedStream ??= _buildStream(); + + /// Current connection status of this stream. + StreamConnectionStatus get connectionStatus => _connectionStatus; + + /// Notifies listeners when connection status changes. + Stream get statusChanges { + _statusController ??= StreamController.broadcast(); + return _statusController!.stream; + } + + /// Builds the wrapped stream with recovery and polling logic. + Stream> _buildStream() async* { + int delayMs = _config.initialDelayMs; + + while (true) { + try { + _setStatus(StreamConnectionStatus.connected); + + // Try realtime stream first + yield* _realtimeStream + .map( + (rows) => StreamRecoveryResult( + data: rows.map(_fromMap).toList(), + connectionStatus: StreamConnectionStatus.connected, + isStale: false, + ), + ) + .handleError((error) { + debugPrint( + 'StreamRecoveryWrapper: realtime stream error: $error', + ); + _setStatus(StreamConnectionStatus.recovering); + throw error; // Propagate to outer handler + }); + + // If we get here, stream completed normally (shouldn't happen) + break; + } catch (e) { + debugPrint( + 'StreamRecoveryWrapper: realtime failed, error=$e, ' + 'attempts=$_recoveryAttempts/${_config.maxRecoveryAttempts}', + ); + + // Exceeded max recovery attempts? + if (_recoveryAttempts >= _config.maxRecoveryAttempts) { + if (_config.enablePollingFallback) { + _setStatus(StreamConnectionStatus.polling); + yield* _pollingFallback(); + break; + } else { + _setStatus(StreamConnectionStatus.failed); + yield StreamRecoveryResult( + data: const [], + connectionStatus: StreamConnectionStatus.failed, + isStale: true, + error: e.toString(), + ); + break; + } + } + + // Exponential backoff before retry + _recoveryAttempts++; + await Future.delayed(Duration(milliseconds: delayMs)); + delayMs = (delayMs * _config.backoffMultiplier).toInt(); + if (delayMs > _config.maxDelayMs) { + delayMs = _config.maxDelayMs; + } + } + } + } + + /// Fallback to periodic REST polling when realtime is unavailable. + Stream> _pollingFallback() async* { + while (_connectionStatus == StreamConnectionStatus.polling) { + try { + final data = await _onPollData(); + yield StreamRecoveryResult( + data: data, + connectionStatus: StreamConnectionStatus.polling, + isStale: true, // Mark as stale since it's no longer live + ); + await Future.delayed(Duration(milliseconds: _config.pollingIntervalMs)); + } catch (e) { + debugPrint('StreamRecoveryWrapper: polling error: $e'); + _setStatus(StreamConnectionStatus.stale); + yield StreamRecoveryResult( + data: const [], + connectionStatus: StreamConnectionStatus.stale, + isStale: true, + error: e.toString(), + ); + break; + } + } + } + + /// Update connection status and notify listeners. + void _setStatus(StreamConnectionStatus status) { + if (_connectionStatus != status) { + _connectionStatus = status; + _statusController?.add(status); + } + } + + /// Manually trigger a recovery attempt. + void retry() { + _recoveryAttempts = 0; + _setStatus(StreamConnectionStatus.recovering); + } + + /// Clean up resources. + void dispose() { + _pollingTimer?.cancel(); + _statusController?.close(); + } +} + +/// Result of a stream emission, including metadata about connection status. +class StreamRecoveryResult { + /// The data emitted by the stream. + final List data; + + /// Current connection status. + final StreamConnectionStatus connectionStatus; + + /// Whether the data is stale (not live from realtime). + final bool isStale; + + /// Error message, if any. + final String? error; + + StreamRecoveryResult({ + required this.data, + required this.connectionStatus, + required this.isStale, + this.error, + }); + + /// True if data is live and reliable. + bool get isLive => connectionStatus == StreamConnectionStatus.connected; + + /// True if we should show a "data may be stale" indicator. + bool get shouldIndicateStale => + isStale || connectionStatus == StreamConnectionStatus.polling; +} diff --git a/lib/providers/tasks_provider.dart b/lib/providers/tasks_provider.dart index 548375b4..880b45fb 100644 --- a/lib/providers/tasks_provider.dart +++ b/lib/providers/tasks_provider.dart @@ -14,6 +14,7 @@ import 'profile_provider.dart'; import 'supabase_provider.dart'; import 'tickets_provider.dart'; import 'user_offices_provider.dart'; +import 'stream_recovery.dart'; import '../utils/app_time.dart'; // Helper to insert activity log rows while sanitizing nulls and @@ -137,12 +138,48 @@ final tasksProvider = StreamProvider>((ref) { return Stream.value(const []); } - // NOTE: Supabase stream builder does not support `.range(...)` — - // apply pagination and remaining filters client-side after mapping. - final baseStream = client.from('tasks').stream(primaryKey: ['id']); + // Wrap realtime stream with recovery logic + final wrapper = StreamRecoveryWrapper( + stream: client.from('tasks').stream(primaryKey: ['id']), + onPollData: () async { + final data = await client.from('tasks').select(); + return data.cast>().map(Task.fromMap).toList(); + }, + fromMap: Task.fromMap, + ); - return baseStream.asyncMap((rows) async { - final rowsList = (rows as List).cast>(); + ref.onDispose(wrapper.dispose); + + // Process tasks with filtering/pagination after recovery + return wrapper.stream.asyncMap((result) async { + final rowsList = result.data + .map( + (task) => { + 'id': task.id, + 'task_number': task.taskNumber, + 'office_id': task.officeId, + 'ticket_id': task.ticketId, + 'title': task.title, + 'description': task.description, + 'status': task.status, + 'priority': task.priority, + 'creator_id': task.creatorId, + 'created_at': task.createdAt.toIso8601String(), + 'started_at': task.startedAt?.toIso8601String(), + 'completed_at': task.completedAt?.toIso8601String(), + 'requested_by': task.requestedBy, + 'noted_by': task.notedBy, + 'received_by': task.receivedBy, + 'queue_order': task.queueOrder, + 'request_type': task.requestType, + 'request_type_other': task.requestTypeOther, + 'request_category': task.requestCategory, + 'action_taken': task.actionTaken, + 'cancellation_reason': task.cancellationReason, + 'cancelled_at': task.cancelledAt?.toIso8601String(), + }, + ) + .toList(); final allowedTicketIds = ticketsAsync.valueOrNull?.map((ticket) => ticket.id).toList() ?? @@ -325,22 +362,46 @@ final tasksQueryProvider = StateProvider((ref) => const TaskQuery()); final taskAssignmentsProvider = StreamProvider>((ref) { final client = ref.watch(supabaseClientProvider); - return client - .from('task_assignments') - .stream(primaryKey: ['task_id', 'user_id']) - .map((rows) => rows.map(TaskAssignment.fromMap).toList()); + + final wrapper = StreamRecoveryWrapper( + stream: client + .from('task_assignments') + .stream(primaryKey: ['task_id', 'user_id']), + onPollData: () async { + final data = await client.from('task_assignments').select(); + return data.map(TaskAssignment.fromMap).toList(); + }, + fromMap: TaskAssignment.fromMap, + ); + + ref.onDispose(wrapper.dispose); + return wrapper.stream.map((result) => result.data); }); /// Stream of activity logs for a single task. final taskActivityLogsProvider = StreamProvider.family, String>((ref, taskId) { final client = ref.watch(supabaseClientProvider); - return client - .from('task_activity_logs') - .stream(primaryKey: ['id']) - .eq('task_id', taskId) - .order('created_at', ascending: false) - .map((rows) => rows.map((r) => TaskActivityLog.fromMap(r)).toList()); + + final wrapper = StreamRecoveryWrapper( + stream: client + .from('task_activity_logs') + .stream(primaryKey: ['id']) + .eq('task_id', taskId) + .order('created_at', ascending: false), + onPollData: () async { + final data = await client + .from('task_activity_logs') + .select() + .eq('task_id', taskId) + .order('created_at', ascending: false); + return data.map((r) => TaskActivityLog.fromMap(r)).toList(); + }, + fromMap: TaskActivityLog.fromMap, + ); + + ref.onDispose(wrapper.dispose); + return wrapper.stream.map((result) => result.data); }); final taskAssignmentsControllerProvider = Provider(( diff --git a/lib/providers/teams_provider.dart b/lib/providers/teams_provider.dart index 4b85c348..76bf0033 100644 --- a/lib/providers/teams_provider.dart +++ b/lib/providers/teams_provider.dart @@ -1,24 +1,42 @@ import 'package:flutter_riverpod/flutter_riverpod.dart'; import 'supabase_provider.dart'; +import 'stream_recovery.dart'; import '../models/team.dart'; import '../models/team_member.dart'; -/// Real-time stream of teams (keeps UI in sync with DB changes). +/// Real-time stream of teams with automatic recovery and graceful degradation. final teamsProvider = StreamProvider>((ref) { final client = ref.watch(supabaseClientProvider); - return client - .from('teams') - .stream(primaryKey: ['id']) - .order('name') - .map((rows) => rows.map((r) => Team.fromMap(r)).toList()); + + final wrapper = StreamRecoveryWrapper( + stream: client.from('teams').stream(primaryKey: ['id']).order('name'), + onPollData: () async { + final data = await client.from('teams').select().order('name'); + return data.map(Team.fromMap).toList(); + }, + fromMap: Team.fromMap, + ); + + ref.onDispose(wrapper.dispose); + return wrapper.stream.map((result) => result.data); }); -/// Real-time stream of team membership rows. +/// Real-time stream of team membership rows with automatic recovery. final teamMembersProvider = StreamProvider>((ref) { final client = ref.watch(supabaseClientProvider); - return client - .from('team_members') - .stream(primaryKey: ['team_id', 'user_id']) - .map((rows) => rows.map((r) => TeamMember.fromMap(r)).toList()); + + final wrapper = StreamRecoveryWrapper( + stream: client + .from('team_members') + .stream(primaryKey: ['team_id', 'user_id']), + onPollData: () async { + final data = await client.from('team_members').select(); + return data.map(TeamMember.fromMap).toList(); + }, + fromMap: TeamMember.fromMap, + ); + + ref.onDispose(wrapper.dispose); + return wrapper.stream.map((result) => result.data); }); diff --git a/lib/providers/tickets_provider.dart b/lib/providers/tickets_provider.dart index bd638bb6..3af91b9a 100644 --- a/lib/providers/tickets_provider.dart +++ b/lib/providers/tickets_provider.dart @@ -12,14 +12,22 @@ import 'profile_provider.dart'; import 'supabase_provider.dart'; import 'user_offices_provider.dart'; import 'tasks_provider.dart'; +import 'stream_recovery.dart'; final officesProvider = StreamProvider>((ref) { final client = ref.watch(supabaseClientProvider); - return client - .from('offices') - .stream(primaryKey: ['id']) - .order('name') - .map((rows) => rows.map(Office.fromMap).toList()); + + final wrapper = StreamRecoveryWrapper( + stream: client.from('offices').stream(primaryKey: ['id']).order('name'), + onPollData: () async { + final data = await client.from('offices').select().order('name'); + return data.map(Office.fromMap).toList(); + }, + fromMap: Office.fromMap, + ); + + ref.onDispose(wrapper.dispose); + return wrapper.stream.map((result) => result.data); }); final officesOnceProvider = FutureProvider>((ref) async { @@ -128,16 +136,38 @@ final ticketsProvider = StreamProvider>((ref) { profile.role == 'dispatcher' || profile.role == 'it_staff'; - // Use stream for realtime updates. Offload expensive client-side - // filtering/sorting/pagination to a background isolate via `compute` - // so UI navigation and builds remain smooth. - final baseStream = client.from('tickets').stream(primaryKey: ['id']); + // Wrap realtime stream with recovery logic + final wrapper = StreamRecoveryWrapper( + stream: client.from('tickets').stream(primaryKey: ['id']), + onPollData: () async { + // Polling fallback: fetch all tickets once + final data = await client.from('tickets').select(); + return data.cast>().map(Ticket.fromMap).toList(); + }, + fromMap: Ticket.fromMap, + ); - return baseStream.asyncMap((rows) async { - // rows is List of maps coming from Supabase - final rowsList = (rows as List).cast>(); + ref.onDispose(wrapper.dispose); + + // Process tickets with filtering/pagination after recovery + return wrapper.stream.asyncMap((result) async { + final rowsList = result.data + .map( + (ticket) => { + 'id': ticket.id, + 'subject': ticket.subject, + 'description': ticket.description, + 'status': ticket.status, + 'office_id': ticket.officeId, + 'creator_id': ticket.creatorId, + 'created_at': ticket.createdAt.toIso8601String(), + 'responded_at': ticket.respondedAt?.toIso8601String(), + 'promoted_at': ticket.promotedAt?.toIso8601String(), + 'closed_at': ticket.closedAt?.toIso8601String(), + }, + ) + .toList(); - // Prepare lightweight serializable args for background processing final allowedOfficeIds = assignmentsAsync.valueOrNull ?.where((assignment) => assignment.userId == profile.id) @@ -160,7 +190,6 @@ final ticketsProvider = StreamProvider>((ref) { final processed = await compute(_processTicketsInIsolate, payload); - // `processed` is List> — convert to Ticket objects final tickets = (processed as List) .cast>() .map(Ticket.fromMap) @@ -246,32 +275,73 @@ final ticketsQueryProvider = StateProvider( final ticketMessagesProvider = StreamProvider.family, String>((ref, ticketId) { final client = ref.watch(supabaseClientProvider); - return client - .from('ticket_messages') - .stream(primaryKey: ['id']) - .eq('ticket_id', ticketId) - .order('created_at', ascending: false) - .map((rows) => rows.map(TicketMessage.fromMap).toList()); + + final wrapper = StreamRecoveryWrapper( + stream: client + .from('ticket_messages') + .stream(primaryKey: ['id']) + .eq('ticket_id', ticketId) + .order('created_at', ascending: false), + onPollData: () async { + final data = await client + .from('ticket_messages') + .select() + .eq('ticket_id', ticketId) + .order('created_at', ascending: false); + return data.map(TicketMessage.fromMap).toList(); + }, + fromMap: TicketMessage.fromMap, + ); + + ref.onDispose(wrapper.dispose); + return wrapper.stream.map((result) => result.data); }); final ticketMessagesAllProvider = StreamProvider>((ref) { final client = ref.watch(supabaseClientProvider); - return client - .from('ticket_messages') - .stream(primaryKey: ['id']) - .order('created_at', ascending: false) - .map((rows) => rows.map(TicketMessage.fromMap).toList()); + + final wrapper = StreamRecoveryWrapper( + stream: client + .from('ticket_messages') + .stream(primaryKey: ['id']) + .order('created_at', ascending: false), + onPollData: () async { + final data = await client + .from('ticket_messages') + .select() + .order('created_at', ascending: false); + return data.map(TicketMessage.fromMap).toList(); + }, + fromMap: TicketMessage.fromMap, + ); + + ref.onDispose(wrapper.dispose); + return wrapper.stream.map((result) => result.data); }); final taskMessagesProvider = StreamProvider.family, String>( (ref, taskId) { final client = ref.watch(supabaseClientProvider); - return client - .from('ticket_messages') - .stream(primaryKey: ['id']) - .eq('task_id', taskId) - .order('created_at', ascending: false) - .map((rows) => rows.map(TicketMessage.fromMap).toList()); + + final wrapper = StreamRecoveryWrapper( + stream: client + .from('ticket_messages') + .stream(primaryKey: ['id']) + .eq('task_id', taskId) + .order('created_at', ascending: false), + onPollData: () async { + final data = await client + .from('ticket_messages') + .select() + .eq('task_id', taskId) + .order('created_at', ascending: false); + return data.map(TicketMessage.fromMap).toList(); + }, + fromMap: TicketMessage.fromMap, + ); + + ref.onDispose(wrapper.dispose); + return wrapper.stream.map((result) => result.data); }, ); diff --git a/lib/providers/typing_provider.dart b/lib/providers/typing_provider.dart index 2a1b331e..31c73262 100644 --- a/lib/providers/typing_provider.dart +++ b/lib/providers/typing_provider.dart @@ -30,11 +30,12 @@ class TypingIndicatorState { } } -final typingIndicatorProvider = StateNotifierProvider.autoDispose - .family(( - ref, - ticketId, - ) { +final typingIndicatorProvider = + StateNotifierProvider.family< + TypingIndicatorController, + TypingIndicatorState, + String + >((ref, ticketId) { final client = ref.watch(supabaseClientProvider); final controller = TypingIndicatorController(client, ticketId); return controller; @@ -65,145 +66,158 @@ class TypingIndicatorController extends StateNotifier { channel.onBroadcast( event: 'typing', callback: (payload) { - // Prevent any work if we're already disposing. Log stack for diagnostics. - if (_disposed || !mounted) { - if (kDebugMode) { - debugPrint( - 'TypingIndicatorController: onBroadcast skipped (disposed|unmounted)', - ); - debugPrint(StackTrace.current.toString()); - } - return; - } + try { + if (_disposed || !mounted) return; - final Map data = _extractPayload(payload); - final userId = data['user_id'] as String?; - final rawType = data['type']?.toString(); - final currentUserId = _client.auth.currentUser?.id; - if (_disposed || !mounted) { - if (kDebugMode) { - debugPrint( - 'TypingIndicatorController: payload received but controller disposed/unmounted', - ); - debugPrint(StackTrace.current.toString()); + final Map data = _extractPayload(payload); + final userId = data['user_id'] as String?; + final rawType = data['type']?.toString(); + final currentUserId = _client.auth.currentUser?.id; + if (_disposed || !mounted) return; + state = state.copyWith(lastPayload: data); + if (userId == null || userId == currentUserId) { + return; } - return; + if (rawType == 'stop') { + _clearRemoteTyping(userId); + return; + } + _markRemoteTyping(userId); + } catch (e, st) { + debugPrint( + 'TypingIndicatorController: broadcast callback error: $e\n$st', + ); } - state = state.copyWith(lastPayload: data); - if (userId == null || userId == currentUserId) { - return; - } - if (rawType == 'stop') { - _clearRemoteTyping(userId); - return; - } - _markRemoteTyping(userId); }, ); channel.subscribe((status, error) { - if (_disposed || !mounted) { - if (kDebugMode) { - debugPrint( - 'TypingIndicatorController: subscribe callback skipped (disposed|unmounted)', - ); - debugPrint(StackTrace.current.toString()); + try { + if (_disposed || !mounted) return; + state = state.copyWith(channelStatus: status.name); + if (error != null) { + debugPrint('TypingIndicatorController: subscribe error: $error'); } - return; + } catch (e, st) { + debugPrint( + 'TypingIndicatorController: subscribe callback error: $e\n$st', + ); } - state = state.copyWith(channelStatus: status.name); }); _channel = channel; } Map _extractPayload(dynamic payload) { - if (payload is Map) { - final inner = payload['payload']; - if (inner is Map) { - return inner; + // The realtime client can wrap the actual broadcast payload inside + // several nested fields (e.g. {payload: {payload: {...}}}). Walk the + // object until we find a map containing `user_id` or `type` keys which + // represent the actual typing payload. + try { + dynamic current = payload; + for (var i = 0; i < 6; i++) { + if (current is Map) { + // Only return when we actually find the `user_id`, otherwise try + // to unwrap nested envelopes. Some wrappers include `type: broadcast` + // at the top-level which should not be treated as the message. + if (current.containsKey('user_id')) { + return Map.from(current); + } + if (current.containsKey('payload') && + current['payload'] is Map) { + current = current['payload']; + continue; + } + // Some realtime envelope stores the payload at `data`. + if (current.containsKey('data') && + current['data'] is Map) { + current = current['data']; + continue; + } + break; + } + // Try common field on wrapper objects (e.g. RealtimeMessage.payload) + try { + final dyn = (current as dynamic).payload; + if (dyn is Map) { + current = dyn; + continue; + } + } catch (_) { + break; + } } - return payload; - } - final dynamic inner = payload.payload; - if (inner is Map) { - return inner; - } + } catch (_) {} + // As a last-resort, do a shallow recursive search for a map containing + // `user_id` in case the realtime client used a Map + // shape that wasn't caught above. + try { + Map? found; + void search(dynamic node, int depth) { + if (found != null || depth > 4) return; + if (node is Map) { + try { + final m = Map.from(node); + if (m.containsKey('user_id')) { + found = m; + return; + } + for (final v in m.values) { + search(v, depth + 1); + if (found != null) return; + } + } catch (_) { + // ignore conversion errors + } + } else if (node is Iterable) { + for (final v in node) { + search(v, depth + 1); + if (found != null) return; + } + } + } + + search(payload, 0); + if (found != null) return found!; + } catch (_) {} return {}; } void userTyping() { - if (_disposed || !mounted) { - if (kDebugMode) { - debugPrint( - 'TypingIndicatorController.userTyping() ignored after dispose', - ); - } - return; - } + if (_disposed || !mounted) return; if (_client.auth.currentUser?.id == null) return; _sendTypingEvent('start'); _typingTimer?.cancel(); - _typingTimer = Timer(const Duration(milliseconds: 150), () { - if (_disposed || !mounted) { - if (kDebugMode) { - debugPrint( - 'TypingIndicatorController._typingTimer callback ignored after dispose', - ); - } - return; - } + // Debounce sending the stop event slightly so quick pauses don't spam + // the network. 150ms was short and caused frequent start/stop bursts; + // increase to 600ms to stabilize UX. + _typingTimer = Timer(const Duration(milliseconds: 600), () { + if (_disposed || !mounted) return; _sendTypingEvent('stop'); }); } void stopTyping() { - if (_disposed || !mounted) { - if (kDebugMode) { - debugPrint( - 'TypingIndicatorController.stopTyping() ignored after dispose', - ); - } - return; - } + if (_disposed || !mounted) return; _typingTimer?.cancel(); _sendTypingEvent('stop'); } void _markRemoteTyping(String userId) { - if (_disposed || !mounted) { - if (kDebugMode) { - debugPrint( - 'TypingIndicatorController._markRemoteTyping ignored after dispose for user: $userId', - ); - debugPrint(StackTrace.current.toString()); - } - return; - } + if (_disposed || !mounted) return; final updated = {...state.userIds, userId}; if (_disposed || !mounted) return; state = state.copyWith(userIds: updated); _remoteTimeouts[userId]?.cancel(); - _remoteTimeouts[userId] = Timer(const Duration(milliseconds: 400), () { - if (_disposed || !mounted) { - if (kDebugMode) { - debugPrint( - 'TypingIndicatorController.remote timeout callback ignored after dispose for user: $userId', - ); - } - return; - } + // Extend timeout to 2500ms to accommodate brief realtime interruptions + // (auth refresh, channel reconnect, message processing, etc.) without + // clearing the presence. This gives a smoother typing experience. + _remoteTimeouts[userId] = Timer(const Duration(milliseconds: 3500), () { + if (_disposed || !mounted) return; _clearRemoteTyping(userId); }); } void _clearRemoteTyping(String userId) { - if (_disposed || !mounted) { - if (kDebugMode) { - debugPrint( - 'TypingIndicatorController._clearRemoteTyping ignored after dispose for user: $userId', - ); - } - return; - } + if (_disposed || !mounted) return; final updated = {...state.userIds}..remove(userId); if (_disposed || !mounted) return; state = state.copyWith(userIds: updated); diff --git a/lib/providers/user_offices_provider.dart b/lib/providers/user_offices_provider.dart index 6742bb66..68ff4596 100644 --- a/lib/providers/user_offices_provider.dart +++ b/lib/providers/user_offices_provider.dart @@ -3,14 +3,28 @@ import 'package:supabase_flutter/supabase_flutter.dart'; import '../models/user_office.dart'; import 'supabase_provider.dart'; +import 'stream_recovery.dart'; final userOfficesProvider = StreamProvider>((ref) { final client = ref.watch(supabaseClientProvider); - return client - .from('user_offices') - .stream(primaryKey: ['user_id', 'office_id']) - .order('created_at') - .map((rows) => rows.map(UserOffice.fromMap).toList()); + + final wrapper = StreamRecoveryWrapper( + stream: client + .from('user_offices') + .stream(primaryKey: ['user_id', 'office_id']) + .order('created_at'), + onPollData: () async { + final data = await client + .from('user_offices') + .select() + .order('created_at'); + return data.map(UserOffice.fromMap).toList(); + }, + fromMap: UserOffice.fromMap, + ); + + ref.onDispose(wrapper.dispose); + return wrapper.stream.map((result) => result.data); }); final userOfficesControllerProvider = Provider((ref) { diff --git a/lib/providers/workforce_provider.dart b/lib/providers/workforce_provider.dart index 0c8bc595..f0113aab 100644 --- a/lib/providers/workforce_provider.dart +++ b/lib/providers/workforce_provider.dart @@ -6,6 +6,7 @@ import '../models/duty_schedule.dart'; import '../models/swap_request.dart'; import 'profile_provider.dart'; import 'supabase_provider.dart'; +import 'stream_recovery.dart'; final geofenceProvider = FutureProvider((ref) async { final client = ref.watch(supabaseClientProvider); @@ -28,16 +29,30 @@ final dutySchedulesProvider = StreamProvider>((ref) { } final isAdmin = profile.role == 'admin' || profile.role == 'dispatcher'; - final base = client.from('duty_schedules').stream(primaryKey: ['id']); - if (isAdmin) { - return base - .order('start_time') - .map((rows) => rows.map(DutySchedule.fromMap).toList()); - } - return base - .eq('user_id', profile.id) - .order('start_time') - .map((rows) => rows.map(DutySchedule.fromMap).toList()); + + final wrapper = StreamRecoveryWrapper( + stream: isAdmin + ? client + .from('duty_schedules') + .stream(primaryKey: ['id']) + .order('start_time') + : client + .from('duty_schedules') + .stream(primaryKey: ['id']) + .eq('user_id', profile.id) + .order('start_time'), + onPollData: () async { + final query = client.from('duty_schedules').select(); + final data = isAdmin + ? await query.order('start_time') + : await query.eq('user_id', profile.id).order('start_time'); + return data.map(DutySchedule.fromMap).toList(); + }, + fromMap: DutySchedule.fromMap, + ); + + ref.onDispose(wrapper.dispose); + return wrapper.stream.map((result) => result.data); }); /// Fetch duty schedules by a list of IDs (used by UI when swap requests reference @@ -88,24 +103,36 @@ final swapRequestsProvider = StreamProvider>((ref) { } final isAdmin = profile.role == 'admin' || profile.role == 'dispatcher'; - final base = client.from('swap_requests').stream(primaryKey: ['id']); - if (isAdmin) { - return base - .order('created_at', ascending: false) - .map((rows) => rows.map(SwapRequest.fromMap).toList()); - } - return base - .order('created_at', ascending: false) - .map( - (rows) => rows - .where( - (row) => - row['requester_id'] == profile.id || - row['recipient_id'] == profile.id, - ) - .map(SwapRequest.fromMap) - .toList(), - ); + + final wrapper = StreamRecoveryWrapper( + stream: isAdmin + ? client + .from('swap_requests') + .stream(primaryKey: ['id']) + .order('created_at', ascending: false) + : client + .from('swap_requests') + .stream(primaryKey: ['id']) + .order('created_at', ascending: false), + onPollData: () async { + final data = await client + .from('swap_requests') + .select() + .order('created_at', ascending: false); + return data.map(SwapRequest.fromMap).toList(); + }, + fromMap: SwapRequest.fromMap, + ); + + ref.onDispose(wrapper.dispose); + return wrapper.stream.map((result) { + return result.data + .where( + (row) => + row.requesterId == profile.id || row.recipientId == profile.id, + ) + .toList(); + }); }); final workforceControllerProvider = Provider((ref) { diff --git a/lib/routing/app_router.dart b/lib/routing/app_router.dart index 012b588b..b4f6ca3a 100644 --- a/lib/routing/app_router.dart +++ b/lib/routing/app_router.dart @@ -161,13 +161,17 @@ final appRouterProvider = Provider((ref) { class RouterNotifier extends ChangeNotifier { RouterNotifier(this.ref) { _authSub = ref.listen(authStateChangesProvider, (previous, next) { - // Enforce auth-level ban when a session becomes available. + // Only enforce lock on successful sign-in events, not on every auth state change if (next is AsyncData) { final authState = next.value; final session = authState?.session; - if (session != null) { - // Fire-and-forget enforcement (best-effort client-side sign-out) - enforceLockForCurrentUser(ref.read(supabaseClientProvider)); + // Only check for bans when we have a session and the previous state didn't + final previousSession = previous is AsyncData + ? previous.value?.session + : null; + if (session != null && previousSession == null) { + // User just signed in; enforce lock check + _enforceLockAsync(); } } notifyListeners(); @@ -180,6 +184,25 @@ class RouterNotifier extends ChangeNotifier { final Ref ref; late final ProviderSubscription _authSub; late final ProviderSubscription _profileSub; + bool _lockEnforcementInProgress = false; + + /// Safely enforce lock in the background, preventing concurrent calls + void _enforceLockAsync() { + // Prevent concurrent enforcement calls + if (_lockEnforcementInProgress) return; + _lockEnforcementInProgress = true; + + // Use Future.microtask to defer execution and avoid blocking + Future.microtask(() async { + try { + await enforceLockForCurrentUser(ref.read(supabaseClientProvider)); + } catch (e) { + debugPrint('RouterNotifier: lock enforcement error: $e'); + } finally { + _lockEnforcementInProgress = false; + } + }); + } @override void dispose() { diff --git a/lib/screens/dashboard/dashboard_screen.dart b/lib/screens/dashboard/dashboard_screen.dart index b118119b..f924e517 100644 --- a/lib/screens/dashboard/dashboard_screen.dart +++ b/lib/screens/dashboard/dashboard_screen.dart @@ -13,6 +13,7 @@ import '../../providers/profile_provider.dart'; import '../../providers/tasks_provider.dart'; import '../../providers/tickets_provider.dart'; import '../../widgets/responsive_body.dart'; +import '../../widgets/reconnect_overlay.dart'; import '../../providers/realtime_controller.dart'; import 'package:skeletonizer/skeletonizer.dart'; import '../../theme/app_surfaces.dart'; @@ -353,7 +354,7 @@ class _DashboardScreenState extends State { return ResponsiveBody( child: Skeletonizer( - enabled: realtime.isConnecting, + enabled: realtime.isAnyStreamRecovering, child: LayoutBuilder( builder: (context, constraints) { final sections = [ @@ -449,43 +450,11 @@ class _DashboardScreenState extends State { ), ), ), - if (realtime.isConnecting) - Positioned.fill( - child: AbsorbPointer( - absorbing: true, - child: Container( - color: Theme.of( - context, - ).colorScheme.surface.withAlpha((0.35 * 255).round()), - alignment: Alignment.topCenter, - padding: const EdgeInsets.only(top: 36), - child: SizedBox( - width: 280, - child: Card( - elevation: 4, - child: Padding( - padding: const EdgeInsets.all(12.0), - child: Row( - mainAxisSize: MainAxisSize.min, - children: const [ - SizedBox( - width: 20, - height: 20, - child: CircularProgressIndicator( - strokeWidth: 2, - ), - ), - SizedBox(width: 12), - Expanded( - child: Text('Reconnecting realtime…'), - ), - ], - ), - ), - ), - ), - ), - ), + if (realtime.isAnyStreamRecovering) + const Positioned( + bottom: 16, + right: 16, + child: ReconnectIndicator(), ), ], ); diff --git a/lib/screens/tasks/task_detail_screen.dart b/lib/screens/tasks/task_detail_screen.dart index db089a8a..fa7e92dc 100644 --- a/lib/screens/tasks/task_detail_screen.dart +++ b/lib/screens/tasks/task_detail_screen.dart @@ -239,7 +239,7 @@ class _TaskDetailScreenState extends ConsumerState final realtime = ref.watch(realtimeControllerProvider); final isRetrieving = - realtime.isConnecting || + realtime.isAnyStreamRecovering || tasksAsync.isLoading || ticketsAsync.isLoading || officesAsync.isLoading || @@ -2684,37 +2684,46 @@ class _TaskDetailScreenState extends ConsumerState final typingController = _maybeTypingController(typingChannelId); typingController?.stopTyping(); - final message = await ref - .read(ticketsControllerProvider) - .sendTaskMessage( - taskId: task.id, - ticketId: task.ticketId, - content: content, - ); + // Capture mentioned user ids and clear the composer immediately so the + // UI does not block while the network call completes. Perform the send + // and mention notification creation in a background Future. final mentionUserIds = _extractMentionedUserIds( content, profiles, currentUserId, ); - if (mentionUserIds.isNotEmpty && currentUserId != null) { - await ref - .read(notificationsControllerProvider) - .createMentionNotifications( - userIds: mentionUserIds, - actorId: currentUserId, - ticketId: task.ticketId, - taskId: task.id, - messageId: message.id, - ); - } - ref.invalidate(taskMessagesProvider(task.id)); - if (task.ticketId != null) { - ref.invalidate(ticketMessagesProvider(task.ticketId!)); - } if (mounted) { _messageController.clear(); _clearMentions(); } + + Future(() async { + try { + final message = await ref + .read(ticketsControllerProvider) + .sendTaskMessage( + taskId: task.id, + ticketId: task.ticketId, + content: content, + ); + + if (mentionUserIds.isNotEmpty && currentUserId != null) { + try { + await ref + .read(notificationsControllerProvider) + .createMentionNotifications( + userIds: mentionUserIds, + actorId: currentUserId, + ticketId: task.ticketId, + taskId: task.id, + messageId: message.id, + ); + } catch (_) {} + } + } catch (e, st) { + debugPrint('sendTaskMessage error: $e\n$st'); + } + }); } void _handleComposerChanged( diff --git a/lib/screens/tasks/tasks_list_screen.dart b/lib/screens/tasks/tasks_list_screen.dart index 193c0d40..90719307 100644 --- a/lib/screens/tasks/tasks_list_screen.dart +++ b/lib/screens/tasks/tasks_list_screen.dart @@ -104,7 +104,7 @@ class _TasksListScreenState extends ConsumerState final realtime = ref.watch(realtimeControllerProvider); final showSkeleton = - realtime.isConnecting || + realtime.isAnyStreamRecovering || tasksAsync.maybeWhen(loading: () => true, orElse: () => false) || ticketsAsync.maybeWhen(loading: () => true, orElse: () => false) || officesAsync.maybeWhen(loading: () => true, orElse: () => false) || @@ -534,7 +534,7 @@ class _TasksListScreenState extends ConsumerState ), ), ), - const ReconnectOverlay(), + const ReconnectIndicator(), ], ); } diff --git a/lib/screens/tickets/ticket_detail_screen.dart b/lib/screens/tickets/ticket_detail_screen.dart index 375ea971..073d46ac 100644 --- a/lib/screens/tickets/ticket_detail_screen.dart +++ b/lib/screens/tickets/ticket_detail_screen.dart @@ -528,29 +528,41 @@ class _TicketDetailScreenState extends ConsumerState { _maybeTypingController(widget.ticketId)?.stopTyping(); - final message = await ref - .read(ticketsControllerProvider) - .sendTicketMessage(ticketId: widget.ticketId, content: content); + // Capture mentions and clear the composer immediately so the UI + // remains snappy. Perform the network send and notification creation + // in a fire-and-forget background Future. final mentionUserIds = _extractMentionedUserIds( content, profiles, currentUserId, ); - if (mentionUserIds.isNotEmpty && currentUserId != null) { - await ref - .read(notificationsControllerProvider) - .createMentionNotifications( - userIds: mentionUserIds, - actorId: currentUserId, - ticketId: widget.ticketId, - messageId: message.id, - ); - } - ref.invalidate(ticketMessagesProvider(widget.ticketId)); if (mounted) { _messageController.clear(); _clearMentions(); } + + Future(() async { + try { + final message = await ref + .read(ticketsControllerProvider) + .sendTicketMessage(ticketId: widget.ticketId, content: content); + + if (mentionUserIds.isNotEmpty && currentUserId != null) { + try { + await ref + .read(notificationsControllerProvider) + .createMentionNotifications( + userIds: mentionUserIds, + actorId: currentUserId, + ticketId: widget.ticketId, + messageId: message.id, + ); + } catch (_) {} + } + } catch (e, st) { + debugPrint('sendTicketMessage error: $e\n$st'); + } + }); } List _extractMentionedUserIds( diff --git a/lib/screens/tickets/tickets_list_screen.dart b/lib/screens/tickets/tickets_list_screen.dart index b8571548..aa290a48 100644 --- a/lib/screens/tickets/tickets_list_screen.dart +++ b/lib/screens/tickets/tickets_list_screen.dart @@ -64,7 +64,7 @@ class _TicketsListScreenState extends ConsumerState { final profilesAsync = ref.watch(profilesProvider); final showSkeleton = - realtime.isConnecting || + realtime.isAnyStreamRecovering || ticketsAsync.maybeWhen(loading: () => true, orElse: () => false) || officesAsync.maybeWhen(loading: () => true, orElse: () => false) || profilesAsync.maybeWhen(loading: () => true, orElse: () => false) || @@ -347,7 +347,7 @@ class _TicketsListScreenState extends ConsumerState { ), ), ), - const ReconnectOverlay(), + const ReconnectIndicator(), ], ); } diff --git a/lib/services/notification_bridge.dart b/lib/services/notification_bridge.dart index 2ebd9e85..80a84f3d 100644 --- a/lib/services/notification_bridge.dart +++ b/lib/services/notification_bridge.dart @@ -4,7 +4,6 @@ import 'package:firebase_messaging/firebase_messaging.dart'; import '../models/notification_item.dart'; import '../providers/notifications_provider.dart'; -import '../providers/realtime_controller.dart'; /// Wraps the app and installs both a Supabase realtime listener and the /// FCM handlers described in the frontend design. @@ -46,11 +45,11 @@ class _NotificationBridgeState extends ConsumerState @override void didChangeAppLifecycleState(AppLifecycleState state) { super.didChangeAppLifecycleState(state); + // App lifecycle is now monitored, but individual streams handle their own + // recovery via StreamRecoveryWrapper. This no longer forces a global reconnect, + // which was the blocking behavior users complained about. if (state == AppLifecycleState.resumed) { - try { - // Trigger a best-effort realtime reconnection when the app resumes. - ref.read(realtimeControllerProvider).recoverConnection(); - } catch (_) {} + // Future: Could trigger stream-specific recovery hints if needed. } } diff --git a/lib/widgets/reconnect_overlay.dart b/lib/widgets/reconnect_overlay.dart index 5b275aba..7e0a4519 100644 --- a/lib/widgets/reconnect_overlay.dart +++ b/lib/widgets/reconnect_overlay.dart @@ -3,127 +3,60 @@ import 'package:flutter_riverpod/flutter_riverpod.dart'; import '../providers/realtime_controller.dart'; -class ReconnectOverlay extends ConsumerWidget { - const ReconnectOverlay({super.key}); +/// Subtle, non-blocking connection status indicator. +/// Shows in the bottom-right corner when streams are recovering/stale. +/// Unlike the old blocking overlay, this does NOT prevent user interaction. +class ReconnectIndicator extends ConsumerWidget { + const ReconnectIndicator({super.key}); @override Widget build(BuildContext context, WidgetRef ref) { final ctrl = ref.watch(realtimeControllerProvider); - if (!ctrl.isConnecting && !ctrl.isFailed) return const SizedBox.shrink(); - if (ctrl.isFailed) { - return Positioned.fill( - child: AbsorbPointer( - absorbing: true, - child: Center( - child: SizedBox( - width: 420, - child: Card( - elevation: 6, - child: Padding( - padding: const EdgeInsets.all(16.0), - child: Column( - mainAxisSize: MainAxisSize.min, - children: [ - Text( - 'Realtime connection failed', - style: Theme.of(context).textTheme.titleMedium, - ), - const SizedBox(height: 8), - Text( - ctrl.lastError ?? - 'Unable to reconnect after multiple attempts.', - style: Theme.of(context).textTheme.bodyMedium, - ), - const SizedBox(height: 12), - Row( - mainAxisAlignment: MainAxisAlignment.end, - children: [ - TextButton( - onPressed: () => ctrl.retry(), - child: const Text('Retry'), - ), - ], - ), - ], - ), - ), - ), - ), - ), - ), - ); + // Hide when not recovering + if (!ctrl.isAnyStreamRecovering) { + return const SizedBox.shrink(); } - // isConnecting: show richer skeleton-like placeholders - return Positioned.fill( - child: AbsorbPointer( - absorbing: true, - child: Container( - color: Theme.of( - context, - ).colorScheme.surface.withAlpha((0.35 * 255).round()), - child: Center( - child: SizedBox( - width: 640, - child: Card( - elevation: 4, - child: Padding( - padding: const EdgeInsets.all(16.0), - child: Column( - mainAxisSize: MainAxisSize.min, - crossAxisAlignment: CrossAxisAlignment.stretch, - children: [ - // Header - Container( - height: 20, - decoration: BoxDecoration( - color: Theme.of( - context, - ).colorScheme.surfaceContainerHighest, - borderRadius: BorderRadius.circular(6), - ), - ), - const SizedBox(height: 12), - // chips row - Row( - children: [ - for (var i = 0; i < 3; i++) - Padding( - padding: const EdgeInsets.only(right: 8.0), - child: Container( - width: 100, - height: 14, - decoration: BoxDecoration( - color: Theme.of( - context, - ).colorScheme.surfaceContainerHighest, - borderRadius: BorderRadius.circular(6), - ), - ), - ), - ], - ), - const SizedBox(height: 16), - // lines representing content - for (var i = 0; i < 4; i++) ...[ - Container( - height: 12, - margin: const EdgeInsets.only(bottom: 8), - decoration: BoxDecoration( - color: Theme.of( - context, - ).colorScheme.surfaceContainerHighest, - borderRadius: BorderRadius.circular(6), - ), - ), - ], - ], - ), + return Positioned( + bottom: 16, + right: 16, + child: Container( + padding: const EdgeInsets.symmetric(horizontal: 12, vertical: 8), + decoration: BoxDecoration( + color: Theme.of(context).colorScheme.surface, + border: Border.all( + color: Theme.of(context).colorScheme.outline, + width: 1, + ), + borderRadius: BorderRadius.circular(8), + boxShadow: [ + BoxShadow( + color: Colors.black.withAlpha(3), + blurRadius: 8, + offset: const Offset(0, 2), + ), + ], + ), + child: Row( + mainAxisSize: MainAxisSize.min, + children: [ + SizedBox( + width: 12, + height: 12, + child: CircularProgressIndicator( + strokeWidth: 2, + valueColor: AlwaysStoppedAnimation( + Theme.of(context).colorScheme.primary, ), ), ), - ), + const SizedBox(width: 8), + Text( + 'Reconnecting...', + style: Theme.of(context).textTheme.labelSmall, + ), + ], ), ), );