Skip to content

Commit

Permalink
fix: Resume ordering_key when marked as failed
Browse files Browse the repository at this point in the history
  • Loading branch information
owen2345 committed Dec 2, 2022
1 parent affeeeb commit ae731b4
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 5 deletions.
19 changes: 14 additions & 5 deletions lib/pub_sub_model_sync/service_google.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,7 @@ def listen_messages
def publish(payload)
p_topic_names = Array(payload.headers[:topic_name] || config.default_topic_name)
message_topics = p_topic_names.map(&method(:find_topic))
message_topics.each do |topic|
topic.publish_async(encode_payload(payload), message_headers(payload)) do |res|
raise StandardError, 'Failed to publish the message.' unless res.succeeded?
end
end
message_topics.each { |topic| publish_to_topic(topic, payload) }
end

def stop
Expand All @@ -56,6 +52,19 @@ def find_topic(topic_name)
topics[topic_name] || publish_topics[topic_name] || init_topic(topic_name, only_publish: true)
end

def publish_to_topic(topic, payload)
retries ||= 0
topic.publish_async(encode_payload(payload), message_headers(payload)) do |res|
raise StandardError, "Failed to publish the message. #{res.error}" unless res.succeeded?
end
rescue Google::Cloud::PubSub::OrderingKeyError => e
raise if (retries += 1) > 1

log("Resuming ordering_key and retrying OrderingKeyError for #{payload.headers[:uuid]}: #{e.message}")
topic.resume_publish(message_headers(payload)[:ordering_key])
retry
end

# @param only_publish (Boolean): if false is used to listen and publish messages
# @return (Topic): returns created or loaded topic
def init_topic(topic_name, only_publish: false)
Expand Down
22 changes: 22 additions & 0 deletions spec/service_google_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,28 @@
end
inst.publish(payload)
end

# https://github.com/googleapis/google-cloud-ruby/blob/main/google-cloud-pubsub/OVERVIEW.md#handling-errors-with-ordered-keys
describe 'when failed because of OrderingKeyError (Ordered messages that fail to publish to the Pub/Sub API due
to error will put the ordering_key in a failed state)' do
before do
error = Google::Cloud::PubSub::OrderingKeyError.new('some error')
calls = 0
allow(topic).to receive(:publish_async) do
(calls += 1) == 1 ? raise(error) : true
end
end

it 'calls #resume_publish on topic to re-enable publish for the ordering_key' do
expect(topic).to receive(:resume_publish).with(payload.headers[:ordering_key])
inst.publish(payload)
end

it 'retries 1 time' do
expect(topic).to receive(:publish_async).exactly(2)
inst.publish(payload)
end
end
end

describe '.stop' do
Expand Down

0 comments on commit ae731b4

Please sign in to comment.