diff --git a/lib/pub_sub_model_sync/service_google.rb b/lib/pub_sub_model_sync/service_google.rb index d9f2b59..6172fb0 100644 --- a/lib/pub_sub_model_sync/service_google.rb +++ b/lib/pub_sub_model_sync/service_google.rb @@ -35,11 +35,7 @@ def listen_messages def publish(payload) p_topic_names = Array(payload.headers[:topic_name] || config.default_topic_name) message_topics = p_topic_names.map(&method(:find_topic)) - message_topics.each do |topic| - topic.publish_async(encode_payload(payload), message_headers(payload)) do |res| - raise StandardError, 'Failed to publish the message.' unless res.succeeded? - end - end + message_topics.each { |topic| publish_to_topic(topic, payload) } end def stop @@ -56,6 +52,19 @@ def find_topic(topic_name) topics[topic_name] || publish_topics[topic_name] || init_topic(topic_name, only_publish: true) end + def publish_to_topic(topic, payload) + retries ||= 0 + topic.publish_async(encode_payload(payload), message_headers(payload)) do |res| + raise StandardError, "Failed to publish the message. #{res.error}" unless res.succeeded? + end + rescue Google::Cloud::PubSub::OrderingKeyError => e + raise if (retries += 1) > 1 + + log("Resuming ordering_key and retrying OrderingKeyError for #{payload.headers[:uuid]}: #{e.message}") + topic.resume_publish(message_headers(payload)[:ordering_key]) + retry + end + # @param only_publish (Boolean): if false is used to listen and publish messages # @return (Topic): returns created or loaded topic def init_topic(topic_name, only_publish: false) diff --git a/spec/service_google_spec.rb b/spec/service_google_spec.rb index d0b5668..8339683 100644 --- a/spec/service_google_spec.rb +++ b/spec/service_google_spec.rb @@ -146,6 +146,28 @@ end inst.publish(payload) end + + # https://github.com/googleapis/google-cloud-ruby/blob/main/google-cloud-pubsub/OVERVIEW.md#handling-errors-with-ordered-keys + describe 'when failed because of OrderingKeyError (Ordered messages that fail to publish to the Pub/Sub API due + to error will put the ordering_key in a failed state)' do + before do + error = Google::Cloud::PubSub::OrderingKeyError.new('some error') + calls = 0 + allow(topic).to receive(:publish_async) do + (calls += 1) == 1 ? raise(error) : true + end + end + + it 'calls #resume_publish on topic to re-enable publish for the ordering_key' do + expect(topic).to receive(:resume_publish).with(payload.headers[:ordering_key]) + inst.publish(payload) + end + + it 'retries 1 time' do + expect(topic).to receive(:publish_async).exactly(2) + inst.publish(payload) + end + end end describe '.stop' do