From 904273ab807f8a954aa06bded6b6205bbae42824 Mon Sep 17 00:00:00 2001 From: Bob Potter Date: Wed, 16 Jul 2014 17:28:35 -0500 Subject: [PATCH] Consumer: wait for messages by default Prior to this change, min_bytes defaulted to 0, which caused the fetch command to return unexpectedly, regardless of the max_wait_ms setting. Also fixed the options for PartitionConsumer#fetch to expect max_wait_ms rather than max_wait. --- lib/poseidon/partition_consumer.rb | 14 ++++++++++---- .../simple/simple_producer_and_consumer_spec.rb | 17 +++++++++++++++++ spec/unit/partition_consumer_spec.rb | 8 ++++---- 3 files changed, 31 insertions(+), 8 deletions(-) diff --git a/lib/poseidon/partition_consumer.rb b/lib/poseidon/partition_consumer.rb index 5ec798e..a81b69c 100644 --- a/lib/poseidon/partition_consumer.rb +++ b/lib/poseidon/partition_consumer.rb @@ -48,11 +48,14 @@ def self.consumer_for_partition(client_id, seed_brokers, topic, partition, offse # # @option options [:max_bytes] Maximum number of bytes to fetch # Default: 1048576 (1MB) + # # @option options [:max_wait_ms] # How long to block until the server sends us data. + # NOTE: This is only enforced if min_bytes is > 0. # Default: 100 (100ms) + # # @option options [:min_bytes] Smallest amount of data the server should send us. - # Default: 0 (Send us data as soon as it is ready) + # Default: 1 (Send us data as soon as it is ready) # # @api public def initialize(client_id, host, port, topic, partition, offset, options = {}) @@ -75,14 +78,16 @@ def initialize(client_id, host, port, topic, partition, offset, options = {}) # # @option options [:max_bytes] # Maximum number of bytes to fetch + # # @option options [:max_wait_ms] # How long to block until the server sends us data. + # # @option options [:min_bytes] # Smallest amount of data the server should send us. 
# # @api public def fetch(options = {}) - fetch_max_wait = options.delete(:max_wait) || max_wait_ms + fetch_max_wait = options.delete(:max_wait_ms) || max_wait_ms fetch_max_bytes = options.delete(:max_bytes) || max_bytes fetch_min_bytes = options.delete(:min_bytes) || min_bytes @@ -96,7 +101,8 @@ def fetch(options = {}) partition_response = topic_response.partition_fetch_responses.first unless partition_response.error == Errors::NO_ERROR_CODE - if @offset < 0 && Errors::ERROR_CODES[partition_response.error] == Errors::OffsetOutOfRange + if @offset < 0 && + Errors::ERROR_CODES[partition_response.error] == Errors::OffsetOutOfRange @offset = :earliest_offset return fetch(options) end @@ -125,7 +131,7 @@ def next_offset private def handle_options(options) @max_bytes = options.delete(:max_bytes) || 1024*1024 - @min_bytes = options.delete(:min_bytes) || 0 + @min_bytes = options.delete(:min_bytes) || 1 @max_wait_ms = options.delete(:max_wait_ms) || 10_000 if options.keys.any? raise ArgumentError, "Unknown options: #{options.keys.inspect}" diff --git a/spec/integration/simple/simple_producer_and_consumer_spec.rb b/spec/integration/simple/simple_producer_and_consumer_spec.rb index 67100a3..f77c854 100644 --- a/spec/integration/simple/simple_producer_and_consumer_spec.rb +++ b/spec/integration/simple/simple_producer_and_consumer_spec.rb @@ -54,6 +54,23 @@ expect(messages.empty?).to eq(true) end + it "waits for messages" do + # Create topic + @c = Connection.new("localhost", 9092, "metadata_fetcher") + @c.topic_metadata(["simple_wait_test"]) + + sleep 5 + @consumer = PartitionConsumer.new("test_consumer", "localhost", 9092, + "simple_wait_test", 0, :earliest_offset, + :max_wait_ms => 2500) + + require 'benchmark' + n = Benchmark.realtime do + @consumer.fetch + end + expect(n).to be_within(0.25).of(2.5) + end + # Not sure what's going on here, will revisit. 
=begin it "fetches larger messages with a larger max bytes size" do diff --git a/spec/unit/partition_consumer_spec.rb b/spec/unit/partition_consumer_spec.rb index 5323f87..f4a4f58 100644 --- a/spec/unit/partition_consumer_spec.rb +++ b/spec/unit/partition_consumer_spec.rb @@ -14,13 +14,13 @@ describe "creation" do context "when passed unknown options" do it "raises an ArgumentError" do - expect { PartitionConsumer.new("test_client", "localhost", 9092, "test_topic", 0,:earliest_offset, :unknown => true) }.to raise_error(ArgumentError) + expect { PartitionConsumer.new("test_client", "localhost", 9092, "test_topic", 0, :earliest_offset, :unknown => true) }.to raise_error(ArgumentError) end end context "when passed an unknown offset" do it "raises an ArgumentError" do - expect { PartitionConsumer.new("test_client", "localhost", 9092, "test_topic", 0,:coolest_offset) }.to raise_error(ArgumentError) + expect { PartitionConsumer.new("test_client", "localhost", 9092, "test_topic", 0, :coolest_offset) }.to raise_error(ArgumentError) end end end @@ -97,13 +97,13 @@ end it "uses object defaults" do - @connection.should_receive(:fetch).with(10_000, 0, anything) + @connection.should_receive(:fetch).with(10_000, 1, anything) @pc.fetch end context "when options are passed" do it "overrides object defaults" do - @connection.should_receive(:fetch).with(20_000, 0, anything) + @connection.should_receive(:fetch).with(20_000, 1, anything) @pc = PartitionConsumer.new("test_client", "localhost", 9092, "test_topic", 0, :earliest_offset, :max_wait_ms => 20_000) @pc.fetch