import 'dart:async';

import 'package:flutter/foundation.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:supabase_flutter/supabase_flutter.dart';

import 'supabase_provider.dart';

/// Immutable snapshot of the typing indicator for one ticket channel.
class TypingIndicatorState {
  const TypingIndicatorState({
    required this.userIds,
    required this.channelStatus,
    required this.lastPayload,
  });

  /// Ids of the remote users currently considered to be typing.
  final Set<String> userIds;

  /// Name of the most recent realtime subscribe status (`'init'` until the
  /// first status callback arrives).
  final String channelStatus;

  /// The most recently extracted `typing` broadcast payload (empty until one
  /// is received).
  final Map<String, dynamic> lastPayload;

  /// Returns a copy of this state with the given fields replaced.
  TypingIndicatorState copyWith({
    Set<String>? userIds,
    String? channelStatus,
    Map<String, dynamic>? lastPayload,
  }) {
    return TypingIndicatorState(
      userIds: userIds ?? this.userIds,
      channelStatus: channelStatus ?? this.channelStatus,
      lastPayload: lastPayload ?? this.lastPayload,
    );
  }
}

/// Provides one [TypingIndicatorController] per ticket id.
///
/// NOTE(review): this family is not `autoDispose`, so each controller (and
/// its realtime channel) presumably lives until the provider is invalidated
/// or its container is disposed - confirm that is intended.
final typingIndicatorProvider = StateNotifierProvider.family<
    TypingIndicatorController, TypingIndicatorState, String>((ref, ticketId) {
  final client = ref.watch(supabaseClientProvider);
  return TypingIndicatorController(client, ticketId);
});

/// Tracks who is typing on a ticket via a Supabase realtime broadcast channel.
///
/// The controller subscribes to the `typing:<ticketId>` channel on creation,
/// records remote `typing` broadcasts in [TypingIndicatorState.userIds], and
/// broadcasts the local user's own `start` / `stop` events.
class TypingIndicatorController extends StateNotifier<TypingIndicatorState> {
  TypingIndicatorController(this._client, this._ticketId)
      : super(
          const TypingIndicatorState(
            userIds: {},
            channelStatus: 'init',
            lastPayload: {},
          ),
        ) {
    _initChannel();
  }

  /// Quiet period after the last [userTyping] call before `stop` is sent.
  ///
  /// Debounces the stop event so quick pauses don't produce start/stop
  /// bursts on the network.
  static const _stopDebounce = Duration(milliseconds: 600);

  /// How long a remote user stays marked as typing without a fresh event.
  ///
  /// Deliberately generous so brief realtime interruptions (auth refresh,
  /// channel reconnect, message processing, etc.) do not clear the indicator.
  static const _remoteTypingTimeout = Duration(milliseconds: 3500);

  /// Maximum number of envelope layers [_unwrapEnvelope] will peel off.
  static const _maxUnwrapDepth = 6;

  /// Maximum nesting depth visited by [_searchForTypingMap].
  static const _maxSearchDepth = 4;

  final SupabaseClient _client;
  final String _ticketId;
  RealtimeChannel? _channel;
  Timer? _typingTimer;
  final Map<String, Timer> _remoteTimeouts = {};

  // Marked when dispose() starts to prevent late async callbacks mutating
  // state.
  bool _disposed = false;

  /// Creates the realtime channel, registers the handlers and subscribes.
  void _initChannel() {
    final channel = _client.channel('typing:$_ticketId');
    channel.onBroadcast(
      event: 'typing',
      callback: (payload) => _handleBroadcast(payload),
    );
    channel.subscribe(
      (status, error) => _handleSubscribeStatus(status.name, error),
    );
    _channel = channel;
  }

  /// Handles one incoming `typing` broadcast.
  ///
  /// Always records the extracted payload in the state; events without a
  /// string `user_id`, or sent by the current user, are otherwise ignored.
  void _handleBroadcast(Object? payload) {
    if (_disposed || !mounted) return;
    try {
      final data = _extractPayload(payload);
      // A non-string id is treated as missing instead of throwing a cast
      // error before the payload has been recorded.
      final rawUserId = data['user_id'];
      final userId = rawUserId is String ? rawUserId : null;
      final isStop = data['type']?.toString() == 'stop';
      final currentUserId = _client.auth.currentUser?.id;

      state = state.copyWith(lastPayload: data);
      if (userId == null || userId == currentUserId) return;

      if (isStop) {
        _clearRemoteTyping(userId);
      } else {
        _markRemoteTyping(userId);
      }
    } catch (e, st) {
      // Callback boundary: log instead of letting the error escape into the
      // realtime client.
      debugPrint(
        'TypingIndicatorController: broadcast callback error: $e\n$st',
      );
    }
  }

  /// Records the channel's subscribe status and logs any reported error.
  void _handleSubscribeStatus(String statusName, Object? error) {
    if (_disposed || !mounted) return;
    try {
      state = state.copyWith(channelStatus: statusName);
      if (error != null) {
        debugPrint('TypingIndicatorController: subscribe error: $error');
      }
    } catch (e, st) {
      debugPrint(
        'TypingIndicatorController: subscribe callback error: $e\n$st',
      );
    }
  }

  /// Returns the map carrying `user_id` from a broadcast [payload].
  ///
  /// The realtime client can wrap the actual broadcast payload inside several
  /// nested fields (e.g. `{payload: {payload: {...}}}`), so known envelope
  /// fields are unwrapped first and a bounded recursive search is used as a
  /// last resort. Returns an empty map when nothing suitable is found.
  Map<String, dynamic> _extractPayload(dynamic payload) {
    return _unwrapEnvelope(payload) ??
        _searchForTypingMap(payload, 0) ??
        <String, dynamic>{};
  }

  /// Follows `payload` / `data` envelope fields until a map with `user_id`
  /// is reached; returns `null` when the chain ends without one.
  ///
  /// Only `user_id` identifies the message: some wrappers carry
  /// `type: broadcast` at the top level, which must not be mistaken for the
  /// typing payload.
  Map<String, dynamic>? _unwrapEnvelope(dynamic payload) {
    dynamic current = payload;
    for (var depth = 0; depth < _maxUnwrapDepth; depth++) {
      if (current is Map) {
        if (current.containsKey('user_id')) {
          return _asStringKeyedMap(current);
        }
        // Prefer `payload`; some realtime envelopes store it at `data`.
        final inner =
            current['payload'] is Map ? current['payload'] : current['data'];
        if (inner is! Map) return null;
        current = inner;
      } else {
        // Wrapper objects (e.g. RealtimeMessage) expose it as `.payload`.
        final inner = _readPayloadProperty(current);
        if (inner == null) return null;
        current = inner;
      }
    }
    return null;
  }

  /// Reads a `payload` member from a non-map [wrapper] via dynamic dispatch.
  ///
  /// Returns `null` when the member is missing or is not a map.
  Map<dynamic, dynamic>? _readPayloadProperty(dynamic wrapper) {
    if (wrapper == null) return null;
    try {
      final inner = wrapper.payload;
      return inner is Map ? inner : null;
    } on NoSuchMethodError {
      // Expected for any object without a `payload` member.
      return null;
    } catch (e) {
      // Best effort: a throwing getter must not abort payload extraction.
      debugPrint('TypingIndicatorController: payload getter error: $e');
      return null;
    }
  }

  /// Depth-limited search of maps and iterables for a map with `user_id`.
  ///
  /// Maps with non-string keys are skipped entirely, including their values.
  Map<String, dynamic>? _searchForTypingMap(Object? node, int depth) {
    if (depth > _maxSearchDepth) return null;

    if (node is Map) {
      final typed = _asStringKeyedMap(node);
      if (typed == null) return null;
      if (typed.containsKey('user_id')) return typed;
      for (final value in typed.values) {
        final hit = _searchForTypingMap(value, depth + 1);
        if (hit != null) return hit;
      }
      return null;
    }

    if (node is Iterable) {
      for (final value in node) {
        final hit = _searchForTypingMap(value, depth + 1);
        if (hit != null) return hit;
      }
    }
    return null;
  }

  /// Copies [map] into a `Map<String, dynamic>`, or returns `null` when any
  /// key is not a string (instead of throwing during conversion).
  Map<String, dynamic>? _asStringKeyedMap(Map<dynamic, dynamic> map) {
    if (map.keys.any((key) => key is! String)) return null;
    return Map<String, dynamic>.from(map);
  }

  /// Signals that the local user is typing.
  ///
  /// Broadcasts `start` immediately and (re)schedules a `stop` broadcast
  /// after [_stopDebounce]. Does nothing when disposed or when there is no
  /// signed-in user.
  void userTyping() {
    if (_disposed || !mounted) return;
    if (_client.auth.currentUser?.id == null) return;

    _sendTypingEvent('start');
    _typingTimer?.cancel();
    // _sendTypingEvent re-checks disposal when the timer fires.
    _typingTimer = Timer(_stopDebounce, () => _sendTypingEvent('stop'));
  }

  /// Cancels any pending debounced stop and broadcasts `stop` right away.
  void stopTyping() {
    if (_disposed || !mounted) return;
    _typingTimer?.cancel();
    _sendTypingEvent('stop');
  }

  /// Marks [userId] as typing and restarts that user's expiry timer.
  void _markRemoteTyping(String userId) {
    if (_disposed || !mounted) return;

    // Only emit a new set when membership changes; repeated `start` events
    // from the same user just extend the timeout.
    if (!state.userIds.contains(userId)) {
      state = state.copyWith(userIds: {...state.userIds, userId});
    }

    _remoteTimeouts[userId]?.cancel();
    _remoteTimeouts[userId] = Timer(
      _remoteTypingTimeout,
      () => _clearRemoteTyping(userId),
    );
  }

  /// Removes [userId] from the typing set and drops that user's expiry timer.
  void _clearRemoteTyping(String userId) {
    if (_disposed || !mounted) return;

    if (state.userIds.contains(userId)) {
      state = state.copyWith(
        userIds: {
          for (final id in state.userIds)
            if (id != userId) id,
        },
      );
    }

    _remoteTimeouts.remove(userId)?.cancel();
  }

  /// Broadcasts a typing event of the given [type] (`start` or `stop`) for
  /// the current user; a no-op without a signed-in user or a channel.
  void _sendTypingEvent(String type) {
    if (_disposed || !mounted) return;
    final userId = _client.auth.currentUser?.id;
    final channel = _channel;
    if (userId == null || channel == null) return;
    unawaited(_broadcastTyping(channel, userId, type));
  }

  /// Sends the broadcast and logs failures, so a rejected send never
  /// surfaces as an unhandled asynchronous error.
  Future<void> _broadcastTyping(
    RealtimeChannel channel,
    String userId,
    String type,
  ) async {
    try {
      await channel.sendBroadcastMessage(
        event: 'typing',
        payload: {'user_id': userId, 'type': type},
      );
    } catch (e, st) {
      debugPrint('TypingIndicatorController: send error: $e\n$st');
    }
  }

  /// Exposed for tests only: simulate a remote typing broadcast.
  @visibleForTesting
  void debugSimulateRemoteTyping(String userId, {bool stop = false}) {
    if (_disposed || !mounted) return;
    final data = {'user_id': userId, 'type': stop ? 'stop' : 'start'};
    state = state.copyWith(lastPayload: data);
    if (stop) {
      _clearRemoteTyping(userId);
    } else {
      _markRemoteTyping(userId);
    }
  }

  @override
  void dispose() {
    // Mark disposed first so any late async callbacks will no-op.
    _disposed = true;

    // Cancel local timers and remote timeouts; do NOT send network events
    // during dispose (prevents broadcasts from re-entering callbacks after
    // disposal).
    _typingTimer?.cancel();
    _typingTimer = null;
    for (final timer in _remoteTimeouts.values) {
      timer.cancel();
    }
    _remoteTimeouts.clear();

    // Unsubscribe from the realtime channel without blocking disposal.
    final channel = _channel;
    _channel = null;
    if (channel != null) {
      unawaited(_unsubscribe(channel));
    }

    super.dispose();
  }

  /// Unsubscribes [channel], logging rather than propagating any failure.
  Future<void> _unsubscribe(RealtimeChannel channel) async {
    try {
      await channel.unsubscribe();
    } catch (e, st) {
      debugPrint('TypingIndicatorController: unsubscribe error: $e\n$st');
    }
  }
}