package notifications

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"time"
)

// dispatchOne builds the payload for one (event, audience, channel) tuple,
// posts it to the upstream Matrix/Email service once per recipient, and
// writes a notification_log row. It returns one of "sent", "failed",
// "skipped", "already" so the caller can tally counters.
//
// "skipped" means the upstream URL is empty (dev/test mode) — we still log
// so the UI can render "will-send-when-configured". "already" means the
// (event, lead, audience, channel) combo is already logged from an earlier
// run; we don't re-send.
//
// NOTE(review): the idempotency query matches rows of any status and has no
// run_date filter, so an earlier "failed" or "skipped" row also suppresses
// later attempts — confirm that this is intended.
func (s *Service) dispatchOne(ctx context.Context, e dueEvent, audience, channel string, runDate time.Time) (status string, err error) {
	// Idempotency check.
	var existing int
	if err := s.db.QueryRow(ctx, `
		SELECT COUNT(*) FROM notification_log
		WHERE event_id = $1 AND lead_days = $2 AND audience = $3 AND channel = $4
	`, e.ID, e.LeadDays, audience, channel).Scan(&existing); err != nil {
		return "failed", fmt.Errorf("checking notification_log: %w", err)
	}
	if existing > 0 {
		return "already", nil
	}

	recipients, lang, err := s.recipientsFor(ctx, e, audience)
	if err != nil {
		return "failed", fmt.Errorf("resolving recipients: %w", err)
	}

	url := s.urlFor(channel)
	if url == "" {
		// Stub mode: write a 'skipped' log row but report success so the cron
		// counter isn't alarming when running locally without the upstream.
		// The log write is best-effort; a failure is reported but does not
		// change the returned status.
		if logErr := s.writeLog(ctx, e, audience, channel, "skipped", "no upstream URL configured", runDate); logErr != nil {
			log.Printf("notification_log insert failed (status skipped): %v", logErr)
		}
		return "skipped", nil
	}

	subject, body := Render(e.EventType, audience, e.LeadDays, lang, Vars{
		Title:      e.Title,
		Date:       e.StartDate.Format("2006-01-02"),
		DatePretty: e.StartDate.Format("02.01.2006"),
		ClassName:  e.ClassName,
	})

	// NOTE(review): with zero recipients the loop body never runs and the
	// tuple is still logged as "sent" — confirm that this is intended.
	for _, recipient := range recipients {
		payload := DispatchPayload{
			Channel:   channel,
			Recipient: recipient,
			Language:  lang,
			Subject:   subject,
			Body:      body,
			EventID:   e.ID,
			LeadDays:  e.LeadDays,
		}
		if err := s.postUpstream(ctx, url, payload); err != nil {
			// Stop at the first failing recipient. Recipients earlier in the
			// list have already been posted to the upstream.
			if logErr := s.writeLog(ctx, e, audience, channel, "failed", err.Error(), runDate); logErr != nil {
				log.Printf("notification_log insert failed (status failed): %v", logErr)
			}
			return "failed", err
		}
	}

	if err := s.writeLog(ctx, e, audience, channel, "sent", "", runDate); err != nil {
		log.Printf("notification_log insert failed (already counted as sent): %v", err)
	}
	return "sent", nil
}

// recipientsFor returns the recipient identifiers for an event together with
// the language used to render the message. The identifiers are always parent
// email addresses: deriving Matrix handles for students is unimplemented for
// now, so we return the parent emails and let the bridge fan out.
//
// The Matrix/Email upstream services are owned by a colleague; our job here
// is to hand them a recipient identifier they can resolve. For parents that's
// the email; for students we have no contact identifier yet, so we fall back
// to the parent emails too (broadcast). The audience parameter is therefore
// currently unused.
//
// The returned language is the preferred_language of the first row returned,
// or "de" when there are no rows or an error occurs.
func (s *Service) recipientsFor(ctx context.Context, e dueEvent, audience string) ([]string, string, error) {
	const fallbackLang = "de"

	// Restrict to the classes listed on the event row. When
	// affected_class_ids has no elements, every parent account created by
	// the event owner matches.
	rows, err := s.db.Query(ctx, `
		SELECT DISTINCT pa.email, pa.preferred_language
		FROM parent_account pa
		JOIN parent_child pc ON pc.parent_id = pa.id
		WHERE pa.created_by_user_id = $1
		  AND (
		    (SELECT array_length(affected_class_ids, 1) FROM cal_school_event WHERE id = $2) IS NULL
		    OR pc.tt_class_id = ANY(
		      (SELECT affected_class_ids FROM cal_school_event WHERE id = $2)
		    )
		  )
	`, e.OwnerUserID, e.ID)
	if err != nil {
		return nil, fallbackLang, fmt.Errorf("querying recipients: %w", err)
	}
	defer rows.Close()

	var emails []string
	primaryLang := fallbackLang
	for rows.Next() {
		// NOTE(review): scanning into plain strings fails if either column
		// can be NULL — verify against the schema.
		var email, lang string
		if err := rows.Scan(&email, &lang); err != nil {
			return nil, fallbackLang, fmt.Errorf("scanning recipient row: %w", err)
		}
		if len(emails) == 0 {
			// The first row decides the language for the whole batch.
			primaryLang = lang
		}
		emails = append(emails, email)
	}
	// Next returning false can also mean iteration failed part-way; without
	// this check a truncated recipient list would be returned as success.
	if err := rows.Err(); err != nil {
		return nil, fallbackLang, fmt.Errorf("iterating recipient rows: %w", err)
	}
	return emails, primaryLang, nil
}

// urlFor returns the configured upstream URL for the given channel
// ("matrix" or "email"), or "" for any other channel.
func (s *Service) urlFor(channel string) string {
	switch channel {
	case "matrix":
		return s.matrixURL
	case "email":
		return s.emailURL
	default:
		return ""
	}
}

// postUpstream JSON-encodes payload and POSTs it to url, bounded by
// s.httpTimeout. It returns an error if the payload cannot be encoded, the
// request cannot be built or sent, or the upstream answers with a status
// code of 400 or above.
func (s *Service) postUpstream(ctx context.Context, url string, payload DispatchPayload) error {
	body, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("encoding dispatch payload: %w", err)
	}

	cctx, cancel := context.WithTimeout(ctx, s.httpTimeout)
	defer cancel()

	req, err := http.NewRequestWithContext(cctx, http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer func() {
		// Drain a bounded amount of the body before closing so the transport
		// can reuse the connection. The response content itself is not used.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 64<<10))
		_ = resp.Body.Close()
	}()

	if resp.StatusCode >= 400 {
		return fmt.Errorf("upstream returned HTTP %d", resp.StatusCode)
	}
	return nil
}

// writeLog inserts a notification_log row for the (event, lead_days,
// audience, channel) tuple with the given status. An empty errorMessage is
// stored as NULL. If a row for the tuple already exists, the insert is a
// no-op (ON CONFLICT DO NOTHING), so the first status written is kept.
func (s *Service) writeLog(ctx context.Context, e dueEvent, audience, channel, status, errorMessage string, runDate time.Time) error {
	_, err := s.db.Exec(ctx, `
		INSERT INTO notification_log
		  (event_id, lead_days, audience, channel, status, error_message, run_date)
		VALUES ($1::uuid, $2, $3, $4, $5, NULLIF($6, ''), $7::date)
		ON CONFLICT (event_id, lead_days, audience, channel) DO NOTHING
	`, e.ID, e.LeadDays, audience, channel, status, errorMessage, runDate.Format("2006-01-02"))
	return err
}