Skip to content

Commit

Permalink
Start working on emitting messages about all job row changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
chanks committed Sep 13, 2017
1 parent e58af83 commit 6729e65
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 1 deletion.
57 changes: 57 additions & 0 deletions lib/que/migrations.que_state_trigger_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# frozen_string_literal: true

require 'spec_helper'

describe Que::Migrations, "que_state trigger" do
attr_reader :connection

around do |&block|
super() do
DEFAULT_QUE_POOL.checkout do |conn|
begin
@connection = conn
conn.execute "LISTEN que_state"

block.call
ensure
conn.execute "UNLISTEN que_state"
conn.drain_notifications
end
end
end
end

def get_message
connection.wait_for_notify(1) do |channel, pid, payload|
json = JSON.parse(payload, symbolize_names: true)
assert_equal "job_change", json.delete(:message_type)
return json
end
raise "No message!"
end

describe "when inserting a new job" do
it "should issue a notification containing the job's class, queue, etc." do
DB[:que_jobs].insert(job_class: "MyJobClass")

assert_equal(
{action: "insert", job_class: "MyJobClass", queue: "default"},
get_message,
)
end

describe "that is wrapped by ActiveJob" do
it "should report the wrapped job class"

it "when the wrapped job class cannot be found should do the best it can"
end
end

describe "when updating a job" do
it "should issue a notification containing the job's class, error count, etc."
end

describe "when deleting a job" do
it "should issue a notification containing the job's class, queue, etc."
end
end
2 changes: 2 additions & 0 deletions lib/que/migrations/4/down.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ DROP INDEX que_jobs_data_gin_idx;

DROP TRIGGER que_job_notify ON que_jobs;
DROP FUNCTION que_job_notify();
DROP TRIGGER que_state_notify ON que_jobs;
DROP FUNCTION que_state_notify();
DROP TABLE que_lockers;

DROP TABLE que_values;
Expand Down
35 changes: 35 additions & 0 deletions lib/que/migrations/4/up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,38 @@ CREATE TRIGGER que_job_notify
AFTER INSERT ON que_jobs
FOR EACH ROW
EXECUTE PROCEDURE que_job_notify();

CREATE FUNCTION que_state_notify() RETURNS trigger AS $$
DECLARE
row record;
message json;
BEGIN
IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN
row := NEW;
ELSIF TG_OP = 'DELETE' THEN
row := OLD;
ELSE
RAISE EXCEPTION 'Unrecognized TG_OP: %', TG_OP;
END IF;

SELECT row_to_json(t)
INTO message
FROM (
SELECT
'job_change' AS message_type,
lower(TG_OP) AS action,
row.queue AS queue,
row.job_class AS job_class
) t;

PERFORM pg_notify('que_state', message::text);

RETURN null;
END
$$
LANGUAGE plpgsql;

CREATE TRIGGER que_state_notify
AFTER INSERT OR UPDATE OR DELETE ON que_jobs
FOR EACH ROW
EXECUTE PROCEDURE que_state_notify();
4 changes: 3 additions & 1 deletion spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
# Reset the schema to the most up-to-date version.
DB.drop_table? *QUE_TABLES
DB.drop_function :que_job_notify, if_exists: true
DB.drop_function :que_state_notify, if_exists: true
Que::Migrations.migrate!(version: Que::Migrations::CURRENT_VERSION)


Expand Down Expand Up @@ -287,8 +288,9 @@ def around
DB.get{setval(Sequel.cast('que_jobs_id_seq', :regclass), new_id)}

QUE_TABLES.each { |t| DB[t].delete }
rescue Sequel::DatabaseError
rescue Sequel::DatabaseError => e
puts "\n\nPrevious spec left DB in unexpected state, run aborted\n\n"
puts "\n\n#{e.class}: #{e.message}\n\n"
abort
end

Expand Down

0 comments on commit 6729e65

Please sign in to comment.