Implemented per stream subscription recovery with polling fallback

This commit is contained in:
Marc Rejohn Castillano 2026-03-01 17:24:04 +08:00
parent e91e7b43d2
commit c9479f01f0
19 changed files with 894 additions and 494 deletions

View File

@ -6,6 +6,7 @@ import '../utils/device_id.dart';
import '../models/notification_item.dart'; import '../models/notification_item.dart';
import 'profile_provider.dart'; import 'profile_provider.dart';
import 'supabase_provider.dart'; import 'supabase_provider.dart';
import 'stream_recovery.dart';
import '../utils/app_time.dart'; import '../utils/app_time.dart';
final notificationsProvider = StreamProvider<List<NotificationItem>>((ref) { final notificationsProvider = StreamProvider<List<NotificationItem>>((ref) {
@ -14,12 +15,26 @@ final notificationsProvider = StreamProvider<List<NotificationItem>>((ref) {
return const Stream.empty(); return const Stream.empty();
} }
final client = ref.watch(supabaseClientProvider); final client = ref.watch(supabaseClientProvider);
return client
final wrapper = StreamRecoveryWrapper<NotificationItem>(
stream: client
.from('notifications') .from('notifications')
.stream(primaryKey: ['id']) .stream(primaryKey: ['id'])
.eq('user_id', userId) .eq('user_id', userId)
.order('created_at', ascending: false) .order('created_at', ascending: false),
.map((rows) => rows.map(NotificationItem.fromMap).toList()); onPollData: () async {
final data = await client
.from('notifications')
.select()
.eq('user_id', userId)
.order('created_at', ascending: false);
return data.map(NotificationItem.fromMap).toList();
},
fromMap: NotificationItem.fromMap,
);
ref.onDispose(wrapper.dispose);
return wrapper.stream.map((result) => result.data);
}); });
final unreadNotificationsCountProvider = Provider<int>((ref) { final unreadNotificationsCountProvider = Provider<int>((ref) {

View File

@ -6,6 +6,7 @@ import 'package:supabase_flutter/supabase_flutter.dart';
import '../models/profile.dart'; import '../models/profile.dart';
import 'auth_provider.dart'; import 'auth_provider.dart';
import 'supabase_provider.dart'; import 'supabase_provider.dart';
import 'stream_recovery.dart';
final currentUserIdProvider = Provider<String?>((ref) { final currentUserIdProvider = Provider<String?>((ref) {
final authState = ref.watch(authStateChangesProvider); final authState = ref.watch(authStateChangesProvider);
@ -23,20 +24,43 @@ final currentProfileProvider = StreamProvider<Profile?>((ref) {
return const Stream.empty(); return const Stream.empty();
} }
final client = ref.watch(supabaseClientProvider); final client = ref.watch(supabaseClientProvider);
return client
final wrapper = StreamRecoveryWrapper<Profile?>(
stream: client.from('profiles').stream(primaryKey: ['id']).eq('id', userId),
onPollData: () async {
final data = await client
.from('profiles') .from('profiles')
.stream(primaryKey: ['id']) .select()
.eq('id', userId) .eq('id', userId)
.map((rows) => rows.isEmpty ? null : Profile.fromMap(rows.first)); .maybeSingle();
return data == null ? [] : [Profile.fromMap(data)];
},
fromMap: Profile.fromMap,
);
ref.onDispose(wrapper.dispose);
return wrapper.stream.map((result) {
return result.data.isEmpty ? null : result.data.first;
});
}); });
final profilesProvider = StreamProvider<List<Profile>>((ref) { final profilesProvider = StreamProvider<List<Profile>>((ref) {
final client = ref.watch(supabaseClientProvider); final client = ref.watch(supabaseClientProvider);
return client
final wrapper = StreamRecoveryWrapper<Profile>(
stream: client
.from('profiles') .from('profiles')
.stream(primaryKey: ['id']) .stream(primaryKey: ['id'])
.order('full_name') .order('full_name'),
.map((rows) => rows.map(Profile.fromMap).toList()); onPollData: () async {
final data = await client.from('profiles').select().order('full_name');
return data.map(Profile.fromMap).toList();
},
fromMap: Profile.fromMap,
);
ref.onDispose(wrapper.dispose);
return wrapper.stream.map((result) => result.data);
}); });
/// Controller for the current user's profile (update full name / password). /// Controller for the current user's profile (update full name / password).

View File

@ -1,7 +1,6 @@
import 'dart:async'; import 'dart:async';
import 'package:flutter/foundation.dart'; import 'package:flutter/foundation.dart';
import 'package:flutter/material.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart'; import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:supabase_flutter/supabase_flutter.dart'; import 'package:supabase_flutter/supabase_flutter.dart';
@ -16,31 +15,33 @@ final realtimeControllerProvider = ChangeNotifierProvider<RealtimeController>((
return controller; return controller;
}); });
/// A lightweight controller that attempts to recover the Supabase Realtime /// Simplified realtime controller for app-lifecycle awareness.
/// connection when the app returns to the foreground or when auth tokens /// Individual streams now handle their own recovery via [StreamRecoveryWrapper].
/// are refreshed. /// This controller only coordinates:
/// - App lifecycle transitions (background/foreground)
/// - Auth token refreshes
/// - Global connection state notification (for UI indicators)
class RealtimeController extends ChangeNotifier { class RealtimeController extends ChangeNotifier {
final SupabaseClient _client; final SupabaseClient _client;
bool isConnecting = false;
bool isFailed = false;
String? lastError;
int attempts = 0;
final int maxAttempts;
bool _disposed = false; bool _disposed = false;
RealtimeController(this._client, {this.maxAttempts = 4}) { /// Global flag: true if any stream is recovering; used for subtle UI indicator.
bool isAnyStreamRecovering = false;
RealtimeController(this._client) {
_init(); _init();
} }
void _init() { void _init() {
try { try {
// Listen for auth changes and try to recover the realtime connection // Listen for auth changes; ensure tokens are fresh for realtime.
// Individual streams will handle their own reconnection.
_client.auth.onAuthStateChange.listen((data) { _client.auth.onAuthStateChange.listen((data) {
final event = data.event; final event = data.event;
if (event == AuthChangeEvent.tokenRefreshed || // Only refresh token on existing session refreshes, not immediately after sign-in
event == AuthChangeEvent.signedIn) { // (sign-in already provides a fresh token)
recoverConnection(); if (event == AuthChangeEvent.tokenRefreshed) {
_ensureTokenFresh();
} }
}); });
} catch (e) { } catch (e) {
@ -48,77 +49,37 @@ class RealtimeController extends ChangeNotifier {
} }
} }
/// Try to reconnect the realtime client using a small exponential backoff. /// Ensure auth token is fresh for upcoming realtime operations.
Future<void> recoverConnection() async { /// This is called after token refresh events, not immediately after sign-in.
Future<void> _ensureTokenFresh() async {
if (_disposed) return; if (_disposed) return;
if (isConnecting) return;
isFailed = false;
lastError = null;
isConnecting = true;
notifyListeners();
try { try {
int delaySeconds = 1; // Defensive: only refresh if the method exists (SDK version compatibility)
while (attempts < maxAttempts && !_disposed) { final authDynamic = _client.auth as dynamic;
attempts++; if (authDynamic.refreshSession != null) {
try { await authDynamic.refreshSession?.call();
// Best-effort disconnect then connect so the realtime client picks }
// up any refreshed tokens.
try {
// Try to refresh session/token if the SDK supports it. Use dynamic
// to avoid depending on a specific SDK version symbol.
try {
await (_client.auth as dynamic).refreshSession?.call();
} catch (_) {}
// Best-effort disconnect then connect so the realtime client picks
// up any refreshed tokens. The realtime connect/disconnect are
// marked internal by the SDK; suppress the lint here since this
// is a deliberate best-effort recovery.
// ignore: invalid_use_of_internal_member
_client.realtime.disconnect();
} catch (_) {}
await Future.delayed(const Duration(milliseconds: 300));
try {
// ignore: invalid_use_of_internal_member
_client.realtime.connect();
} catch (_) {}
// Give the socket a moment to stabilise.
await Future.delayed(const Duration(seconds: 1));
// Success (best-effort). Reset attempt counter and clear failure.
attempts = 0;
isFailed = false;
lastError = null;
break;
} catch (e) { } catch (e) {
lastError = e.toString(); debugPrint('RealtimeController: token refresh failed: $e');
if (attempts >= maxAttempts) {
isFailed = true;
break;
}
await Future.delayed(Duration(seconds: delaySeconds));
delaySeconds = delaySeconds * 2;
}
}
} finally {
if (!_disposed) {
isConnecting = false;
notifyListeners();
}
} }
} }
/// Retry a failed recovery attempt. /// Notify that a stream is starting recovery. Used for global UI indicator.
Future<void> retry() async { void markStreamRecovering() {
if (_disposed) return; if (!isAnyStreamRecovering) {
attempts = 0; isAnyStreamRecovering = true;
isFailed = false;
lastError = null;
notifyListeners(); notifyListeners();
await recoverConnection(); }
}
/// Notify that stream recovery completed. If all streams recovered, update state.
void markStreamRecovered() {
// In practice, individual streams notify their own status via statusChanges.
// This is kept for potential future global coordination.
if (isAnyStreamRecovering) {
isAnyStreamRecovering = false;
notifyListeners();
}
} }
@override @override

View File

@ -2,14 +2,22 @@ import 'package:flutter_riverpod/flutter_riverpod.dart';
import '../models/service.dart'; import '../models/service.dart';
import 'supabase_provider.dart'; import 'supabase_provider.dart';
import 'stream_recovery.dart';
final servicesProvider = StreamProvider<List<Service>>((ref) { final servicesProvider = StreamProvider<List<Service>>((ref) {
final client = ref.watch(supabaseClientProvider); final client = ref.watch(supabaseClientProvider);
return client
.from('services') final wrapper = StreamRecoveryWrapper<Service>(
.stream(primaryKey: ['id']) stream: client.from('services').stream(primaryKey: ['id']).order('name'),
.order('name') onPollData: () async {
.map((rows) => rows.map((r) => Service.fromMap(r)).toList()); final data = await client.from('services').select().order('name');
return data.map(Service.fromMap).toList();
},
fromMap: Service.fromMap,
);
ref.onDispose(wrapper.dispose);
return wrapper.stream.map((result) => result.data);
}); });
final servicesOnceProvider = FutureProvider<List<Service>>((ref) async { final servicesOnceProvider = FutureProvider<List<Service>>((ref) async {

View File

@ -0,0 +1,243 @@
import 'dart:async';
import 'package:flutter/foundation.dart';
/// Connection status for a single stream subscription.
enum StreamConnectionStatus {
/// Connected and receiving live updates.
connected,
/// Attempting to recover the connection; data may be stale.
recovering,
/// Connection failed; attempting to fallback to polling.
polling,
/// Connection and polling both failed; data is stale.
stale,
/// Fatal error; stream will not recover without manual intervention.
failed,
}
/// Represents the result of a polling attempt.
class PollResult<T> {
final List<T> data;
final bool success;
final String? error;
PollResult({required this.data, required this.success, this.error});
}
/// Configuration for stream recovery behavior.
class StreamRecoveryConfig {
/// Maximum number of automatic recovery attempts before giving up.
final int maxRecoveryAttempts;
/// Initial delay (in milliseconds) before first recovery attempt.
final int initialDelayMs;
/// Maximum delay (in milliseconds) for exponential backoff.
final int maxDelayMs;
/// Multiplier for exponential backoff (e.g., 2.0 = double each attempt).
final double backoffMultiplier;
/// Enable polling fallback when realtime fails.
final bool enablePollingFallback;
/// Polling interval (in milliseconds) when realtime is unavailable.
final int pollingIntervalMs;
const StreamRecoveryConfig({
this.maxRecoveryAttempts = 4,
this.initialDelayMs = 1000,
this.maxDelayMs = 32000, // 32 seconds max
this.backoffMultiplier = 2.0,
this.enablePollingFallback = true,
this.pollingIntervalMs = 5000, // Poll every 5 seconds
});
}
/// Wraps a Supabase realtime stream with automatic recovery, polling fallback,
/// and connection status tracking. Provides graceful degradation when the
/// realtime connection fails.
///
/// Usage:
/// ```dart
/// final wrappedStream = StreamRecoveryWrapper(
/// stream: client.from('tasks').stream(primaryKey: ['id']),
/// onPollData: () => fetchTasksViaRest(),
/// );
/// // wrappedStream.stream emits data with connection status in metadata
/// ```
class StreamRecoveryWrapper<T> {
final Stream<List<Map<String, dynamic>>> _realtimeStream;
final Future<List<T>> Function() _onPollData;
final T Function(Map<String, dynamic>) _fromMap;
final StreamRecoveryConfig _config;
StreamConnectionStatus _connectionStatus = StreamConnectionStatus.connected;
int _recoveryAttempts = 0;
Timer? _pollingTimer;
StreamController<StreamConnectionStatus>? _statusController;
Stream<StreamRecoveryResult<T>>? _cachedStream;
StreamRecoveryWrapper({
required Stream<List<Map<String, dynamic>>> stream,
required Future<List<T>> Function() onPollData,
required T Function(Map<String, dynamic>) fromMap,
StreamRecoveryConfig config = const StreamRecoveryConfig(),
}) : _realtimeStream = stream,
_onPollData = onPollData,
_fromMap = fromMap,
_config = config;
/// The wrapped stream that emits recovery results with metadata.
Stream<StreamRecoveryResult<T>> get stream =>
_cachedStream ??= _buildStream();
/// Current connection status of this stream.
StreamConnectionStatus get connectionStatus => _connectionStatus;
/// Notifies listeners when connection status changes.
Stream<StreamConnectionStatus> get statusChanges {
_statusController ??= StreamController<StreamConnectionStatus>.broadcast();
return _statusController!.stream;
}
/// Builds the wrapped stream with recovery and polling logic.
Stream<StreamRecoveryResult<T>> _buildStream() async* {
int delayMs = _config.initialDelayMs;
while (true) {
try {
_setStatus(StreamConnectionStatus.connected);
// Try realtime stream first
yield* _realtimeStream
.map(
(rows) => StreamRecoveryResult<T>(
data: rows.map(_fromMap).toList(),
connectionStatus: StreamConnectionStatus.connected,
isStale: false,
),
)
.handleError((error) {
debugPrint(
'StreamRecoveryWrapper: realtime stream error: $error',
);
_setStatus(StreamConnectionStatus.recovering);
throw error; // Propagate to outer handler
});
// If we get here, stream completed normally (shouldn't happen)
break;
} catch (e) {
debugPrint(
'StreamRecoveryWrapper: realtime failed, error=$e, '
'attempts=$_recoveryAttempts/${_config.maxRecoveryAttempts}',
);
// Exceeded max recovery attempts?
if (_recoveryAttempts >= _config.maxRecoveryAttempts) {
if (_config.enablePollingFallback) {
_setStatus(StreamConnectionStatus.polling);
yield* _pollingFallback();
break;
} else {
_setStatus(StreamConnectionStatus.failed);
yield StreamRecoveryResult<T>(
data: const [],
connectionStatus: StreamConnectionStatus.failed,
isStale: true,
error: e.toString(),
);
break;
}
}
// Exponential backoff before retry
_recoveryAttempts++;
await Future.delayed(Duration(milliseconds: delayMs));
delayMs = (delayMs * _config.backoffMultiplier).toInt();
if (delayMs > _config.maxDelayMs) {
delayMs = _config.maxDelayMs;
}
}
}
}
/// Fallback to periodic REST polling when realtime is unavailable.
Stream<StreamRecoveryResult<T>> _pollingFallback() async* {
while (_connectionStatus == StreamConnectionStatus.polling) {
try {
final data = await _onPollData();
yield StreamRecoveryResult<T>(
data: data,
connectionStatus: StreamConnectionStatus.polling,
isStale: true, // Mark as stale since it's no longer live
);
await Future.delayed(Duration(milliseconds: _config.pollingIntervalMs));
} catch (e) {
debugPrint('StreamRecoveryWrapper: polling error: $e');
_setStatus(StreamConnectionStatus.stale);
yield StreamRecoveryResult<T>(
data: const [],
connectionStatus: StreamConnectionStatus.stale,
isStale: true,
error: e.toString(),
);
break;
}
}
}
/// Update connection status and notify listeners.
void _setStatus(StreamConnectionStatus status) {
if (_connectionStatus != status) {
_connectionStatus = status;
_statusController?.add(status);
}
}
/// Manually trigger a recovery attempt.
void retry() {
_recoveryAttempts = 0;
_setStatus(StreamConnectionStatus.recovering);
}
/// Clean up resources.
void dispose() {
_pollingTimer?.cancel();
_statusController?.close();
}
}
/// Result of a stream emission, including metadata about connection status.
class StreamRecoveryResult<T> {
/// The data emitted by the stream.
final List<T> data;
/// Current connection status.
final StreamConnectionStatus connectionStatus;
/// Whether the data is stale (not live from realtime).
final bool isStale;
/// Error message, if any.
final String? error;
StreamRecoveryResult({
required this.data,
required this.connectionStatus,
required this.isStale,
this.error,
});
/// True if data is live and reliable.
bool get isLive => connectionStatus == StreamConnectionStatus.connected;
/// True if we should show a "data may be stale" indicator.
bool get shouldIndicateStale =>
isStale || connectionStatus == StreamConnectionStatus.polling;
}

View File

@ -14,6 +14,7 @@ import 'profile_provider.dart';
import 'supabase_provider.dart'; import 'supabase_provider.dart';
import 'tickets_provider.dart'; import 'tickets_provider.dart';
import 'user_offices_provider.dart'; import 'user_offices_provider.dart';
import 'stream_recovery.dart';
import '../utils/app_time.dart'; import '../utils/app_time.dart';
// Helper to insert activity log rows while sanitizing nulls and // Helper to insert activity log rows while sanitizing nulls and
@ -137,12 +138,48 @@ final tasksProvider = StreamProvider<List<Task>>((ref) {
return Stream.value(const <Task>[]); return Stream.value(const <Task>[]);
} }
// NOTE: Supabase stream builder does not support `.range(...)` // Wrap realtime stream with recovery logic
// apply pagination and remaining filters client-side after mapping. final wrapper = StreamRecoveryWrapper<Task>(
final baseStream = client.from('tasks').stream(primaryKey: ['id']); stream: client.from('tasks').stream(primaryKey: ['id']),
onPollData: () async {
final data = await client.from('tasks').select();
return data.cast<Map<String, dynamic>>().map(Task.fromMap).toList();
},
fromMap: Task.fromMap,
);
return baseStream.asyncMap((rows) async { ref.onDispose(wrapper.dispose);
final rowsList = (rows as List<dynamic>).cast<Map<String, dynamic>>();
// Process tasks with filtering/pagination after recovery
return wrapper.stream.asyncMap((result) async {
final rowsList = result.data
.map(
(task) => <String, dynamic>{
'id': task.id,
'task_number': task.taskNumber,
'office_id': task.officeId,
'ticket_id': task.ticketId,
'title': task.title,
'description': task.description,
'status': task.status,
'priority': task.priority,
'creator_id': task.creatorId,
'created_at': task.createdAt.toIso8601String(),
'started_at': task.startedAt?.toIso8601String(),
'completed_at': task.completedAt?.toIso8601String(),
'requested_by': task.requestedBy,
'noted_by': task.notedBy,
'received_by': task.receivedBy,
'queue_order': task.queueOrder,
'request_type': task.requestType,
'request_type_other': task.requestTypeOther,
'request_category': task.requestCategory,
'action_taken': task.actionTaken,
'cancellation_reason': task.cancellationReason,
'cancelled_at': task.cancelledAt?.toIso8601String(),
},
)
.toList();
final allowedTicketIds = final allowedTicketIds =
ticketsAsync.valueOrNull?.map((ticket) => ticket.id).toList() ?? ticketsAsync.valueOrNull?.map((ticket) => ticket.id).toList() ??
@ -325,22 +362,46 @@ final tasksQueryProvider = StateProvider<TaskQuery>((ref) => const TaskQuery());
final taskAssignmentsProvider = StreamProvider<List<TaskAssignment>>((ref) { final taskAssignmentsProvider = StreamProvider<List<TaskAssignment>>((ref) {
final client = ref.watch(supabaseClientProvider); final client = ref.watch(supabaseClientProvider);
return client
final wrapper = StreamRecoveryWrapper<TaskAssignment>(
stream: client
.from('task_assignments') .from('task_assignments')
.stream(primaryKey: ['task_id', 'user_id']) .stream(primaryKey: ['task_id', 'user_id']),
.map((rows) => rows.map(TaskAssignment.fromMap).toList()); onPollData: () async {
final data = await client.from('task_assignments').select();
return data.map(TaskAssignment.fromMap).toList();
},
fromMap: TaskAssignment.fromMap,
);
ref.onDispose(wrapper.dispose);
return wrapper.stream.map((result) => result.data);
}); });
/// Stream of activity logs for a single task. /// Stream of activity logs for a single task.
final taskActivityLogsProvider = final taskActivityLogsProvider =
StreamProvider.family<List<TaskActivityLog>, String>((ref, taskId) { StreamProvider.family<List<TaskActivityLog>, String>((ref, taskId) {
final client = ref.watch(supabaseClientProvider); final client = ref.watch(supabaseClientProvider);
return client
final wrapper = StreamRecoveryWrapper<TaskActivityLog>(
stream: client
.from('task_activity_logs') .from('task_activity_logs')
.stream(primaryKey: ['id']) .stream(primaryKey: ['id'])
.eq('task_id', taskId) .eq('task_id', taskId)
.order('created_at', ascending: false) .order('created_at', ascending: false),
.map((rows) => rows.map((r) => TaskActivityLog.fromMap(r)).toList()); onPollData: () async {
final data = await client
.from('task_activity_logs')
.select()
.eq('task_id', taskId)
.order('created_at', ascending: false);
return data.map((r) => TaskActivityLog.fromMap(r)).toList();
},
fromMap: TaskActivityLog.fromMap,
);
ref.onDispose(wrapper.dispose);
return wrapper.stream.map((result) => result.data);
}); });
final taskAssignmentsControllerProvider = Provider<TaskAssignmentsController>(( final taskAssignmentsControllerProvider = Provider<TaskAssignmentsController>((

View File

@ -1,24 +1,42 @@
import 'package:flutter_riverpod/flutter_riverpod.dart'; import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'supabase_provider.dart'; import 'supabase_provider.dart';
import 'stream_recovery.dart';
import '../models/team.dart'; import '../models/team.dart';
import '../models/team_member.dart'; import '../models/team_member.dart';
/// Real-time stream of teams (keeps UI in sync with DB changes). /// Real-time stream of teams with automatic recovery and graceful degradation.
final teamsProvider = StreamProvider<List<Team>>((ref) { final teamsProvider = StreamProvider<List<Team>>((ref) {
final client = ref.watch(supabaseClientProvider); final client = ref.watch(supabaseClientProvider);
return client
.from('teams') final wrapper = StreamRecoveryWrapper<Team>(
.stream(primaryKey: ['id']) stream: client.from('teams').stream(primaryKey: ['id']).order('name'),
.order('name') onPollData: () async {
.map((rows) => rows.map((r) => Team.fromMap(r)).toList()); final data = await client.from('teams').select().order('name');
return data.map(Team.fromMap).toList();
},
fromMap: Team.fromMap,
);
ref.onDispose(wrapper.dispose);
return wrapper.stream.map((result) => result.data);
}); });
/// Real-time stream of team membership rows. /// Real-time stream of team membership rows with automatic recovery.
final teamMembersProvider = StreamProvider<List<TeamMember>>((ref) { final teamMembersProvider = StreamProvider<List<TeamMember>>((ref) {
final client = ref.watch(supabaseClientProvider); final client = ref.watch(supabaseClientProvider);
return client
final wrapper = StreamRecoveryWrapper<TeamMember>(
stream: client
.from('team_members') .from('team_members')
.stream(primaryKey: ['team_id', 'user_id']) .stream(primaryKey: ['team_id', 'user_id']),
.map((rows) => rows.map((r) => TeamMember.fromMap(r)).toList()); onPollData: () async {
final data = await client.from('team_members').select();
return data.map(TeamMember.fromMap).toList();
},
fromMap: TeamMember.fromMap,
);
ref.onDispose(wrapper.dispose);
return wrapper.stream.map((result) => result.data);
}); });

View File

@ -12,14 +12,22 @@ import 'profile_provider.dart';
import 'supabase_provider.dart'; import 'supabase_provider.dart';
import 'user_offices_provider.dart'; import 'user_offices_provider.dart';
import 'tasks_provider.dart'; import 'tasks_provider.dart';
import 'stream_recovery.dart';
final officesProvider = StreamProvider<List<Office>>((ref) { final officesProvider = StreamProvider<List<Office>>((ref) {
final client = ref.watch(supabaseClientProvider); final client = ref.watch(supabaseClientProvider);
return client
.from('offices') final wrapper = StreamRecoveryWrapper<Office>(
.stream(primaryKey: ['id']) stream: client.from('offices').stream(primaryKey: ['id']).order('name'),
.order('name') onPollData: () async {
.map((rows) => rows.map(Office.fromMap).toList()); final data = await client.from('offices').select().order('name');
return data.map(Office.fromMap).toList();
},
fromMap: Office.fromMap,
);
ref.onDispose(wrapper.dispose);
return wrapper.stream.map((result) => result.data);
}); });
final officesOnceProvider = FutureProvider<List<Office>>((ref) async { final officesOnceProvider = FutureProvider<List<Office>>((ref) async {
@ -128,16 +136,38 @@ final ticketsProvider = StreamProvider<List<Ticket>>((ref) {
profile.role == 'dispatcher' || profile.role == 'dispatcher' ||
profile.role == 'it_staff'; profile.role == 'it_staff';
// Use stream for realtime updates. Offload expensive client-side // Wrap realtime stream with recovery logic
// filtering/sorting/pagination to a background isolate via `compute` final wrapper = StreamRecoveryWrapper<Ticket>(
// so UI navigation and builds remain smooth. stream: client.from('tickets').stream(primaryKey: ['id']),
final baseStream = client.from('tickets').stream(primaryKey: ['id']); onPollData: () async {
// Polling fallback: fetch all tickets once
final data = await client.from('tickets').select();
return data.cast<Map<String, dynamic>>().map(Ticket.fromMap).toList();
},
fromMap: Ticket.fromMap,
);
return baseStream.asyncMap((rows) async { ref.onDispose(wrapper.dispose);
// rows is List<dynamic> of maps coming from Supabase
final rowsList = (rows as List<dynamic>).cast<Map<String, dynamic>>(); // Process tickets with filtering/pagination after recovery
return wrapper.stream.asyncMap((result) async {
final rowsList = result.data
.map(
(ticket) => <String, dynamic>{
'id': ticket.id,
'subject': ticket.subject,
'description': ticket.description,
'status': ticket.status,
'office_id': ticket.officeId,
'creator_id': ticket.creatorId,
'created_at': ticket.createdAt.toIso8601String(),
'responded_at': ticket.respondedAt?.toIso8601String(),
'promoted_at': ticket.promotedAt?.toIso8601String(),
'closed_at': ticket.closedAt?.toIso8601String(),
},
)
.toList();
// Prepare lightweight serializable args for background processing
final allowedOfficeIds = final allowedOfficeIds =
assignmentsAsync.valueOrNull assignmentsAsync.valueOrNull
?.where((assignment) => assignment.userId == profile.id) ?.where((assignment) => assignment.userId == profile.id)
@ -160,7 +190,6 @@ final ticketsProvider = StreamProvider<List<Ticket>>((ref) {
final processed = await compute(_processTicketsInIsolate, payload); final processed = await compute(_processTicketsInIsolate, payload);
// `processed` is List<Map<String,dynamic>> convert to Ticket objects
final tickets = (processed as List<dynamic>) final tickets = (processed as List<dynamic>)
.cast<Map<String, dynamic>>() .cast<Map<String, dynamic>>()
.map(Ticket.fromMap) .map(Ticket.fromMap)
@ -246,32 +275,73 @@ final ticketsQueryProvider = StateProvider<TicketQuery>(
final ticketMessagesProvider = final ticketMessagesProvider =
StreamProvider.family<List<TicketMessage>, String>((ref, ticketId) { StreamProvider.family<List<TicketMessage>, String>((ref, ticketId) {
final client = ref.watch(supabaseClientProvider); final client = ref.watch(supabaseClientProvider);
return client
final wrapper = StreamRecoveryWrapper<TicketMessage>(
stream: client
.from('ticket_messages') .from('ticket_messages')
.stream(primaryKey: ['id']) .stream(primaryKey: ['id'])
.eq('ticket_id', ticketId) .eq('ticket_id', ticketId)
.order('created_at', ascending: false) .order('created_at', ascending: false),
.map((rows) => rows.map(TicketMessage.fromMap).toList()); onPollData: () async {
final data = await client
.from('ticket_messages')
.select()
.eq('ticket_id', ticketId)
.order('created_at', ascending: false);
return data.map(TicketMessage.fromMap).toList();
},
fromMap: TicketMessage.fromMap,
);
ref.onDispose(wrapper.dispose);
return wrapper.stream.map((result) => result.data);
}); });
final ticketMessagesAllProvider = StreamProvider<List<TicketMessage>>((ref) { final ticketMessagesAllProvider = StreamProvider<List<TicketMessage>>((ref) {
final client = ref.watch(supabaseClientProvider); final client = ref.watch(supabaseClientProvider);
return client
final wrapper = StreamRecoveryWrapper<TicketMessage>(
stream: client
.from('ticket_messages') .from('ticket_messages')
.stream(primaryKey: ['id']) .stream(primaryKey: ['id'])
.order('created_at', ascending: false) .order('created_at', ascending: false),
.map((rows) => rows.map(TicketMessage.fromMap).toList()); onPollData: () async {
final data = await client
.from('ticket_messages')
.select()
.order('created_at', ascending: false);
return data.map(TicketMessage.fromMap).toList();
},
fromMap: TicketMessage.fromMap,
);
ref.onDispose(wrapper.dispose);
return wrapper.stream.map((result) => result.data);
}); });
final taskMessagesProvider = StreamProvider.family<List<TicketMessage>, String>( final taskMessagesProvider = StreamProvider.family<List<TicketMessage>, String>(
(ref, taskId) { (ref, taskId) {
final client = ref.watch(supabaseClientProvider); final client = ref.watch(supabaseClientProvider);
return client
final wrapper = StreamRecoveryWrapper<TicketMessage>(
stream: client
.from('ticket_messages') .from('ticket_messages')
.stream(primaryKey: ['id']) .stream(primaryKey: ['id'])
.eq('task_id', taskId) .eq('task_id', taskId)
.order('created_at', ascending: false) .order('created_at', ascending: false),
.map((rows) => rows.map(TicketMessage.fromMap).toList()); onPollData: () async {
final data = await client
.from('ticket_messages')
.select()
.eq('task_id', taskId)
.order('created_at', ascending: false);
return data.map(TicketMessage.fromMap).toList();
},
fromMap: TicketMessage.fromMap,
);
ref.onDispose(wrapper.dispose);
return wrapper.stream.map((result) => result.data);
}, },
); );

View File

@ -30,11 +30,12 @@ class TypingIndicatorState {
} }
} }
final typingIndicatorProvider = StateNotifierProvider.autoDispose final typingIndicatorProvider =
.family<TypingIndicatorController, TypingIndicatorState, String>(( StateNotifierProvider.family<
ref, TypingIndicatorController,
ticketId, TypingIndicatorState,
) { String
>((ref, ticketId) {
final client = ref.watch(supabaseClientProvider); final client = ref.watch(supabaseClientProvider);
final controller = TypingIndicatorController(client, ticketId); final controller = TypingIndicatorController(client, ticketId);
return controller; return controller;
@ -65,30 +66,14 @@ class TypingIndicatorController extends StateNotifier<TypingIndicatorState> {
channel.onBroadcast( channel.onBroadcast(
event: 'typing', event: 'typing',
callback: (payload) { callback: (payload) {
// Prevent any work if we're already disposing. Log stack for diagnostics. try {
if (_disposed || !mounted) { if (_disposed || !mounted) return;
if (kDebugMode) {
debugPrint(
'TypingIndicatorController: onBroadcast skipped (disposed|unmounted)',
);
debugPrint(StackTrace.current.toString());
}
return;
}
final Map<String, dynamic> data = _extractPayload(payload); final Map<String, dynamic> data = _extractPayload(payload);
final userId = data['user_id'] as String?; final userId = data['user_id'] as String?;
final rawType = data['type']?.toString(); final rawType = data['type']?.toString();
final currentUserId = _client.auth.currentUser?.id; final currentUserId = _client.auth.currentUser?.id;
if (_disposed || !mounted) { if (_disposed || !mounted) return;
if (kDebugMode) {
debugPrint(
'TypingIndicatorController: payload received but controller disposed/unmounted',
);
debugPrint(StackTrace.current.toString());
}
return;
}
state = state.copyWith(lastPayload: data); state = state.copyWith(lastPayload: data);
if (userId == null || userId == currentUserId) { if (userId == null || userId == currentUserId) {
return; return;
@ -98,112 +83,141 @@ class TypingIndicatorController extends StateNotifier<TypingIndicatorState> {
return; return;
} }
_markRemoteTyping(userId); _markRemoteTyping(userId);
} catch (e, st) {
debugPrint(
'TypingIndicatorController: broadcast callback error: $e\n$st',
);
}
}, },
); );
channel.subscribe((status, error) { channel.subscribe((status, error) {
if (_disposed || !mounted) { try {
if (kDebugMode) { if (_disposed || !mounted) return;
debugPrint(
'TypingIndicatorController: subscribe callback skipped (disposed|unmounted)',
);
debugPrint(StackTrace.current.toString());
}
return;
}
state = state.copyWith(channelStatus: status.name); state = state.copyWith(channelStatus: status.name);
if (error != null) {
debugPrint('TypingIndicatorController: subscribe error: $error');
}
} catch (e, st) {
debugPrint(
'TypingIndicatorController: subscribe callback error: $e\n$st',
);
}
}); });
_channel = channel; _channel = channel;
} }
Map<String, dynamic> _extractPayload(dynamic payload) { Map<String, dynamic> _extractPayload(dynamic payload) {
if (payload is Map<String, dynamic>) { // The realtime client can wrap the actual broadcast payload inside
final inner = payload['payload']; // several nested fields (e.g. {payload: {payload: {...}}}). Walk the
if (inner is Map<String, dynamic>) { // object until we find a map containing `user_id` or `type` keys which
return inner; // represent the actual typing payload.
try {
dynamic current = payload;
for (var i = 0; i < 6; i++) {
if (current is Map<String, dynamic>) {
// Only return when we actually find the `user_id`, otherwise try
// to unwrap nested envelopes. Some wrappers include `type: broadcast`
// at the top-level which should not be treated as the message.
if (current.containsKey('user_id')) {
return Map<String, dynamic>.from(current);
} }
return payload; if (current.containsKey('payload') &&
current['payload'] is Map<String, dynamic>) {
current = current['payload'];
continue;
} }
final dynamic inner = payload.payload; // Some realtime envelope stores the payload at `data`.
if (inner is Map<String, dynamic>) { if (current.containsKey('data') &&
return inner; current['data'] is Map<String, dynamic>) {
current = current['data'];
continue;
} }
break;
}
// Try common field on wrapper objects (e.g. RealtimeMessage.payload)
try {
final dyn = (current as dynamic).payload;
if (dyn is Map<String, dynamic>) {
current = dyn;
continue;
}
} catch (_) {
break;
}
}
} catch (_) {}
// As a last-resort, do a shallow recursive search for a map containing
// `user_id` in case the realtime client used a Map<dynamic,dynamic>
// shape that wasn't caught above.
try {
Map<String, dynamic>? found;
void search(dynamic node, int depth) {
if (found != null || depth > 4) return;
if (node is Map) {
try {
final m = Map<String, dynamic>.from(node);
if (m.containsKey('user_id')) {
found = m;
return;
}
for (final v in m.values) {
search(v, depth + 1);
if (found != null) return;
}
} catch (_) {
// ignore conversion errors
}
} else if (node is Iterable) {
for (final v in node) {
search(v, depth + 1);
if (found != null) return;
}
}
}
search(payload, 0);
if (found != null) return found!;
} catch (_) {}
return <String, dynamic>{}; return <String, dynamic>{};
} }
void userTyping() { void userTyping() {
if (_disposed || !mounted) { if (_disposed || !mounted) return;
if (kDebugMode) {
debugPrint(
'TypingIndicatorController.userTyping() ignored after dispose',
);
}
return;
}
if (_client.auth.currentUser?.id == null) return; if (_client.auth.currentUser?.id == null) return;
_sendTypingEvent('start'); _sendTypingEvent('start');
_typingTimer?.cancel(); _typingTimer?.cancel();
_typingTimer = Timer(const Duration(milliseconds: 150), () { // Debounce sending the stop event slightly so quick pauses don't spam
if (_disposed || !mounted) { // the network. 150ms was short and caused frequent start/stop bursts;
if (kDebugMode) { // increase to 600ms to stabilize UX.
debugPrint( _typingTimer = Timer(const Duration(milliseconds: 600), () {
'TypingIndicatorController._typingTimer callback ignored after dispose', if (_disposed || !mounted) return;
);
}
return;
}
_sendTypingEvent('stop'); _sendTypingEvent('stop');
}); });
} }
void stopTyping() { void stopTyping() {
if (_disposed || !mounted) { if (_disposed || !mounted) return;
if (kDebugMode) {
debugPrint(
'TypingIndicatorController.stopTyping() ignored after dispose',
);
}
return;
}
_typingTimer?.cancel(); _typingTimer?.cancel();
_sendTypingEvent('stop'); _sendTypingEvent('stop');
} }
void _markRemoteTyping(String userId) { void _markRemoteTyping(String userId) {
if (_disposed || !mounted) { if (_disposed || !mounted) return;
if (kDebugMode) {
debugPrint(
'TypingIndicatorController._markRemoteTyping ignored after dispose for user: $userId',
);
debugPrint(StackTrace.current.toString());
}
return;
}
final updated = {...state.userIds, userId}; final updated = {...state.userIds, userId};
if (_disposed || !mounted) return; if (_disposed || !mounted) return;
state = state.copyWith(userIds: updated); state = state.copyWith(userIds: updated);
_remoteTimeouts[userId]?.cancel(); _remoteTimeouts[userId]?.cancel();
_remoteTimeouts[userId] = Timer(const Duration(milliseconds: 400), () { // Extend timeout to 2500ms to accommodate brief realtime interruptions
if (_disposed || !mounted) { // (auth refresh, channel reconnect, message processing, etc.) without
if (kDebugMode) { // clearing the presence. This gives a smoother typing experience.
debugPrint( _remoteTimeouts[userId] = Timer(const Duration(milliseconds: 3500), () {
'TypingIndicatorController.remote timeout callback ignored after dispose for user: $userId', if (_disposed || !mounted) return;
);
}
return;
}
_clearRemoteTyping(userId); _clearRemoteTyping(userId);
}); });
} }
void _clearRemoteTyping(String userId) { void _clearRemoteTyping(String userId) {
if (_disposed || !mounted) { if (_disposed || !mounted) return;
if (kDebugMode) {
debugPrint(
'TypingIndicatorController._clearRemoteTyping ignored after dispose for user: $userId',
);
}
return;
}
final updated = {...state.userIds}..remove(userId); final updated = {...state.userIds}..remove(userId);
if (_disposed || !mounted) return; if (_disposed || !mounted) return;
state = state.copyWith(userIds: updated); state = state.copyWith(userIds: updated);

View File

@ -3,14 +3,28 @@ import 'package:supabase_flutter/supabase_flutter.dart';
import '../models/user_office.dart'; import '../models/user_office.dart';
import 'supabase_provider.dart'; import 'supabase_provider.dart';
import 'stream_recovery.dart';
final userOfficesProvider = StreamProvider<List<UserOffice>>((ref) { final userOfficesProvider = StreamProvider<List<UserOffice>>((ref) {
final client = ref.watch(supabaseClientProvider); final client = ref.watch(supabaseClientProvider);
return client
final wrapper = StreamRecoveryWrapper<UserOffice>(
stream: client
.from('user_offices') .from('user_offices')
.stream(primaryKey: ['user_id', 'office_id']) .stream(primaryKey: ['user_id', 'office_id'])
.order('created_at') .order('created_at'),
.map((rows) => rows.map(UserOffice.fromMap).toList()); onPollData: () async {
final data = await client
.from('user_offices')
.select()
.order('created_at');
return data.map(UserOffice.fromMap).toList();
},
fromMap: UserOffice.fromMap,
);
ref.onDispose(wrapper.dispose);
return wrapper.stream.map((result) => result.data);
}); });
final userOfficesControllerProvider = Provider<UserOfficesController>((ref) { final userOfficesControllerProvider = Provider<UserOfficesController>((ref) {

View File

@ -6,6 +6,7 @@ import '../models/duty_schedule.dart';
import '../models/swap_request.dart'; import '../models/swap_request.dart';
import 'profile_provider.dart'; import 'profile_provider.dart';
import 'supabase_provider.dart'; import 'supabase_provider.dart';
import 'stream_recovery.dart';
final geofenceProvider = FutureProvider<GeofenceConfig?>((ref) async { final geofenceProvider = FutureProvider<GeofenceConfig?>((ref) async {
final client = ref.watch(supabaseClientProvider); final client = ref.watch(supabaseClientProvider);
@ -28,16 +29,30 @@ final dutySchedulesProvider = StreamProvider<List<DutySchedule>>((ref) {
} }
final isAdmin = profile.role == 'admin' || profile.role == 'dispatcher'; final isAdmin = profile.role == 'admin' || profile.role == 'dispatcher';
final base = client.from('duty_schedules').stream(primaryKey: ['id']);
if (isAdmin) { final wrapper = StreamRecoveryWrapper<DutySchedule>(
return base stream: isAdmin
? client
.from('duty_schedules')
.stream(primaryKey: ['id'])
.order('start_time') .order('start_time')
.map((rows) => rows.map(DutySchedule.fromMap).toList()); : client
} .from('duty_schedules')
return base .stream(primaryKey: ['id'])
.eq('user_id', profile.id) .eq('user_id', profile.id)
.order('start_time') .order('start_time'),
.map((rows) => rows.map(DutySchedule.fromMap).toList()); onPollData: () async {
final query = client.from('duty_schedules').select();
final data = isAdmin
? await query.order('start_time')
: await query.eq('user_id', profile.id).order('start_time');
return data.map(DutySchedule.fromMap).toList();
},
fromMap: DutySchedule.fromMap,
);
ref.onDispose(wrapper.dispose);
return wrapper.stream.map((result) => result.data);
}); });
/// Fetch duty schedules by a list of IDs (used by UI when swap requests reference /// Fetch duty schedules by a list of IDs (used by UI when swap requests reference
@ -88,24 +103,36 @@ final swapRequestsProvider = StreamProvider<List<SwapRequest>>((ref) {
} }
final isAdmin = profile.role == 'admin' || profile.role == 'dispatcher'; final isAdmin = profile.role == 'admin' || profile.role == 'dispatcher';
final base = client.from('swap_requests').stream(primaryKey: ['id']);
if (isAdmin) { final wrapper = StreamRecoveryWrapper<SwapRequest>(
return base stream: isAdmin
? client
.from('swap_requests')
.stream(primaryKey: ['id'])
.order('created_at', ascending: false) .order('created_at', ascending: false)
.map((rows) => rows.map(SwapRequest.fromMap).toList()); : client
} .from('swap_requests')
return base .stream(primaryKey: ['id'])
.order('created_at', ascending: false) .order('created_at', ascending: false),
.map( onPollData: () async {
(rows) => rows final data = await client
.from('swap_requests')
.select()
.order('created_at', ascending: false);
return data.map(SwapRequest.fromMap).toList();
},
fromMap: SwapRequest.fromMap,
);
ref.onDispose(wrapper.dispose);
return wrapper.stream.map((result) {
return result.data
.where( .where(
(row) => (row) =>
row['requester_id'] == profile.id || row.requesterId == profile.id || row.recipientId == profile.id,
row['recipient_id'] == profile.id,
) )
.map(SwapRequest.fromMap) .toList();
.toList(), });
);
}); });
final workforceControllerProvider = Provider<WorkforceController>((ref) { final workforceControllerProvider = Provider<WorkforceController>((ref) {

View File

@ -161,13 +161,17 @@ final appRouterProvider = Provider<GoRouter>((ref) {
class RouterNotifier extends ChangeNotifier { class RouterNotifier extends ChangeNotifier {
RouterNotifier(this.ref) { RouterNotifier(this.ref) {
_authSub = ref.listen(authStateChangesProvider, (previous, next) { _authSub = ref.listen(authStateChangesProvider, (previous, next) {
// Enforce auth-level ban when a session becomes available. // Only enforce lock on successful sign-in events, not on every auth state change
if (next is AsyncData) { if (next is AsyncData) {
final authState = next.value; final authState = next.value;
final session = authState?.session; final session = authState?.session;
if (session != null) { // Only check for bans when we have a session and the previous state didn't
// Fire-and-forget enforcement (best-effort client-side sign-out) final previousSession = previous is AsyncData
enforceLockForCurrentUser(ref.read(supabaseClientProvider)); ? previous.value?.session
: null;
if (session != null && previousSession == null) {
// User just signed in; enforce lock check
_enforceLockAsync();
} }
} }
notifyListeners(); notifyListeners();
@ -180,6 +184,25 @@ class RouterNotifier extends ChangeNotifier {
final Ref ref; final Ref ref;
late final ProviderSubscription _authSub; late final ProviderSubscription _authSub;
late final ProviderSubscription _profileSub; late final ProviderSubscription _profileSub;
bool _lockEnforcementInProgress = false;
/// Safely enforce lock in the background, preventing concurrent calls
void _enforceLockAsync() {
// Prevent concurrent enforcement calls
if (_lockEnforcementInProgress) return;
_lockEnforcementInProgress = true;
// Use Future.microtask to defer execution and avoid blocking
Future.microtask(() async {
try {
await enforceLockForCurrentUser(ref.read(supabaseClientProvider));
} catch (e) {
debugPrint('RouterNotifier: lock enforcement error: $e');
} finally {
_lockEnforcementInProgress = false;
}
});
}
@override @override
void dispose() { void dispose() {

View File

@ -13,6 +13,7 @@ import '../../providers/profile_provider.dart';
import '../../providers/tasks_provider.dart'; import '../../providers/tasks_provider.dart';
import '../../providers/tickets_provider.dart'; import '../../providers/tickets_provider.dart';
import '../../widgets/responsive_body.dart'; import '../../widgets/responsive_body.dart';
import '../../widgets/reconnect_overlay.dart';
import '../../providers/realtime_controller.dart'; import '../../providers/realtime_controller.dart';
import 'package:skeletonizer/skeletonizer.dart'; import 'package:skeletonizer/skeletonizer.dart';
import '../../theme/app_surfaces.dart'; import '../../theme/app_surfaces.dart';
@ -353,7 +354,7 @@ class _DashboardScreenState extends State<DashboardScreen> {
return ResponsiveBody( return ResponsiveBody(
child: Skeletonizer( child: Skeletonizer(
enabled: realtime.isConnecting, enabled: realtime.isAnyStreamRecovering,
child: LayoutBuilder( child: LayoutBuilder(
builder: (context, constraints) { builder: (context, constraints) {
final sections = <Widget>[ final sections = <Widget>[
@ -449,43 +450,11 @@ class _DashboardScreenState extends State<DashboardScreen> {
), ),
), ),
), ),
if (realtime.isConnecting) if (realtime.isAnyStreamRecovering)
Positioned.fill( const Positioned(
child: AbsorbPointer( bottom: 16,
absorbing: true, right: 16,
child: Container( child: ReconnectIndicator(),
color: Theme.of(
context,
).colorScheme.surface.withAlpha((0.35 * 255).round()),
alignment: Alignment.topCenter,
padding: const EdgeInsets.only(top: 36),
child: SizedBox(
width: 280,
child: Card(
elevation: 4,
child: Padding(
padding: const EdgeInsets.all(12.0),
child: Row(
mainAxisSize: MainAxisSize.min,
children: const [
SizedBox(
width: 20,
height: 20,
child: CircularProgressIndicator(
strokeWidth: 2,
),
),
SizedBox(width: 12),
Expanded(
child: Text('Reconnecting realtime…'),
),
],
),
),
),
),
),
),
), ),
], ],
); );

View File

@ -239,7 +239,7 @@ class _TaskDetailScreenState extends ConsumerState<TaskDetailScreen>
final realtime = ref.watch(realtimeControllerProvider); final realtime = ref.watch(realtimeControllerProvider);
final isRetrieving = final isRetrieving =
realtime.isConnecting || realtime.isAnyStreamRecovering ||
tasksAsync.isLoading || tasksAsync.isLoading ||
ticketsAsync.isLoading || ticketsAsync.isLoading ||
officesAsync.isLoading || officesAsync.isLoading ||
@ -2684,6 +2684,21 @@ class _TaskDetailScreenState extends ConsumerState<TaskDetailScreen>
final typingController = _maybeTypingController(typingChannelId); final typingController = _maybeTypingController(typingChannelId);
typingController?.stopTyping(); typingController?.stopTyping();
// Capture mentioned user ids and clear the composer immediately so the
// UI does not block while the network call completes. Perform the send
// and mention notification creation in a background Future.
final mentionUserIds = _extractMentionedUserIds(
content,
profiles,
currentUserId,
);
if (mounted) {
_messageController.clear();
_clearMentions();
}
Future(() async {
try {
final message = await ref final message = await ref
.read(ticketsControllerProvider) .read(ticketsControllerProvider)
.sendTaskMessage( .sendTaskMessage(
@ -2691,12 +2706,9 @@ class _TaskDetailScreenState extends ConsumerState<TaskDetailScreen>
ticketId: task.ticketId, ticketId: task.ticketId,
content: content, content: content,
); );
final mentionUserIds = _extractMentionedUserIds(
content,
profiles,
currentUserId,
);
if (mentionUserIds.isNotEmpty && currentUserId != null) { if (mentionUserIds.isNotEmpty && currentUserId != null) {
try {
await ref await ref
.read(notificationsControllerProvider) .read(notificationsControllerProvider)
.createMentionNotifications( .createMentionNotifications(
@ -2706,15 +2718,12 @@ class _TaskDetailScreenState extends ConsumerState<TaskDetailScreen>
taskId: task.id, taskId: task.id,
messageId: message.id, messageId: message.id,
); );
} catch (_) {}
} }
ref.invalidate(taskMessagesProvider(task.id)); } catch (e, st) {
if (task.ticketId != null) { debugPrint('sendTaskMessage error: $e\n$st');
ref.invalidate(ticketMessagesProvider(task.ticketId!));
}
if (mounted) {
_messageController.clear();
_clearMentions();
} }
});
} }
void _handleComposerChanged( void _handleComposerChanged(

View File

@ -104,7 +104,7 @@ class _TasksListScreenState extends ConsumerState<TasksListScreen>
final realtime = ref.watch(realtimeControllerProvider); final realtime = ref.watch(realtimeControllerProvider);
final showSkeleton = final showSkeleton =
realtime.isConnecting || realtime.isAnyStreamRecovering ||
tasksAsync.maybeWhen(loading: () => true, orElse: () => false) || tasksAsync.maybeWhen(loading: () => true, orElse: () => false) ||
ticketsAsync.maybeWhen(loading: () => true, orElse: () => false) || ticketsAsync.maybeWhen(loading: () => true, orElse: () => false) ||
officesAsync.maybeWhen(loading: () => true, orElse: () => false) || officesAsync.maybeWhen(loading: () => true, orElse: () => false) ||
@ -534,7 +534,7 @@ class _TasksListScreenState extends ConsumerState<TasksListScreen>
), ),
), ),
), ),
const ReconnectOverlay(), const ReconnectIndicator(),
], ],
); );
} }

View File

@ -528,15 +528,27 @@ class _TicketDetailScreenState extends ConsumerState<TicketDetailScreen> {
_maybeTypingController(widget.ticketId)?.stopTyping(); _maybeTypingController(widget.ticketId)?.stopTyping();
final message = await ref // Capture mentions and clear the composer immediately so the UI
.read(ticketsControllerProvider) // remains snappy. Perform the network send and notification creation
.sendTicketMessage(ticketId: widget.ticketId, content: content); // in a fire-and-forget background Future.
final mentionUserIds = _extractMentionedUserIds( final mentionUserIds = _extractMentionedUserIds(
content, content,
profiles, profiles,
currentUserId, currentUserId,
); );
if (mounted) {
_messageController.clear();
_clearMentions();
}
Future(() async {
try {
final message = await ref
.read(ticketsControllerProvider)
.sendTicketMessage(ticketId: widget.ticketId, content: content);
if (mentionUserIds.isNotEmpty && currentUserId != null) { if (mentionUserIds.isNotEmpty && currentUserId != null) {
try {
await ref await ref
.read(notificationsControllerProvider) .read(notificationsControllerProvider)
.createMentionNotifications( .createMentionNotifications(
@ -545,12 +557,12 @@ class _TicketDetailScreenState extends ConsumerState<TicketDetailScreen> {
ticketId: widget.ticketId, ticketId: widget.ticketId,
messageId: message.id, messageId: message.id,
); );
} catch (_) {}
} }
ref.invalidate(ticketMessagesProvider(widget.ticketId)); } catch (e, st) {
if (mounted) { debugPrint('sendTicketMessage error: $e\n$st');
_messageController.clear();
_clearMentions();
} }
});
} }
List<String> _extractMentionedUserIds( List<String> _extractMentionedUserIds(

View File

@ -64,7 +64,7 @@ class _TicketsListScreenState extends ConsumerState<TicketsListScreen> {
final profilesAsync = ref.watch(profilesProvider); final profilesAsync = ref.watch(profilesProvider);
final showSkeleton = final showSkeleton =
realtime.isConnecting || realtime.isAnyStreamRecovering ||
ticketsAsync.maybeWhen(loading: () => true, orElse: () => false) || ticketsAsync.maybeWhen(loading: () => true, orElse: () => false) ||
officesAsync.maybeWhen(loading: () => true, orElse: () => false) || officesAsync.maybeWhen(loading: () => true, orElse: () => false) ||
profilesAsync.maybeWhen(loading: () => true, orElse: () => false) || profilesAsync.maybeWhen(loading: () => true, orElse: () => false) ||
@ -347,7 +347,7 @@ class _TicketsListScreenState extends ConsumerState<TicketsListScreen> {
), ),
), ),
), ),
const ReconnectOverlay(), const ReconnectIndicator(),
], ],
); );
} }

View File

@ -4,7 +4,6 @@ import 'package:firebase_messaging/firebase_messaging.dart';
import '../models/notification_item.dart'; import '../models/notification_item.dart';
import '../providers/notifications_provider.dart'; import '../providers/notifications_provider.dart';
import '../providers/realtime_controller.dart';
/// Wraps the app and installs both a Supabase realtime listener and the /// Wraps the app and installs both a Supabase realtime listener and the
/// FCM handlers described in the frontend design. /// FCM handlers described in the frontend design.
@ -46,11 +45,11 @@ class _NotificationBridgeState extends ConsumerState<NotificationBridge>
@override @override
void didChangeAppLifecycleState(AppLifecycleState state) { void didChangeAppLifecycleState(AppLifecycleState state) {
super.didChangeAppLifecycleState(state); super.didChangeAppLifecycleState(state);
// App lifecycle is now monitored, but individual streams handle their own
// recovery via StreamRecoveryWrapper. This no longer forces a global reconnect,
// which was the blocking behavior users complained about.
if (state == AppLifecycleState.resumed) { if (state == AppLifecycleState.resumed) {
try { // Future: Could trigger stream-specific recovery hints if needed.
// Trigger a best-effort realtime reconnection when the app resumes.
ref.read(realtimeControllerProvider).recoverConnection();
} catch (_) {}
} }
} }

View File

@ -3,127 +3,60 @@ import 'package:flutter_riverpod/flutter_riverpod.dart';
import '../providers/realtime_controller.dart'; import '../providers/realtime_controller.dart';
class ReconnectOverlay extends ConsumerWidget { /// Subtle, non-blocking connection status indicator.
const ReconnectOverlay({super.key}); /// Shows in the bottom-right corner when streams are recovering/stale.
/// Unlike the old blocking overlay, this does NOT prevent user interaction.
class ReconnectIndicator extends ConsumerWidget {
const ReconnectIndicator({super.key});
@override @override
Widget build(BuildContext context, WidgetRef ref) { Widget build(BuildContext context, WidgetRef ref) {
final ctrl = ref.watch(realtimeControllerProvider); final ctrl = ref.watch(realtimeControllerProvider);
if (!ctrl.isConnecting && !ctrl.isFailed) return const SizedBox.shrink();
if (ctrl.isFailed) { // Hide when not recovering
return Positioned.fill( if (!ctrl.isAnyStreamRecovering) {
child: AbsorbPointer( return const SizedBox.shrink();
absorbing: true,
child: Center(
child: SizedBox(
width: 420,
child: Card(
elevation: 6,
child: Padding(
padding: const EdgeInsets.all(16.0),
child: Column(
mainAxisSize: MainAxisSize.min,
children: [
Text(
'Realtime connection failed',
style: Theme.of(context).textTheme.titleMedium,
),
const SizedBox(height: 8),
Text(
ctrl.lastError ??
'Unable to reconnect after multiple attempts.',
style: Theme.of(context).textTheme.bodyMedium,
),
const SizedBox(height: 12),
Row(
mainAxisAlignment: MainAxisAlignment.end,
children: [
TextButton(
onPressed: () => ctrl.retry(),
child: const Text('Retry'),
),
],
),
],
),
),
),
),
),
),
);
} }
// isConnecting: show richer skeleton-like placeholders return Positioned(
return Positioned.fill( bottom: 16,
child: AbsorbPointer( right: 16,
absorbing: true,
child: Container( child: Container(
color: Theme.of( padding: const EdgeInsets.symmetric(horizontal: 12, vertical: 8),
context, decoration: BoxDecoration(
).colorScheme.surface.withAlpha((0.35 * 255).round()), color: Theme.of(context).colorScheme.surface,
child: Center( border: Border.all(
child: SizedBox( color: Theme.of(context).colorScheme.outline,
width: 640, width: 1,
child: Card( ),
elevation: 4, borderRadius: BorderRadius.circular(8),
child: Padding( boxShadow: [
padding: const EdgeInsets.all(16.0), BoxShadow(
child: Column( color: Colors.black.withAlpha(3),
blurRadius: 8,
offset: const Offset(0, 2),
),
],
),
child: Row(
mainAxisSize: MainAxisSize.min, mainAxisSize: MainAxisSize.min,
crossAxisAlignment: CrossAxisAlignment.stretch,
children: [ children: [
// Header SizedBox(
Container( width: 12,
height: 20,
decoration: BoxDecoration(
color: Theme.of(
context,
).colorScheme.surfaceContainerHighest,
borderRadius: BorderRadius.circular(6),
),
),
const SizedBox(height: 12),
// chips row
Row(
children: [
for (var i = 0; i < 3; i++)
Padding(
padding: const EdgeInsets.only(right: 8.0),
child: Container(
width: 100,
height: 14,
decoration: BoxDecoration(
color: Theme.of(
context,
).colorScheme.surfaceContainerHighest,
borderRadius: BorderRadius.circular(6),
),
),
),
],
),
const SizedBox(height: 16),
// lines representing content
for (var i = 0; i < 4; i++) ...[
Container(
height: 12, height: 12,
margin: const EdgeInsets.only(bottom: 8), child: CircularProgressIndicator(
decoration: BoxDecoration( strokeWidth: 2,
color: Theme.of( valueColor: AlwaysStoppedAnimation(
context, Theme.of(context).colorScheme.primary,
).colorScheme.surfaceContainerHighest,
borderRadius: BorderRadius.circular(6),
), ),
), ),
),
const SizedBox(width: 8),
Text(
'Reconnecting...',
style: Theme.of(context).textTheme.labelSmall,
),
], ],
],
),
),
),
),
),
), ),
), ),
); );