From ee7e86ea712de0a0390e64752c5cf9180c1681b5 Mon Sep 17 00:00:00 2001 From: Mark Roberts Date: Tue, 25 Feb 2014 01:02:39 -0800 Subject: [PATCH 1/5] Update example.py to compile, add friendly load_example.py --- example.py | 48 ++++++++++++++++++++++++++++++----------- load_example.py | 57 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 13 deletions(-) mode change 100644 => 100755 example.py create mode 100755 load_example.py diff --git a/example.py b/example.py old mode 100644 new mode 100755 index 3a2dc928b..0eac0a506 --- a/example.py +++ b/example.py @@ -1,23 +1,45 @@ -import logging +#!/usr/bin/env python +import threading, logging, time -from kafka.client import KafkaClient, FetchRequest, ProduceRequest +from kafka.client import KafkaClient from kafka.consumer import SimpleConsumer from kafka.producer import SimpleProducer -def produce_example(client): - producer = SimpleProducer(client, "my-topic") - producer.send_messages("test") +class Producer(threading.Thread): + daemon = True -def consume_example(client): - consumer = SimpleConsumer(client, "test-group", "my-topic") - for message in consumer: - print(message) + def run(self): + client = KafkaClient("localhost", 9092) + producer = SimpleProducer(client) + + while True: + producer.send_messages('my-topic', "test") + producer.send_messages('my-topic', "\xc2Hola, mundo!") + + time.sleep(1) + + +class Consumer(threading.Thread): + daemon = True + + def run(self): + client = KafkaClient("localhost", 9092) + consumer = SimpleConsumer(client, "test-group", "my-topic") + + for message in consumer: + print(message) def main(): - client = KafkaClient("localhost", 9092) - produce_example(client) - consume_example(client) + threads = [ + Producer(), + Consumer() + ] + + for t in threads: + t.start() + + time.sleep(5) if __name__ == "__main__": - logging.basicConfig(level=logging.DEBUG) + logging.basicConfig(level=logging.WARN) main() diff --git a/load_example.py b/load_example.py new file mode 100755 index 000000000..0ef07b6be --- /dev/null +++ b/load_example.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +import threading, logging, time, collections + +from kafka.client import KafkaClient +from kafka.consumer import SimpleConsumer +from kafka.producer import SimpleProducer + +msg_size = 524288 + +class Producer(threading.Thread): + daemon = True + big_msg = "1" * msg_size + + def run(self): + client = KafkaClient("localhost", 9092) + producer = SimpleProducer(client) + self.sent = 0 + + while True: + producer.send_messages('my-topic', self.big_msg) + self.sent += 1 + + +class Consumer(threading.Thread): + daemon = True + + def run(self): + client = KafkaClient("localhost", 9092) + consumer = SimpleConsumer(client, "test-group", "my-topic", + max_buffer_size = None, + ) + self.valid = 0 + self.invalid = 0 + + for message in consumer: + if len(message.message.value) == msg_size: + self.valid += 1 + else: + self.invalid += 1 + +def main(): + threads = [ + Producer(), + Consumer() + ] + + for t in threads: + t.start() + + time.sleep(10) + print 'Messages sent: %d' % threads[0].sent + print 'Messages recvd: %d' % threads[1].valid + print 'Messages invalid: %d' % threads[1].invalid + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + main() From 9732ed1670ef0739956900df37c0c77699628ec7 Mon Sep 17 00:00:00 2001 From: Mark Roberts Date: Tue, 25 Feb 2014 01:06:17 -0800 Subject: [PATCH 2/5] Minor refactor in conn.py, update version in __init__.py, add ErrorString --- kafka/client.py | 6 +++--- kafka/common.py | 36 +++++++++++++++++++++--------------- kafka/conn.py | 19 +++++++++++-------- load_example.py | 2 +- 4 files changed, 36 insertions(+), 27 deletions(-) diff --git a/kafka/client.py b/kafka/client.py index 155f65883..9578ee8e1 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -5,7 +5,7 @@ from functools import partial from itertools import count -from kafka.common import (ErrorMapping, TopicAndPartition, +from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition, ConnectionError, FailedPayloadsError, BrokerResponseError, PartitionUnavailableError, KafkaUnavailableError, KafkaRequestError) @@ -174,8 +174,8 @@ def _raise_on_response_error(self, resp): self.reset_topic_metadata(resp.topic) raise BrokerResponseError( - "Request for %s failed with errorcode=%d" % - (TopicAndPartition(resp.topic, resp.partition), resp.error)) + "Request for %s failed with errorcode=%d (%s)" % + (TopicAndPartition(resp.topic, resp.partition), resp.error, ErrorStrings[resp.error])) ################# # Public API # diff --git a/kafka/common.py b/kafka/common.py index c0a1a6a1a..583bb3225 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -48,22 +48,28 @@ TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"]) +ErrorStrings = { + -1 : 'UNKNOWN', + 0 : 'NO_ERROR', + 1 : 'OFFSET_OUT_OF_RANGE', + 2 : 'INVALID_MESSAGE', + 3 : 'UNKNOWN_TOPIC_OR_PARTITON', + 4 : 'INVALID_FETCH_SIZE', + 5 : 'LEADER_NOT_AVAILABLE', + 6 : 'NOT_LEADER_FOR_PARTITION', + 7 : 'REQUEST_TIMED_OUT', + 8 : 'BROKER_NOT_AVAILABLE', + 9 : 'REPLICA_NOT_AVAILABLE', + 10 : 'MESSAGE_SIZE_TO_LARGE', + 11 : 'STALE_CONTROLLER_EPOCH', + 12 : 'OFFSET_METADATA_TOO_LARGE', +} + class ErrorMapping(object): - # Many of these are not actually used by the client - UNKNOWN = -1 - NO_ERROR = 0 - OFFSET_OUT_OF_RANGE = 1 - INVALID_MESSAGE = 2 - UNKNOWN_TOPIC_OR_PARTITON = 3 - INVALID_FETCH_SIZE = 4 - LEADER_NOT_AVAILABLE = 5 - NOT_LEADER_FOR_PARTITION = 6 - REQUEST_TIMED_OUT = 7 - BROKER_NOT_AVAILABLE = 8 - REPLICA_NOT_AVAILABLE = 9 - MESSAGE_SIZE_TO_LARGE = 10 - STALE_CONTROLLER_EPOCH = 11 - OFFSET_METADATA_TOO_LARGE = 12 + pass + +for k, v in ErrorStrings.items(): + setattr(ErrorMapping, v, k) ################# # Exceptions # diff --git a/kafka/conn.py b/kafka/conn.py index 2b8f1c283..cc946fcb9 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -28,11 +28,10 @@ def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): super(KafkaConnection, self).__init__() self.host = host self.port = port - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.connect((host, port)) self.timeout = timeout - self._sock.settimeout(self.timeout) - self._dirty = False + self._sock = None + + self.reinit() def __repr__(self): return "" % (self.host, self.port) @@ -47,24 +46,28 @@ def _raise_connection_error(self): def _read_bytes(self, num_bytes): bytes_left = num_bytes - resp = '' + responses = [] + log.debug("About to read %d bytes from Kafka", num_bytes) if self._dirty: self.reinit() + while bytes_left: try: - data = self._sock.recv(bytes_left) + data = self._sock.recv(min(bytes_left, 4096)) except socket.error: log.exception('Unable to receive data from Kafka') self._raise_connection_error() + if data == '': log.error("Not enough data to read this response") self._raise_connection_error() + bytes_left -= len(data) log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes) - resp += data + responses.append(data) - return resp + return ''.join(responses) ################## # Public API # diff --git a/load_example.py b/load_example.py index 0ef07b6be..4da584121 100755 --- a/load_example.py +++ b/load_example.py @@ -53,5 +53,5 @@ def main(): print 'Messages invalid: %d' % threads[1].invalid if __name__ == "__main__": - logging.basicConfig(level=logging.DEBUG) + logging.basicConfig(level=logging.WARN) main() From 25e63cc4223f02f9deb3d4d393db308eac7f640b Mon Sep 17 00:00:00 2001 From: Mark Roberts Date: Tue, 25 Feb 2014 02:18:46 -0800 Subject: [PATCH 3/5] Update logging config to include timestamps for better/easier debugging --- example.py | 5 ++++- load_example.py | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/example.py b/example.py index 0eac0a506..43298e8b7 100755 --- a/example.py +++ b/example.py @@ -41,5 +41,8 @@ def main(): time.sleep(5) if __name__ == "__main__": - logging.basicConfig(level=logging.WARN) + logging.basicConfig( + format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', + level=logging.DEBUG + ) main() diff --git a/load_example.py b/load_example.py index 4da584121..95f0c7f25 100755 --- a/load_example.py +++ b/load_example.py @@ -53,5 +53,8 @@ def main(): print 'Messages invalid: %d' % threads[1].invalid if __name__ == "__main__": - logging.basicConfig(level=logging.WARN) + logging.basicConfig( + format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', + level=logging.DEBUG + ) main() From 3b39d9d6589ee46ae4d31fb078f44019a2810983 Mon Sep 17 00:00:00 2001 From: Mark Roberts Date: Tue, 25 Feb 2014 16:51:36 -0800 Subject: [PATCH 4/5] Fix grammar in error string --- kafka/common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/common.py b/kafka/common.py index 583bb3225..70d67b4f1 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -60,7 +60,7 @@ 7 : 'REQUEST_TIMED_OUT', 8 : 'BROKER_NOT_AVAILABLE', 9 : 'REPLICA_NOT_AVAILABLE', - 10 : 'MESSAGE_SIZE_TO_LARGE', + 10 : 'MESSAGE_SIZE_TOO_LARGE', 11 : 'STALE_CONTROLLER_EPOCH', 12 : 'OFFSET_METADATA_TOO_LARGE', } From 888f206d5417e95f26de407b28fe935950aea2c9 Mon Sep 17 00:00:00 2001 From: Mark Roberts Date: Wed, 26 Feb 2014 21:38:58 -0800 Subject: [PATCH 5/5] Update load_example --- load_example.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/load_example.py b/load_example.py index 95f0c7f25..1f8b41820 100755 --- a/load_example.py +++ b/load_example.py @@ -12,7 +12,7 @@ class Producer(threading.Thread): big_msg = "1" * msg_size def run(self): - client = KafkaClient("localhost", 9092) + client = KafkaClient("localhost:9092") producer = SimpleProducer(client) self.sent = 0 @@ -25,7 +25,7 @@ class Consumer(threading.Thread): daemon = True def run(self): - client = KafkaClient("localhost", 9092) + client = KafkaClient("localhost:9092") consumer = SimpleConsumer(client, "test-group", "my-topic", max_buffer_size = None, )