-- Merges a batch of raw email-event rows into public.raw_email_events,
-- keyed by sent_email_name (insert new keys, update existing ones).
--
-- Parameters:
--   p_import_id  written to latest_import_id on every inserted/updated row.
--   p_rows       jsonb array of objects read by key: sent_email_name,
--                primary_country_code, approved_document_name, last_open,
--                opened, total_opens, last_click, clicked, total_clicks,
--                sent_date. SQL NULL is treated as an empty array.
--
-- Returns exactly one row:
--   inserted_rows  number of rows newly inserted.
--   updated_rows   number of existing rows updated.
--   failed_rows    rows whose trimmed sent_email_name is blank, plus
--                  de-duplicated rows the caller may not write.
--                  Rows dropped as in-batch duplicates are not counted
--                  in any of the three figures.
--
-- Authorization: the function is SECURITY DEFINER, so it performs its own
-- check. Callers for whom public.is_admin() is true may write every row;
-- other callers need public.has_country_access() on the row's effective
-- country (the stored country for an existing key, otherwise the payload
-- country). Rows with no effective country are rejected for non-admins.
--
-- search_path lists pg_temp explicitly and last, so that temporary objects
-- cannot shadow objects in public inside this SECURITY DEFINER function.
create or replace function public.merge_raw_email_events(
  p_import_id bigint,
  p_rows jsonb
)
returns table(inserted_rows integer, updated_rows integer, failed_rows integer)
language plpgsql
security definer
set search_path = public, pg_temp
as $$
declare
  v_user_id uuid := auth.uid();
  v_is_admin boolean := public.is_admin(v_user_id);
begin
  with payload as (
    -- One row per array element; ord is the element's position in the batch.
    select
      value as row,
      ordinality::bigint as ord
    from jsonb_array_elements(coalesce(p_rows, '[]'::jsonb)) with ordinality
  ),
  parsed as (
    -- Normalise every field to its target type.
    --   * Text fields: missing -> ''.
    --   * Timestamps: missing, blank, or not starting with YYYY-MM-DD -> null.
    --     NOTE(review): a value that passes the prefix check but is not a
    --     valid timestamp (e.g. month 13) still makes the cast raise and
    --     aborts the whole batch - confirm whether that is acceptable.
    --   * Counters: non-integer text -> 0; negatives -> 0; values above the
    --     integer maximum are clamped to 2147483647. The cast goes through
    --     numeric so an oversized digit string cannot raise
    --     "integer out of range" and abort the batch.
    select
      trim(coalesce(row->>'sent_email_name', '')) as sent_email_name,
      upper(coalesce(row->>'primary_country_code', '')) as primary_country_code,
      coalesce(row->>'approved_document_name', '') as approved_document_name,
      case
        when nullif(row->>'last_open', '') is null then null
        when (row->>'last_open') ~ '^\d{4}-\d{2}-\d{2}' then (row->>'last_open')::timestamptz
        else null
      end as last_open,
      case
        when coalesce(row->>'opened', '') ~ '^-?\d+$'
          then least(greatest((row->>'opened')::numeric, 0), 2147483647)::integer
        else 0
      end as opened,
      case
        when coalesce(row->>'total_opens', '') ~ '^-?\d+$'
          then least(greatest((row->>'total_opens')::numeric, 0), 2147483647)::integer
        else 0
      end as total_opens,
      case
        when nullif(row->>'last_click', '') is null then null
        when (row->>'last_click') ~ '^\d{4}-\d{2}-\d{2}' then (row->>'last_click')::timestamptz
        else null
      end as last_click,
      case
        when coalesce(row->>'clicked', '') ~ '^-?\d+$'
          then least(greatest((row->>'clicked')::numeric, 0), 2147483647)::integer
        else 0
      end as clicked,
      case
        when coalesce(row->>'total_clicks', '') ~ '^-?\d+$'
          then least(greatest((row->>'total_clicks')::numeric, 0), 2147483647)::integer
        else 0
      end as total_clicks,
      case
        when nullif(row->>'sent_date', '') is null then null
        when (row->>'sent_date') ~ '^\d{4}-\d{2}-\d{2}' then (row->>'sent_date')::timestamptz
        else null
      end as sent_date,
      ord
    from payload
  ),
  dedup as (
    -- Keep one row per key: the last occurrence in the batch wins.
    -- Blank keys are dropped here and counted as failures in totals.
    select distinct on (sent_email_name)
      sent_email_name,
      primary_country_code,
      approved_document_name,
      last_open,
      opened,
      total_opens,
      last_click,
      clicked,
      total_clicks,
      sent_date
    from parsed
    where sent_email_name <> ''
    order by sent_email_name, ord desc
  ),
  with_existing as (
    -- Attach the country used for the access check: the stored country when
    -- the key already exists with a non-blank country, else the payload
    -- country; null when neither is available.
    select
      d.sent_email_name,
      d.primary_country_code,
      d.approved_document_name,
      d.last_open,
      d.opened,
      d.total_opens,
      d.last_click,
      d.clicked,
      d.total_clicks,
      d.sent_date,
      upper(
        coalesce(
          nullif(ree.primary_country_code, ''),
          nullif(d.primary_country_code, '')
        )
      ) as effective_country
    from dedup d
    left join public.raw_email_events ree
      on ree.sent_email_name = d.sent_email_name
  ),
  eligible as (
    -- Rows the caller is allowed to write.
    -- NOTE(review): for an existing key only the stored country is checked,
    -- yet the update below may replace it with the payload country - confirm
    -- that non-admins are meant to be able to reassign a row's country.
    select
      we.sent_email_name,
      we.primary_country_code,
      we.approved_document_name,
      we.last_open,
      we.opened,
      we.total_opens,
      we.last_click,
      we.clicked,
      we.total_clicks,
      we.sent_date
    from with_existing we
    where
      v_is_admin
      or (
        we.effective_country is not null
        and public.has_country_access(we.effective_country, v_user_id)
      )
  ),
  upserted as (
    -- Values were already normalised in parsed (non-null text, non-negative
    -- counters), so they are inserted as-is.
    insert into public.raw_email_events (
      sent_email_name,
      primary_country_code,
      approved_document_name,
      last_open,
      opened,
      total_opens,
      last_click,
      clicked,
      total_clicks,
      sent_date,
      latest_import_id
    )
    select
      e.sent_email_name,
      e.primary_country_code,
      e.approved_document_name,
      e.last_open,
      e.opened,
      e.total_opens,
      e.last_click,
      e.clicked,
      e.total_clicks,
      e.sent_date,
      p_import_id
    from eligible e
    on conflict (sent_email_name)
    do update set
      -- Blank text and null timestamps in the payload keep the stored value.
      primary_country_code = coalesce(nullif(excluded.primary_country_code, ''), raw_email_events.primary_country_code),
      approved_document_name = coalesce(nullif(excluded.approved_document_name, ''), raw_email_events.approved_document_name),
      last_open = coalesce(excluded.last_open, raw_email_events.last_open),
      -- NOTE(review): this statement always supplies a non-null counter
      -- (missing/invalid input is parsed as 0), so the payload value replaces
      -- the stored one even when the field was absent - confirm intended.
      opened = coalesce(excluded.opened, raw_email_events.opened, 0),
      total_opens = coalesce(excluded.total_opens, raw_email_events.total_opens, 0),
      last_click = coalesce(excluded.last_click, raw_email_events.last_click),
      clicked = coalesce(excluded.clicked, raw_email_events.clicked, 0),
      total_clicks = coalesce(excluded.total_clicks, raw_email_events.total_clicks, 0),
      sent_date = coalesce(excluded.sent_date, raw_email_events.sent_date),
      latest_import_id = p_import_id,
      updated_at = timezone('utc'::text, now())
    -- xmax = 0 is used to tell freshly inserted rows from updated ones.
    returning (xmax = 0) as inserted
  ),
  totals as (
    select
      (select count(*) from parsed where sent_email_name = '') as blank_key_count,
      (select count(*) from dedup) as dedup_total,
      (select count(*) from eligible) as eligible_total,
      (select count(*) from upserted where inserted) as inserted_count,
      (select count(*) from upserted where not inserted) as updated_count
  )
  select
    inserted_count::integer,
    updated_count::integer,
    -- failures = blank keys + de-duplicated rows rejected by the access check
    (blank_key_count + (dedup_total - eligible_total))::integer
  into inserted_rows, updated_rows, failed_rows
  from totals;

  -- Emit the single summary row held in the output columns.
  return next;
end;
$$;

-- Allow the "authenticated" role to call the merge function.
-- NOTE(review): this only adds a privilege. If EXECUTE is still held by
-- PUBLIC (the PostgreSQL default for new functions) other roles can call it
-- too - confirm whether a matching REVOKE exists elsewhere.
grant execute on function public.merge_raw_email_events(bigint, jsonb) to authenticated;

