Skip to content

Commit

Permalink
Merge pull request #48 from bpot/bp/consumer_sleep
Browse files Browse the repository at this point in the history
Consumer: block for messages by default
  • Loading branch information
bpot committed Jul 16, 2014
2 parents 9acf1e6 + 904273a commit 5140653
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 8 deletions.
14 changes: 10 additions & 4 deletions lib/poseidon/partition_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@ def self.consumer_for_partition(client_id, seed_brokers, topic, partition, offse
#
# @option options [:max_bytes] Maximum number of bytes to fetch
# Default: 1048576 (1MB)
#
# @option options [:max_wait_ms]
# How long to block until the server sends us data.
# NOTE: This is only enforced if min_bytes is > 0.
# Default: 100 (100ms)
#
# @option options [:min_bytes] Smallest amount of data the server should send us.
# Default: 0 (Send us data as soon as it is ready)
# Default: 1 (Send us data as soon as it is ready)
#
# @api public
def initialize(client_id, host, port, topic, partition, offset, options = {})
Expand All @@ -75,14 +78,16 @@ def initialize(client_id, host, port, topic, partition, offset, options = {})
#
# @option options [:max_bytes]
# Maximum number of bytes to fetch
#
# @option options [:max_wait_ms]
# How long to block until the server sends us data.
#
# @option options [:min_bytes]
# Smallest amount of data the server should send us.
#
# @api public
def fetch(options = {})
fetch_max_wait = options.delete(:max_wait) || max_wait_ms
fetch_max_wait = options.delete(:max_wait_ms) || max_wait_ms
fetch_max_bytes = options.delete(:max_bytes) || max_bytes
fetch_min_bytes = options.delete(:min_bytes) || min_bytes

Expand All @@ -96,7 +101,8 @@ def fetch(options = {})
partition_response = topic_response.partition_fetch_responses.first

unless partition_response.error == Errors::NO_ERROR_CODE
if @offset < 0 && Errors::ERROR_CODES[partition_response.error] == Errors::OffsetOutOfRange
if @offset < 0 &&
Errors::ERROR_CODES[partition_response.error] == Errors::OffsetOutOfRange
@offset = :earliest_offset
return fetch(options)
end
Expand Down Expand Up @@ -125,7 +131,7 @@ def next_offset
private
def handle_options(options)
@max_bytes = options.delete(:max_bytes) || 1024*1024
@min_bytes = options.delete(:min_bytes) || 0
@min_bytes = options.delete(:min_bytes) || 1
@max_wait_ms = options.delete(:max_wait_ms) || 10_000
if options.keys.any?
raise ArgumentError, "Unknown options: #{options.keys.inspect}"
Expand Down
17 changes: 17 additions & 0 deletions spec/integration/simple/simple_producer_and_consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,23 @@
expect(messages.empty?).to eq(true)
end

it "waits for messages" do
# Create topic
@c = Connection.new("localhost", 9092, "metadata_fetcher")
@c.topic_metadata(["simple_wait_test"])

sleep 5
@consumer = PartitionConsumer.new("test_consumer", "localhost", 9092,
"simple_wait_test", 0, :earliest_offset,
:max_wait_ms => 2500)

require 'benchmark'
n = Benchmark.realtime do
@consumer.fetch
end
expect(n).to be_within(0.25).of(2.5)
end

# Not sure what's going on here, will revisit.
=begin
it "fetches larger messages with a larger max bytes size" do
Expand Down
8 changes: 4 additions & 4 deletions spec/unit/partition_consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
describe "creation" do
context "when passed unknown options" do
it "raises an ArgumentError" do
expect { PartitionConsumer.new("test_client", "localhost", 9092, "test_topic", 0,:earliest_offset, :unknown => true) }.to raise_error(ArgumentError)
expect { PartitionConsumer.new("test_client", "localhost", 9092, "test_topic", 0, :earliest_offset, :unknown => true) }.to raise_error(ArgumentError)
end
end

context "when passed an unknown offset" do
it "raises an ArgumentError" do
expect { PartitionConsumer.new("test_client", "localhost", 9092, "test_topic", 0,:coolest_offset) }.to raise_error(ArgumentError)
expect { PartitionConsumer.new("test_client", "localhost", 9092, "test_topic", 0, :coolest_offset) }.to raise_error(ArgumentError)
end
end
end
Expand Down Expand Up @@ -97,13 +97,13 @@
end

it "uses object defaults" do
@connection.should_receive(:fetch).with(10_000, 0, anything)
@connection.should_receive(:fetch).with(10_000, 1, anything)
@pc.fetch
end

context "when options are passed" do
it "overrides object defaults" do
@connection.should_receive(:fetch).with(20_000, 0, anything)
@connection.should_receive(:fetch).with(20_000, 1, anything)
@pc = PartitionConsumer.new("test_client", "localhost", 9092, "test_topic", 0, :earliest_offset, :max_wait_ms => 20_000)

@pc.fetch
Expand Down

0 comments on commit 5140653

Please sign in to comment.