261 lines
10 KiB
TypeScript
261 lines
10 KiB
TypeScript
import { createClient } from 'npm:@supabase/supabase-js@2'
|
|
|
|
// Minimal Deno Edge Function to process queued scheduled_notifications
|
|
// Environment variables required:
|
|
// SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY
|
|
// Optional: SEND_FCM_URL (defaults to ${SUPABASE_URL}/functions/v1/send_fcm)
|
|
|
|
const corsHeaders = {
|
|
'Access-Control-Allow-Origin': '*',
|
|
'Access-Control-Allow-Headers': 'authorization, x-client-info, apikey, content-type',
|
|
}
|
|
|
|
const SUPABASE_URL = Deno.env.get('SUPABASE_URL')!
|
|
const SERVICE_ROLE_KEY = Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
|
|
const supabase = createClient(SUPABASE_URL, SERVICE_ROLE_KEY)
|
|
const SEND_FCM_URL = Deno.env.get('SEND_FCM_URL')
|
|
|| `${SUPABASE_URL}/functions/v1/send_fcm`
|
|
const BATCH_SIZE = Number(Deno.env.get('PROCESSOR_BATCH_SIZE') || '50')
|
|
|
|
// deterministic UUIDv5-like from a name using SHA-1
|
|
async function uuidFromName(name: string): Promise<string> {
|
|
const encoder = new TextEncoder()
|
|
const data = encoder.encode(name)
|
|
const hashBuffer = await crypto.subtle.digest('SHA-1', data)
|
|
const hash = new Uint8Array(hashBuffer)
|
|
// use first 16 bytes of SHA-1
|
|
const bytes = hash.slice(0, 16)
|
|
// set version (5) and variant (RFC 4122)
|
|
bytes[6] = (bytes[6] & 0x0f) | 0x50 // version 5
|
|
bytes[8] = (bytes[8] & 0x3f) | 0x80 // variant
|
|
const hex = Array.from(bytes).map(b => b.toString(16).padStart(2, '0')).join('')
|
|
return `${hex.slice(0,8)}-${hex.slice(8,12)}-${hex.slice(12,16)}-${hex.slice(16,20)}-${hex.slice(20,32)}`
|
|
}
|
|
|
|
async function processBatch() {
|
|
// Step 1: Enqueue any due notifications via RPC.
|
|
// This makes the edge function self-sufficient — a single trigger
|
|
// (pg_cron, external cron, or manual curl) handles both enqueue + process.
|
|
console.log('Calling enqueue_all_notifications RPC...')
|
|
const { error: enqueueErr, data: enqueueData } = await supabase.rpc('enqueue_all_notifications')
|
|
if (enqueueErr) {
|
|
console.error('❌ enqueue_all_notifications RPC FAILED:', JSON.stringify(enqueueErr))
|
|
} else {
|
|
console.log('✓ enqueue_all_notifications completed successfully', enqueueData)
|
|
}
|
|
|
|
// Step 2: Process enqueued rows
|
|
const nowIso = new Date().toISOString()
|
|
const { data: rows, error } = await supabase
|
|
.from('scheduled_notifications')
|
|
.select('*')
|
|
.eq('processed', false)
|
|
.lte('scheduled_for', nowIso)
|
|
.order('scheduled_for', { ascending: true })
|
|
.limit(BATCH_SIZE)
|
|
|
|
if (error) {
|
|
console.error('Failed to fetch scheduled_notifications', error)
|
|
return
|
|
}
|
|
if (!rows || rows.length === 0) {
|
|
console.log('No scheduled rows to process')
|
|
return
|
|
}
|
|
|
|
// Deduplicate announcement_banner rows: for each (announcement_id, user_id)
|
|
// pair, keep only the row with the highest epoch and immediately mark older
|
|
// ones as processed without sending FCM. This prevents double-pushes caused
|
|
// by stale rows from previous epochs appearing alongside the current epoch's
|
|
// row (e.g. if scheduled_for was set to now() instead of the next boundary,
|
|
// or if a pg_cron cycle was missed leaving old rows unprocessed).
|
|
const annBannerBest = new Map<string, any>()
|
|
const staleIds: string[] = []
|
|
for (const r of rows) {
|
|
if (r.notify_type !== 'announcement_banner' || !r.announcement_id) continue
|
|
const key = `${r.announcement_id}:${r.user_id}`
|
|
const best = annBannerBest.get(key)
|
|
if (!best || (r.epoch ?? 0) > (best.epoch ?? 0)) {
|
|
if (best) staleIds.push(best.id)
|
|
annBannerBest.set(key, r)
|
|
} else {
|
|
staleIds.push(r.id)
|
|
}
|
|
}
|
|
if (staleIds.length > 0) {
|
|
console.log(`Skipping ${staleIds.length} stale announcement_banner row(s)`)
|
|
await supabase
|
|
.from('scheduled_notifications')
|
|
.update({ processed: true, processed_at: new Date().toISOString() })
|
|
.in('id', staleIds)
|
|
}
|
|
const staleSet = new Set(staleIds)
|
|
|
|
for (const r of rows.filter((r: any) => !staleSet.has(r.id))) {
|
|
try {
|
|
const scheduleId = r.schedule_id
|
|
const userId = r.user_id
|
|
const notifyType = r.notify_type
|
|
const rowId = r.id
|
|
|
|
// Build a unique ID that accounts for all reference columns + epoch.
|
|
// announcement_id is included so that concurrent banner announcements
|
|
// targeting the same user+epoch get distinct notificationIds — without
|
|
// it, try_mark_notification_pushed would silently drop the second one.
|
|
const idSource = `${scheduleId || ''}-${r.task_id || ''}-${r.it_service_request_id || ''}-${r.pass_slip_id || ''}-${r.announcement_id || ''}-${userId}-${notifyType}-${r.epoch || 0}`
|
|
const notificationId = await uuidFromName(idSource)
|
|
|
|
// Idempotency is handled by send_fcm via try_mark_notification_pushed.
|
|
// Do NOT call it here — doing so would mark the notification as pushed
|
|
// before send_fcm sees it, causing send_fcm to skip the actual FCM send.
|
|
|
|
// Prepare message based on notify_type
|
|
let title = ''
|
|
let body = ''
|
|
const data: Record<string, string> = {
|
|
notification_id: notificationId,
|
|
type: notifyType,
|
|
}
|
|
|
|
// Include reference IDs in data payload
|
|
if (scheduleId) data.schedule_id = scheduleId
|
|
if (r.task_id) data.task_id = r.task_id
|
|
if (r.it_service_request_id) data.it_service_request_id = r.it_service_request_id
|
|
if (r.pass_slip_id) data.pass_slip_id = r.pass_slip_id
|
|
if (r.announcement_id) data.announcement_id = r.announcement_id
|
|
|
|
switch (notifyType) {
|
|
case 'start_15':
|
|
title = 'Shift starting soon'
|
|
body = "Your shift starts in 15 minutes. Don't forget to check in."
|
|
data.navigate_to = '/attendance'
|
|
break
|
|
case 'end':
|
|
title = 'Shift ended'
|
|
body = "Your shift has ended. Please remember to check out."
|
|
data.navigate_to = '/attendance'
|
|
break
|
|
case 'end_hourly':
|
|
title = 'Check-out reminder'
|
|
body = "You haven't checked out yet. Please check out when done."
|
|
data.navigate_to = '/attendance'
|
|
break
|
|
case 'overtime_idle_15':
|
|
title = 'No active task'
|
|
body = "You've been on overtime for 15 minutes without an active task or IT service request."
|
|
data.navigate_to = '/tasks'
|
|
break
|
|
case 'overtime_checkout_30':
|
|
title = 'Overtime check-out reminder'
|
|
body = "It's been 30 minutes since your last task ended. Consider checking out if you're done."
|
|
data.navigate_to = '/attendance'
|
|
break
|
|
case 'isr_event_60':
|
|
title = 'IT Service Request event soon'
|
|
body = 'An IT service request event starts in 1 hour.'
|
|
data.navigate_to = `/it-service-requests/${r.it_service_request_id}`
|
|
break
|
|
case 'isr_evidence_daily':
|
|
title = 'Evidence upload reminder'
|
|
body = 'Please upload evidence and action taken for your IT service request.'
|
|
data.navigate_to = `/it-service-requests/${r.it_service_request_id}`
|
|
break
|
|
case 'task_paused_daily':
|
|
title = 'Paused task reminder'
|
|
body = 'You have a paused task that needs attention.'
|
|
data.navigate_to = `/tasks/${r.task_id}`
|
|
break
|
|
case 'backlog_15':
|
|
title = 'Pending tasks reminder'
|
|
body = 'Your shift ends in 15 minutes and you still have pending tasks.'
|
|
data.navigate_to = '/tasks'
|
|
break
|
|
case 'pass_slip_expiry_15':
|
|
title = 'Pass slip expiring soon'
|
|
body = 'Your pass slip expires in 15 minutes. Please return and complete it.'
|
|
data.navigate_to = '/attendance'
|
|
break
|
|
case 'pass_slip_expired_15':
|
|
title = 'Pass slip OVERDUE'
|
|
body = 'Your pass slip has exceeded the 1-hour limit. Please return and complete it immediately.'
|
|
data.navigate_to = '/attendance'
|
|
break
|
|
case 'announcement_banner': {
|
|
const { data: ann } = await supabase
|
|
.from('announcements')
|
|
.select('title')
|
|
.eq('id', r.announcement_id)
|
|
.single()
|
|
const rawTitle = ann?.title ?? ''
|
|
const displayTitle = rawTitle.length > 80
|
|
? rawTitle.substring(0, 80) + '\u2026'
|
|
: rawTitle
|
|
title = 'Announcement Reminder'
|
|
body = displayTitle
|
|
? `"${displayTitle}" — Please tap to review this announcement.`
|
|
: 'You have a pending announcement that requires your attention. Tap to view it.'
|
|
data.navigate_to = '/announcements'
|
|
break
|
|
}
|
|
default:
|
|
title = 'Reminder'
|
|
body = 'You have a pending notification.'
|
|
}
|
|
|
|
// Call send_fcm endpoint to deliver push (reuses existing implementation)
|
|
const payload = {
|
|
user_ids: [userId],
|
|
title,
|
|
body,
|
|
data,
|
|
}
|
|
|
|
const res = await fetch(SEND_FCM_URL, {
|
|
method: 'POST',
|
|
headers: {
|
|
'Content-Type': 'application/json',
|
|
'Authorization': `Bearer ${SERVICE_ROLE_KEY}`,
|
|
},
|
|
body: JSON.stringify(payload),
|
|
})
|
|
|
|
if (!res.ok) {
|
|
const text = await res.text().catch(() => '')
|
|
console.error('send_fcm failed', res.status, text)
|
|
await supabase.from('scheduled_notifications').update({ retry_count: r.retry_count + 1, last_error: `send_fcm ${res.status}: ${text}` }).eq('id', rowId)
|
|
continue
|
|
}
|
|
|
|
// Mark processed
|
|
await supabase.from('scheduled_notifications').update({ processed: true, processed_at: new Date().toISOString() }).eq('id', rowId)
|
|
console.log('Processed scheduled notification', rowId, notificationId)
|
|
} catch (err) {
|
|
console.error('Error processing row', r, err)
|
|
try {
|
|
await supabase.from('scheduled_notifications').update({ retry_count: r.retry_count + 1, last_error: String(err) }).eq('id', r.id)
|
|
} catch (e) {
|
|
console.error('Failed to update retry_count', e)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
Deno.serve(async (req) => {
|
|
if (req.method === 'OPTIONS') {
|
|
return new Response('ok', { headers: corsHeaders })
|
|
}
|
|
|
|
try {
|
|
// Allow manual triggering via POST; also allow GET for quick check
|
|
if (req.method === 'POST' || req.method === 'GET') {
|
|
await processBatch()
|
|
return new Response('ok', { headers: corsHeaders })
|
|
}
|
|
|
|
return new Response('method not allowed', { status: 405, headers: corsHeaders })
|
|
} catch (err) {
|
|
console.error('Processor error', err)
|
|
return new Response(JSON.stringify({ error: String(err) }), { status: 500, headers: corsHeaders })
|
|
}
|
|
})
|