Initial commit for shift push notification reminders
This commit is contained in:
parent
3af7a1e348
commit
4b63b55812
144
supabase/functions/process_scheduled_notifications/index.ts
Normal file
144
supabase/functions/process_scheduled_notifications/index.ts
Normal file
|
|
@ -0,0 +1,144 @@
|
|||
import { createClient } from 'npm:@supabase/supabase-js@2'
|
||||
|
||||
// Minimal Deno Edge Function to process queued scheduled_notifications
|
||||
// Environment variables required:
|
||||
// SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, SEND_FCM_URL
|
||||
|
||||
const corsHeaders = {
|
||||
'Access-Control-Allow-Origin': '*',
|
||||
'Access-Control-Allow-Headers': 'authorization, x-client-info, apikey, content-type',
|
||||
}
|
||||
|
||||
const supabase = createClient(Deno.env.get('SUPABASE_URL')!, Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!)
|
||||
const SEND_FCM_URL = Deno.env.get('SEND_FCM_URL')!
|
||||
const BATCH_SIZE = Number(Deno.env.get('PROCESSOR_BATCH_SIZE') || '50')
|
||||
|
||||
// deterministic UUIDv5-like from a name using SHA-1
|
||||
async function uuidFromName(name: string): Promise<string> {
|
||||
const encoder = new TextEncoder()
|
||||
const data = encoder.encode(name)
|
||||
const hashBuffer = await crypto.subtle.digest('SHA-1', data)
|
||||
const hash = new Uint8Array(hashBuffer)
|
||||
// use first 16 bytes of SHA-1
|
||||
const bytes = hash.slice(0, 16)
|
||||
// set version (5) and variant (RFC 4122)
|
||||
bytes[6] = (bytes[6] & 0x0f) | 0x50 // version 5
|
||||
bytes[8] = (bytes[8] & 0x3f) | 0x80 // variant
|
||||
const hex = Array.from(bytes).map(b => b.toString(16).padStart(2, '0')).join('')
|
||||
return `${hex.slice(0,8)}-${hex.slice(8,12)}-${hex.slice(12,16)}-${hex.slice(16,20)}-${hex.slice(20,32)}`
|
||||
}
|
||||
|
||||
async function processBatch() {
|
||||
const nowIso = new Date().toISOString()
|
||||
const { data: rows, error } = await supabase
|
||||
.from('scheduled_notifications')
|
||||
.select('*')
|
||||
.eq('processed', false)
|
||||
.lte('scheduled_for', nowIso)
|
||||
.order('scheduled_for', { ascending: true })
|
||||
.limit(BATCH_SIZE)
|
||||
|
||||
if (error) {
|
||||
console.error('Failed to fetch scheduled_notifications', error)
|
||||
return
|
||||
}
|
||||
if (!rows || rows.length === 0) {
|
||||
console.log('No scheduled rows to process')
|
||||
return
|
||||
}
|
||||
|
||||
for (const r of rows) {
|
||||
try {
|
||||
const scheduleId = r.schedule_id
|
||||
const userId = r.user_id
|
||||
const notifyType = r.notify_type
|
||||
const rowId = r.id
|
||||
|
||||
const notificationId = await uuidFromName(`${scheduleId}-${userId}-${notifyType}`)
|
||||
|
||||
// Attempt to mark idempotent push
|
||||
const { data: markData, error: markErr } = await supabase.rpc('try_mark_notification_pushed', { p_notification_id: notificationId })
|
||||
if (markErr) {
|
||||
console.warn('try_mark_notification_pushed error', markErr)
|
||||
// do not mark processed; increment retry
|
||||
await supabase.from('scheduled_notifications').update({ retry_count: r.retry_count + 1, last_error: String(markErr) }).eq('id', rowId)
|
||||
continue
|
||||
}
|
||||
|
||||
if (markData === false) {
|
||||
console.log('Notification already pushed, skipping', notificationId)
|
||||
await supabase.from('scheduled_notifications').update({ processed: true, processed_at: new Date().toISOString() }).eq('id', rowId)
|
||||
continue
|
||||
}
|
||||
|
||||
// Prepare message
|
||||
let title = ''
|
||||
let body = ''
|
||||
if (notifyType === 'start_15') {
|
||||
title = 'Shift starting soon'
|
||||
body = 'Your shift starts in 15 minutes. Don\'t forget to check in.'
|
||||
} else if (notifyType === 'end') {
|
||||
title = 'Shift ended'
|
||||
body = 'Your shift has ended. Please remember to check out if you haven\'t.'
|
||||
} else {
|
||||
title = 'Shift reminder'
|
||||
body = 'Reminder about your shift.'
|
||||
}
|
||||
|
||||
// Call send_fcm endpoint to deliver push (reuses existing implementation)
|
||||
const payload = {
|
||||
user_ids: [userId],
|
||||
title,
|
||||
body,
|
||||
data: {
|
||||
notification_id: notificationId,
|
||||
schedule_id: scheduleId,
|
||||
type: notifyType,
|
||||
},
|
||||
}
|
||||
|
||||
const res = await fetch(SEND_FCM_URL, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify(payload),
|
||||
})
|
||||
|
||||
if (!res.ok) {
|
||||
const text = await res.text().catch(() => '')
|
||||
console.error('send_fcm failed', res.status, text)
|
||||
await supabase.from('scheduled_notifications').update({ retry_count: r.retry_count + 1, last_error: `send_fcm ${res.status}: ${text}` }).eq('id', rowId)
|
||||
continue
|
||||
}
|
||||
|
||||
// Mark processed
|
||||
await supabase.from('scheduled_notifications').update({ processed: true, processed_at: new Date().toISOString() }).eq('id', rowId)
|
||||
console.log('Processed scheduled notification', rowId, notificationId)
|
||||
} catch (err) {
|
||||
console.error('Error processing row', r, err)
|
||||
try {
|
||||
await supabase.from('scheduled_notifications').update({ retry_count: r.retry_count + 1, last_error: String(err) }).eq('id', r.id)
|
||||
} catch (e) {
|
||||
console.error('Failed to update retry_count', e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Deno.serve(async (req) => {
|
||||
if (req.method === 'OPTIONS') {
|
||||
return new Response('ok', { headers: corsHeaders })
|
||||
}
|
||||
|
||||
try {
|
||||
// Allow manual triggering via POST; also allow GET for quick check
|
||||
if (req.method === 'POST' || req.method === 'GET') {
|
||||
await processBatch()
|
||||
return new Response('ok', { headers: corsHeaders })
|
||||
}
|
||||
|
||||
return new Response('method not allowed', { status: 405, headers: corsHeaders })
|
||||
} catch (err) {
|
||||
console.error('Processor error', err)
|
||||
return new Response(JSON.stringify({ error: String(err) }), { status: 500, headers: corsHeaders })
|
||||
}
|
||||
})
|
||||
75
supabase/migrations/20260318_add_scheduled_notifications.sql
Normal file
75
supabase/migrations/20260318_add_scheduled_notifications.sql
Normal file
|
|
@ -0,0 +1,75 @@
|
|||
-- Migration: add scheduled_notifications queue and enqueue function for shift reminders
|
||||
-- Creates a queue table and stored procedure to enqueue 15-minute and end-of-shift reminders
|
||||
|
||||
CREATE EXTENSION IF NOT EXISTS pgcrypto;
|
||||
|
||||
-- Queue table for scheduled notifications
|
||||
CREATE TABLE IF NOT EXISTS public.scheduled_notifications (
|
||||
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
schedule_id uuid NOT NULL REFERENCES public.duty_schedules(id) ON DELETE CASCADE,
|
||||
user_id uuid NOT NULL REFERENCES auth.users(id) ON DELETE CASCADE,
|
||||
notify_type text NOT NULL, -- 'start_15' | 'end'
|
||||
scheduled_for timestamptz NOT NULL,
|
||||
created_at timestamptz NOT NULL DEFAULT now(),
|
||||
processed boolean NOT NULL DEFAULT false,
|
||||
processed_at timestamptz,
|
||||
retry_count int NOT NULL DEFAULT 0,
|
||||
last_error text
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_scheduled_notifications_scheduled_for ON public.scheduled_notifications(scheduled_for);
|
||||
CREATE INDEX IF NOT EXISTS idx_scheduled_notifications_processed ON public.scheduled_notifications(processed);
|
||||
|
||||
-- Ensure uniqueness per schedule/user/notify type
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (
|
||||
SELECT 1 FROM pg_constraint c
|
||||
JOIN pg_class t ON c.conrelid = t.oid
|
||||
WHERE c.conname = 'uniq_sched_user_type' AND t.relname = 'scheduled_notifications'
|
||||
) THEN
|
||||
ALTER TABLE public.scheduled_notifications ADD CONSTRAINT uniq_sched_user_type UNIQUE (schedule_id, user_id, notify_type);
|
||||
END IF;
|
||||
END$$;
|
||||
|
||||
-- The project already has notification_pushes and try_mark_notification_pushed (see earlier migrations).
|
||||
|
||||
-- Enqueue function: finds duty_schedules in the configured windows and inserts into queue.
|
||||
CREATE OR REPLACE FUNCTION public.enqueue_due_shift_notifications()
|
||||
RETURNS void LANGUAGE plpgsql AS $$
|
||||
DECLARE
|
||||
rec RECORD;
|
||||
BEGIN
|
||||
-- 15-minute-before reminders
|
||||
FOR rec IN
|
||||
SELECT id AS schedule_id, user_id AS user_id, start_time AS start_at
|
||||
FROM public.duty_schedules
|
||||
WHERE start_time BETWEEN now() + interval '15 minutes' - interval '30 seconds' AND now() + interval '15 minutes' + interval '30 seconds'
|
||||
AND status = 'active'
|
||||
LOOP
|
||||
-- Skip if user already checked in for this duty (attendance_logs references duty_schedule_id)
|
||||
IF EXISTS (SELECT 1 FROM public.attendance_logs al WHERE al.duty_schedule_id = rec.schedule_id AND al.check_in_at IS NOT NULL) THEN
|
||||
CONTINUE;
|
||||
END IF;
|
||||
|
||||
INSERT INTO public.scheduled_notifications (schedule_id, user_id, notify_type, scheduled_for)
|
||||
VALUES (rec.schedule_id, rec.user_id, 'start_15', rec.start_at)
|
||||
ON CONFLICT (schedule_id, user_id, notify_type) DO NOTHING;
|
||||
END LOOP;
|
||||
|
||||
-- End-of-shift reminders (near exact end)
|
||||
FOR rec IN
|
||||
SELECT id AS schedule_id, user_id AS user_id, end_time AS end_at
|
||||
FROM public.duty_schedules
|
||||
WHERE end_time BETWEEN now() - interval '30 seconds' AND now() + interval '30 seconds'
|
||||
AND status = 'active'
|
||||
LOOP
|
||||
INSERT INTO public.scheduled_notifications (schedule_id, user_id, notify_type, scheduled_for)
|
||||
VALUES (rec.schedule_id, rec.user_id, 'end', rec.end_at)
|
||||
ON CONFLICT (schedule_id, user_id, notify_type) DO NOTHING;
|
||||
END LOOP;
|
||||
END;
|
||||
$$;
|
||||
|
||||
-- NOTE: Schedule this function with pg_cron or your Postgres scheduler. Example (requires pg_cron extension enabled):
|
||||
-- SELECT cron.schedule('shift_reminders_every_min', '*/1 * * * *', $$SELECT public.enqueue_due_shift_notifications();$$);
|
||||
Loading…
Reference in New Issue
Block a user