Skip to content

Commit

Permalink
fix order of send_batch
Browse files Browse the repository at this point in the history
  • Loading branch information
ChuckHend committed Dec 19, 2024
1 parent b4e5d03 commit 684cb2f
Showing 1 changed file with 46 additions and 23 deletions.
69 changes: 46 additions & 23 deletions pgmq-extension/sql/pgmq.sql
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,29 @@ BEGIN
END;
$$ LANGUAGE plpgsql;
-- send: actual implementation
CREATE FUNCTION pgmq.send(
queue_name TEXT,
msg JSONB,
headers JSONB,
delay TIMESTAMP WITH TIME ZONE
) RETURNS SETOF BIGINT AS $$
DECLARE
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
sql := FORMAT(
$QUERY$
INSERT INTO pgmq.%I (vt, message, headers)
VALUES ($2, $1, $3)
RETURNING msg_id;
$QUERY$,
qtable
);
RETURN QUERY EXECUTE sql USING msg, delay, headers;
END;
$$ LANGUAGE plpgsql;
-- send: 2 args, no delay or headers
CREATE FUNCTION pgmq.send(
queue_name TEXT,
Expand Down Expand Up @@ -336,6 +359,29 @@ BEGIN
END;
$$ LANGUAGE plpgsql;
-- send_batch: actual implementation
CREATE FUNCTION pgmq.send_batch(
queue_name TEXT,
msgs JSONB[],
headers JSONB[],
delay TIMESTAMP WITH TIME ZONE
) RETURNS SETOF BIGINT AS $$
DECLARE
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
sql := FORMAT(
$QUERY$
INSERT INTO pgmq.%I (vt, message, headers)
SELECT $2, unnest($1), unnest(coalesce($3, ARRAY[]::jsonb[]))
RETURNING msg_id;
$QUERY$,
qtable
);
RETURN QUERY EXECUTE sql USING msgs, delay, headers;
END;
$$ LANGUAGE plpgsql;
-- send batch: 2 args
CREATE FUNCTION pgmq.send_batch(
queue_name TEXT,
Expand Down Expand Up @@ -381,29 +427,6 @@ CREATE FUNCTION pgmq.send_batch(
SELECT * FROM pgmq.send_batch(queue_name, msgs, headers, clock_timestamp() + make_interval(secs => delay));
$$ LANGUAGE sql;
-- send_batch: actual implementation
CREATE FUNCTION pgmq.send_batch(
queue_name TEXT,
msgs JSONB[],
headers JSONB[],
delay TIMESTAMP WITH TIME ZONE
) RETURNS SETOF BIGINT AS $$
DECLARE
sql TEXT;
qtable TEXT := pgmq.format_table_name(queue_name, 'q');
BEGIN
sql := FORMAT(
$QUERY$
INSERT INTO pgmq.%I (vt, message, headers)
SELECT $2, unnest($1), unnest(coalesce($3, ARRAY[]::jsonb[]))
RETURNING msg_id;
$QUERY$,
qtable
);
RETURN QUERY EXECUTE sql USING msgs, delay, headers;
END;
$$ LANGUAGE plpgsql;
-- returned by pgmq.metrics() and pgmq.metrics_all
CREATE TYPE pgmq.metrics_result AS (
queue_name text,
Expand Down

0 comments on commit 684cb2f

Please sign in to comment.