-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
conn.py performance improvements, make examples work, add another example #134
Changes from 3 commits
ee7e86e
9732ed1
25e63cc
3b39d9d
112158f
888f206
a6fc260
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,23 +1,48 @@ | ||
import logging | ||
#!/usr/bin/env python | ||
import threading, logging, time | ||
|
||
from kafka.client import KafkaClient, FetchRequest, ProduceRequest | ||
from kafka.client import KafkaClient | ||
from kafka.consumer import SimpleConsumer | ||
from kafka.producer import SimpleProducer | ||
|
||
def produce_example(client): | ||
producer = SimpleProducer(client, "my-topic") | ||
producer.send_messages("test") | ||
class Producer(threading.Thread): | ||
daemon = True | ||
|
||
def consume_example(client): | ||
consumer = SimpleConsumer(client, "test-group", "my-topic") | ||
for message in consumer: | ||
print(message) | ||
def run(self): | ||
client = KafkaClient("localhost", 9092) | ||
producer = SimpleProducer(client) | ||
|
||
while True: | ||
producer.send_messages('my-topic', "test") | ||
producer.send_messages('my-topic', "\xc2Hola, mundo!") | ||
|
||
time.sleep(1) | ||
|
||
|
||
class Consumer(threading.Thread): | ||
daemon = True | ||
|
||
def run(self): | ||
client = KafkaClient("localhost", 9092) | ||
consumer = SimpleConsumer(client, "test-group", "my-topic") | ||
|
||
for message in consumer: | ||
print(message) | ||
|
||
def main(): | ||
client = KafkaClient("localhost", 9092) | ||
produce_example(client) | ||
consume_example(client) | ||
threads = [ | ||
Producer(), | ||
Consumer() | ||
] | ||
|
||
for t in threads: | ||
t.start() | ||
|
||
time.sleep(5) | ||
|
||
if __name__ == "__main__": | ||
logging.basicConfig(level=logging.DEBUG) | ||
logging.basicConfig( | ||
format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', | ||
level=logging.DEBUG | ||
) | ||
main() |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,11 +28,10 @@ def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): | |
super(KafkaConnection, self).__init__() | ||
self.host = host | ||
self.port = port | ||
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
self._sock.connect((host, port)) | ||
self.timeout = timeout | ||
self._sock.settimeout(self.timeout) | ||
self._dirty = False | ||
self._sock = None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need to set There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. self._sock is required because the first line of reinit() asks "if not self._sock" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah you mean in |
||
|
||
self.reinit() | ||
|
||
def __repr__(self): | ||
return "<KafkaConnection host=%s port=%d>" % (self.host, self.port) | ||
|
@@ -47,24 +46,28 @@ def _raise_connection_error(self): | |
|
||
def _read_bytes(self, num_bytes): | ||
bytes_left = num_bytes | ||
resp = '' | ||
responses = [] | ||
|
||
log.debug("About to read %d bytes from Kafka", num_bytes) | ||
if self._dirty: | ||
self.reinit() | ||
|
||
while bytes_left: | ||
try: | ||
data = self._sock.recv(bytes_left) | ||
data = self._sock.recv(min(bytes_left, 4096)) | ||
except socket.error: | ||
log.exception('Unable to receive data from Kafka') | ||
self._raise_connection_error() | ||
|
||
if data == '': | ||
log.error("Not enough data to read this response") | ||
self._raise_connection_error() | ||
|
||
bytes_left -= len(data) | ||
log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes) | ||
resp += data | ||
responses.append(data) | ||
|
||
return resp | ||
return ''.join(responses) | ||
|
||
################## | ||
# Public API # | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
#!/usr/bin/env python | ||
import threading, logging, time, collections | ||
|
||
from kafka.client import KafkaClient | ||
from kafka.consumer import SimpleConsumer | ||
from kafka.producer import SimpleProducer | ||
|
||
msg_size = 524288 | ||
|
||
class Producer(threading.Thread): | ||
daemon = True | ||
big_msg = "1" * msg_size | ||
|
||
def run(self): | ||
client = KafkaClient("localhost", 9092) | ||
producer = SimpleProducer(client) | ||
self.sent = 0 | ||
|
||
while True: | ||
producer.send_messages('my-topic', self.big_msg) | ||
self.sent += 1 | ||
|
||
|
||
class Consumer(threading.Thread): | ||
daemon = True | ||
|
||
def run(self): | ||
client = KafkaClient("localhost", 9092) | ||
consumer = SimpleConsumer(client, "test-group", "my-topic", | ||
max_buffer_size = None, | ||
) | ||
self.valid = 0 | ||
self.invalid = 0 | ||
|
||
for message in consumer: | ||
if len(message.message.value) == msg_size: | ||
self.valid += 1 | ||
else: | ||
self.invalid += 1 | ||
|
||
def main(): | ||
threads = [ | ||
Producer(), | ||
Consumer() | ||
] | ||
|
||
for t in threads: | ||
t.start() | ||
|
||
time.sleep(10) | ||
print 'Messages sent: %d' % threads[0].sent | ||
print 'Messages recvd: %d' % threads[1].valid | ||
print 'Messages invalid: %d' % threads[1].invalid | ||
|
||
if __name__ == "__main__": | ||
logging.basicConfig( | ||
format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s', | ||
level=logging.DEBUG | ||
) | ||
main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know this was already wrong, but it should be "_TOO_LARGE"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Patch issued.