Fixed ghost channel subscription

This commit is contained in:
Marc Rejohn Castillano 2026-03-01 20:31:13 +08:00
parent 029e671367
commit 3950f3ee94
3 changed files with 19 additions and 5 deletions

View File

@ -329,12 +329,22 @@ class StreamRecoveryWrapper<T> {
_startRealtimeSubscription(); _startRealtimeSubscription();
} }
/// Clean up all resources. /// Clean up all resources and notify the status callback that this
/// channel is no longer active, preventing ghost entries in the
/// [RealtimeController]'s recovering-channels set.
void dispose() { void dispose() {
if (_disposed) return;
_disposed = true; _disposed = true;
_pollingTimer?.cancel(); _pollingTimer?.cancel();
_recoveryTimer?.cancel(); _recoveryTimer?.cancel();
_realtimeSub?.cancel(); _realtimeSub?.cancel();
// Ensure the channel is removed from the recovering set when the
// wrapper is torn down (e.g. provider disposed during navigation).
// Without this, disposed wrappers that were mid-recovery leave
// orphaned entries that keep the reconnection indicator spinning.
if (_connectionStatus != StreamConnectionStatus.connected) {
_onStatusChanged?.call(channelName, StreamConnectionStatus.connected);
}
_controller?.close(); _controller?.close();
} }
} }

View File

@ -261,7 +261,7 @@ final tasksProvider = StreamProvider<List<Task>>((ref) {
// Realtime stream // Realtime stream
// Processes every realtime event through the same isolate. Debounced so // Processes every realtime event through the same isolate. Debounced so
// rapid consecutive events (e.g. bulk inserts) don't cause repeated renders. // rapid consecutive events (e.g. bulk inserts) don't cause repeated renders.
wrapper.stream final wrapperSub = wrapper.stream
.asyncMap((result) async { .asyncMap((result) async {
final payload = _buildTaskPayload( final payload = _buildTaskPayload(
tasks: result.data, tasks: result.data,
@ -286,10 +286,12 @@ final tasksProvider = StreamProvider<List<Task>>((ref) {
}, },
onError: (Object e) { onError: (Object e) {
debugPrint('[tasksProvider] stream error: $e'); debugPrint('[tasksProvider] stream error: $e');
controller.addError(e); // Don't forward errors — the wrapper handles recovery internally.
}, },
); );
ref.onDispose(wrapperSub.cancel);
return controller.stream; return controller.stream;
}); });

View File

@ -256,7 +256,7 @@ final ticketsProvider = StreamProvider<List<Ticket>>((ref) {
// Realtime stream // Realtime stream
// Processes every realtime event through the same isolate. Debounced so // Processes every realtime event through the same isolate. Debounced so
// rapid consecutive events (e.g. bulk inserts) don't cause repeated renders. // rapid consecutive events (e.g. bulk inserts) don't cause repeated renders.
wrapper.stream final wrapperSub = wrapper.stream
.asyncMap((result) async { .asyncMap((result) async {
final payload = _buildTicketPayload( final payload = _buildTicketPayload(
tickets: result.data, tickets: result.data,
@ -280,10 +280,12 @@ final ticketsProvider = StreamProvider<List<Ticket>>((ref) {
}, },
onError: (Object e) { onError: (Object e) {
debugPrint('[ticketsProvider] stream error: $e'); debugPrint('[ticketsProvider] stream error: $e');
controller.addError(e); // Don't forward errors — the wrapper handles recovery internally.
}, },
); );
ref.onDispose(wrapperSub.cancel);
return controller.stream; return controller.stream;
}); });