diff --git a/lib/providers/tasks_provider.dart b/lib/providers/tasks_provider.dart index 880b45fb..6b6cf7d7 100644 --- a/lib/providers/tasks_provider.dart +++ b/lib/providers/tasks_provider.dart @@ -105,6 +105,59 @@ class TaskQuery { } } +/// Builds the isolate payload from a list of [Task] objects and the current +/// query/access context. Extracted so the initial REST seed and the realtime +/// stream listener can share the same logic without duplication. +Map _buildTaskPayload({ + required List tasks, + required bool isGlobal, + required List allowedTicketIds, + required List allowedOfficeIds, + required TaskQuery query, +}) { + final rowsList = tasks + .map( + (task) => { + 'id': task.id, + 'task_number': task.taskNumber, + 'office_id': task.officeId, + 'ticket_id': task.ticketId, + 'title': task.title, + 'description': task.description, + 'status': task.status, + 'priority': task.priority, + 'creator_id': task.creatorId, + 'created_at': task.createdAt.toIso8601String(), + 'started_at': task.startedAt?.toIso8601String(), + 'completed_at': task.completedAt?.toIso8601String(), + 'requested_by': task.requestedBy, + 'noted_by': task.notedBy, + 'received_by': task.receivedBy, + 'queue_order': task.queueOrder, + 'request_type': task.requestType, + 'request_type_other': task.requestTypeOther, + 'request_category': task.requestCategory, + 'action_taken': task.actionTaken, + 'cancellation_reason': task.cancellationReason, + 'cancelled_at': task.cancelledAt?.toIso8601String(), + }, + ) + .toList(); + + return { + 'rows': rowsList, + 'isGlobal': isGlobal, + 'allowedTicketIds': allowedTicketIds, + 'allowedOfficeIds': allowedOfficeIds, + 'officeId': query.officeId, + 'status': query.status, + 'searchQuery': query.searchQuery, + 'taskNumber': query.taskNumber, + 'dateStart': query.dateRange?.start.millisecondsSinceEpoch, + 'dateEnd': query.dateRange?.end.millisecondsSinceEpoch, + }; +} + final tasksProvider = StreamProvider>((ref) { final client = ref.watch(supabaseClientProvider); final profileAsync = ref.watch(currentProfileProvider); @@ -122,19 +175,18 @@ final tasksProvider = StreamProvider>((ref) { profile.role == 'dispatcher' || profile.role == 'it_staff'; - // For RBAC early-exit: if the user has no accessible tickets/offices, - // avoid subscribing to the full tasks stream. - List earlyAllowedTicketIds = - ticketsAsync.valueOrNull?.map((ticket) => ticket.id).toList() ?? - []; - List earlyOfficeIds = + final allowedTicketIds = + ticketsAsync.valueOrNull?.map((t) => t.id).toList() ?? []; + final allowedOfficeIds = assignmentsAsync.valueOrNull - ?.where((assignment) => assignment.userId == profile.id) - .map((assignment) => assignment.officeId) + ?.where((a) => a.userId == profile.id) + .map((a) => a.officeId) .toSet() .toList() ?? []; - if (!isGlobal && earlyAllowedTicketIds.isEmpty && earlyOfficeIds.isEmpty) { + + // For non-global users with no assigned offices/tickets, skip subscribing. + if (!isGlobal && allowedTicketIds.isEmpty && allowedOfficeIds.isEmpty) { return Stream.value(const []); } @@ -150,70 +202,92 @@ final tasksProvider = StreamProvider>((ref) { ref.onDispose(wrapper.dispose); - // Process tasks with filtering/pagination after recovery - return wrapper.stream.asyncMap((result) async { - final rowsList = result.data - .map( - (task) => { - 'id': task.id, - 'task_number': task.taskNumber, - 'office_id': task.officeId, - 'ticket_id': task.ticketId, - 'title': task.title, - 'description': task.description, - 'status': task.status, - 'priority': task.priority, - 'creator_id': task.creatorId, - 'created_at': task.createdAt.toIso8601String(), - 'started_at': task.startedAt?.toIso8601String(), - 'completed_at': task.completedAt?.toIso8601String(), - 'requested_by': task.requestedBy, - 'noted_by': task.notedBy, - 'received_by': task.receivedBy, - 'queue_order': task.queueOrder, - 'request_type': task.requestType, - 'request_type_other': task.requestTypeOther, - 'request_category': task.requestCategory, - 'action_taken': task.actionTaken, - 'cancellation_reason': task.cancellationReason, - 'cancelled_at': task.cancelledAt?.toIso8601String(), - }, - ) - .toList(); + var lastResultHash = ''; + Timer? debounceTimer; + // broadcast() so Riverpod and any other listener can both receive events. + final controller = StreamController>.broadcast(); - final allowedTicketIds = - ticketsAsync.valueOrNull?.map((ticket) => ticket.id).toList() ?? - []; - final allowedOfficeIds = - assignmentsAsync.valueOrNull - ?.where((assignment) => assignment.userId == profile.id) - .map((assignment) => assignment.officeId) - .toList() ?? - []; + void emitDebounced(List tasks) { + debounceTimer?.cancel(); + debounceTimer = Timer(const Duration(milliseconds: 150), () { + if (!controller.isClosed) controller.add(tasks); + }); + } - final payload = { - 'rows': rowsList, - 'isGlobal': isGlobal, - 'allowedTicketIds': allowedTicketIds, - 'allowedOfficeIds': allowedOfficeIds, - 'officeId': query.officeId, - 'status': query.status, - 'searchQuery': query.searchQuery, - 'taskNumber': query.taskNumber, - 'dateStart': query.dateRange?.start.millisecondsSinceEpoch, - 'dateEnd': query.dateRange?.end.millisecondsSinceEpoch, - }; - - final processed = await compute(_processTasksInIsolate, payload); - - final tasks = (processed as List) - .cast>() - .map(Task.fromMap) - .toList(); - - debugPrint('[tasksProvider] processed ${tasks.length} tasks'); - return tasks; + ref.onDispose(() { + debounceTimer?.cancel(); + controller.close(); }); + + // ── Immediate REST seed ─────────────────────────────────────────────────── + // Fire a one-shot HTTP fetch right now so the UI can render before the + // WebSocket realtime channel is fully established. Eliminates loading delay + // on web and initial flash on mobile. Hash check prevents a duplicate + // rebuild if both the seed and the realtime stream arrive with the same data. + unawaited( + Future(() async { + try { + final data = await client.from('tasks').select(); + final raw = data + .cast>() + .map(Task.fromMap) + .toList(); + final payload = _buildTaskPayload( + tasks: raw, + isGlobal: isGlobal, + allowedTicketIds: allowedTicketIds, + allowedOfficeIds: allowedOfficeIds, + query: query, + ); + final processed = await compute(_processTasksInIsolate, payload); + final tasks = (processed as List) + .cast>() + .map(Task.fromMap) + .toList(); + final hash = tasks.fold('', (h, t) => '$h${t.id}'); + if (!controller.isClosed && hash != lastResultHash) { + lastResultHash = hash; + controller.add(tasks); // emit immediately – no debounce + } + } catch (e) { + debugPrint('[tasksProvider] initial seed error: $e'); + } + }), + ); + + // ── Realtime stream ─────────────────────────────────────────────────────── + // Processes every realtime event through the same isolate. Debounced so + // rapid consecutive events (e.g. bulk inserts) don't cause repeated renders. + wrapper.stream + .asyncMap((result) async { + final payload = _buildTaskPayload( + tasks: result.data, + isGlobal: isGlobal, + allowedTicketIds: allowedTicketIds, + allowedOfficeIds: allowedOfficeIds, + query: query, + ); + final processed = await compute(_processTasksInIsolate, payload); + return (processed as List) + .cast>() + .map(Task.fromMap) + .toList(); + }) + .listen( + (tasks) { + final hash = tasks.fold('', (h, t) => '$h${t.id}'); + if (hash != lastResultHash) { + lastResultHash = hash; + emitDebounced(tasks); + } + }, + onError: (Object e) { + debugPrint('[tasksProvider] stream error: $e'); + controller.addError(e); + }, + ); + + return controller.stream; }); // Runs inside a background isolate to filter/sort tasks represented as diff --git a/lib/providers/tickets_provider.dart b/lib/providers/tickets_provider.dart index 3af91b9a..b585b5dd 100644 --- a/lib/providers/tickets_provider.dart +++ b/lib/providers/tickets_provider.dart @@ -120,6 +120,46 @@ class TicketQuery { } } +/// Builds the isolate payload from a list of [Ticket] objects and the current +/// query/access context. Extracted so the initial REST seed and the realtime +/// stream listener can share the same logic without duplication. +Map _buildTicketPayload({ + required List tickets, + required bool isGlobal, + required List allowedOfficeIds, + required TicketQuery query, +}) { + final rowsList = tickets + .map( + (ticket) => { + 'id': ticket.id, + 'subject': ticket.subject, + 'description': ticket.description, + 'status': ticket.status, + 'office_id': ticket.officeId, + 'creator_id': ticket.creatorId, + 'created_at': ticket.createdAt.toIso8601String(), + 'responded_at': ticket.respondedAt?.toIso8601String(), + 'promoted_at': ticket.promotedAt?.toIso8601String(), + 'closed_at': ticket.closedAt?.toIso8601String(), + }, + ) + .toList(); + + return { + 'rows': rowsList, + 'isGlobal': isGlobal, + 'allowedOfficeIds': allowedOfficeIds, + 'offset': query.offset, + 'limit': query.limit, + 'searchQuery': query.searchQuery, + 'officeId': query.officeId, + 'status': query.status, + 'dateStart': query.dateRange?.start.millisecondsSinceEpoch, + 'dateEnd': query.dateRange?.end.millisecondsSinceEpoch, + }; +} + final ticketsProvider = StreamProvider>((ref) { final client = ref.watch(supabaseClientProvider); final profileAsync = ref.watch(currentProfileProvider); @@ -136,11 +176,17 @@ final ticketsProvider = StreamProvider>((ref) { profile.role == 'dispatcher' || profile.role == 'it_staff'; + final allowedOfficeIds = + assignmentsAsync.valueOrNull + ?.where((a) => a.userId == profile.id) + .map((a) => a.officeId) + .toList() ?? + []; + // Wrap realtime stream with recovery logic final wrapper = StreamRecoveryWrapper( stream: client.from('tickets').stream(primaryKey: ['id']), onPollData: () async { - // Polling fallback: fetch all tickets once final data = await client.from('tickets').select(); return data.cast>().map(Ticket.fromMap).toList(); }, @@ -149,55 +195,91 @@ final ticketsProvider = StreamProvider>((ref) { ref.onDispose(wrapper.dispose); - // Process tickets with filtering/pagination after recovery - return wrapper.stream.asyncMap((result) async { - final rowsList = result.data - .map( - (ticket) => { - 'id': ticket.id, - 'subject': ticket.subject, - 'description': ticket.description, - 'status': ticket.status, - 'office_id': ticket.officeId, - 'creator_id': ticket.creatorId, - 'created_at': ticket.createdAt.toIso8601String(), - 'responded_at': ticket.respondedAt?.toIso8601String(), - 'promoted_at': ticket.promotedAt?.toIso8601String(), - 'closed_at': ticket.closedAt?.toIso8601String(), - }, - ) - .toList(); + var lastResultHash = ''; + Timer? debounceTimer; + // broadcast() so Riverpod and any other listener can both receive events. + final controller = StreamController>.broadcast(); - final allowedOfficeIds = - assignmentsAsync.valueOrNull - ?.where((assignment) => assignment.userId == profile.id) - .map((assignment) => assignment.officeId) - .toList() ?? - []; + void emitDebounced(List tickets) { + debounceTimer?.cancel(); + debounceTimer = Timer(const Duration(milliseconds: 150), () { + if (!controller.isClosed) controller.add(tickets); + }); + } - final payload = { - 'rows': rowsList, - 'isGlobal': isGlobal, - 'allowedOfficeIds': allowedOfficeIds, - 'offset': query.offset, - 'limit': query.limit, - 'searchQuery': query.searchQuery, - 'officeId': query.officeId, - 'status': query.status, - 'dateStart': query.dateRange?.start.millisecondsSinceEpoch, - 'dateEnd': query.dateRange?.end.millisecondsSinceEpoch, - }; - - final processed = await compute(_processTicketsInIsolate, payload); - - final tickets = (processed as List) - .cast>() - .map(Ticket.fromMap) - .toList(); - - debugPrint('[ticketsProvider] processed ${tickets.length} tickets'); - return tickets; + ref.onDispose(() { + debounceTimer?.cancel(); + controller.close(); }); + + // ── Immediate REST seed ─────────────────────────────────────────────────── + // Fire a one-shot HTTP fetch right now so the UI can render before the + // WebSocket realtime channel is fully established. This eliminates the + // loading delay on web (WebSocket ~200-500 ms) and the initial flash on + // mobile. The realtime stream takes over afterwards; the hash check below + // prevents a duplicate rebuild if both arrive with identical data. + unawaited( + Future(() async { + try { + final data = await client.from('tickets').select(); + final raw = data + .cast>() + .map(Ticket.fromMap) + .toList(); + final payload = _buildTicketPayload( + tickets: raw, + isGlobal: isGlobal, + allowedOfficeIds: allowedOfficeIds, + query: query, + ); + final processed = await compute(_processTicketsInIsolate, payload); + final tickets = (processed as List) + .cast>() + .map(Ticket.fromMap) + .toList(); + final hash = tickets.fold('', (h, t) => '$h${t.id}'); + if (!controller.isClosed && hash != lastResultHash) { + lastResultHash = hash; + controller.add(tickets); // emit immediately – no debounce + } + } catch (e) { + debugPrint('[ticketsProvider] initial seed error: $e'); + } + }), + ); + + // ── Realtime stream ─────────────────────────────────────────────────────── + // Processes every realtime event through the same isolate. Debounced so + // rapid consecutive events (e.g. bulk inserts) don't cause repeated renders. + wrapper.stream + .asyncMap((result) async { + final payload = _buildTicketPayload( + tickets: result.data, + isGlobal: isGlobal, + allowedOfficeIds: allowedOfficeIds, + query: query, + ); + final processed = await compute(_processTicketsInIsolate, payload); + return (processed as List) + .cast>() + .map(Ticket.fromMap) + .toList(); + }) + .listen( + (tickets) { + final hash = tickets.fold('', (h, t) => '$h${t.id}'); + if (hash != lastResultHash) { + lastResultHash = hash; + emitDebounced(tickets); + } + }, + onError: (Object e) { + debugPrint('[ticketsProvider] stream error: $e'); + controller.addError(e); + }, + ); + + return controller.stream; }); // Runs inside a background isolate. Accepts a serializable payload and