Skip to content

Commit

Permalink
Add a Queue-like producer/consumer
Browse files Browse the repository at this point in the history
Creates a producer process and one consumer process per partition. Uses
`multiprocessing.Queue` for communication between the parent process and
the producer/consumers.

```python
kafka = KafkaClient("localhost", 9092)
q = KafkaQueue(kafka, topic="test-queue", partitions=[0,1])
q.put("test")
q.get()
q.close()
kafka.close()
```

Ref #8
  • Loading branch information
mumrah committed Nov 20, 2012
1 parent 0c61f44 commit 330ddbc
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 0 deletions.
3 changes: 3 additions & 0 deletions kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ def __init__(self, host, port, bufsize=1024):
self._sock.settimeout(10)
log.debug("Connected to %s on %d", host, port)

def __copy__(self):
    # Hand back an independent client (fresh socket) to the same broker,
    # so a forked process does not share this connection.
    return KafkaClient(self.host, self.port, bufsize=self.bufsize)

######################
# Protocol Stuff #
######################
Expand Down
119 changes: 119 additions & 0 deletions kafka/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from copy import copy
import logging
from multiprocessing import Process, Queue, Event
from Queue import Empty
import time

from .client import KafkaClient, FetchRequest, ProduceRequest

log = logging.getLogger("kafka")

class KafkaConsumerProcess(Process):
    """Child process that fetches messages from one topic/partition and puts
    each message on ``out_queue``.

    The shared ``barrier`` Event gates startup (``wait``) and signals
    shutdown (parent clears it).
    """

    def __init__(self, client, topic, partition, out_queue, barrier):
        # Copy the client so this process gets its own connection rather
        # than sharing the parent's socket across the fork.
        self.client = copy(client)
        self.topic = topic
        self.partition = partition
        self.out_queue = out_queue
        self.barrier = barrier
        self.consumer_sleep = 0.2  # seconds to sleep when the partition is idle
        Process.__init__(self)

    def config(self, consumer_sleep):
        """Set the idle-poll sleep interval.

        ``consumer_sleep`` is given in milliseconds and stored in seconds.
        """
        self.consumer_sleep = consumer_sleep / 1000.

    def run(self):
        self.barrier.wait()  # block until the parent signals start
        log.info("Starting Consumer")
        # NOTE(review): always starts from offset 0 — presumably fine for this
        # queue abstraction; confirm no resume-from-offset is expected.
        fetchRequest = FetchRequest(self.topic, self.partition, offset=0,
                                    size=self.client.bufsize)
        while True:
            # A cleared barrier means the parent asked us to shut down.
            if not self.barrier.is_set():
                self.client.close()
                break
            lastOffset = fetchRequest.offset
            (messages, fetchRequest) = self.client.get_message_set(fetchRequest)
            if fetchRequest.offset == lastOffset:
                # Offset did not advance, so nothing new arrived; back off.
                # (Report the actual configured sleep, not a hard-coded value.)
                log.debug("No more data for this partition, sleeping a bit (%dms)",
                          self.consumer_sleep * 1000)
                time.sleep(self.consumer_sleep)
                continue
            for message in messages:
                self.out_queue.put(message)

class KafkaProducerProcess(Process):
    """Child process that drains ``in_queue`` and produces the messages to
    Kafka, batching them and flushing on batch size or timeout.

    The shared ``barrier`` Event gates startup (``wait``) and signals
    shutdown (parent clears it).
    """

    def __init__(self, client, topic, in_queue, barrier):
        # Copy the client so this process gets its own connection rather
        # than sharing the parent's socket across the fork.
        self.client = copy(client)
        self.topic = topic
        self.in_queue = in_queue
        self.barrier = barrier
        self.producer_flush_buffer = 100   # messages buffered before a flush
        self.producer_flush_timeout = 2.0  # seconds between forced flushes
        self.producer_timeout = 0.1        # seconds to block on in_queue.get
        Process.__init__(self)

    def config(self, producer_flush_buffer, producer_flush_timeout, producer_timeout):
        """Configure batching. Both timeouts are given in milliseconds."""
        self.producer_flush_buffer = producer_flush_buffer
        self.producer_flush_timeout = producer_flush_timeout / 1000.
        self.producer_timeout = producer_timeout / 1000.

    def run(self):
        self.barrier.wait()  # block until the parent signals start
        log.info("Starting Producer")
        messages = []
        last_produce = time.time()

        def flush(messages):
            # Produce the buffered batch (if any) to partition -1 and clear
            # the list in place. The emptiness guard prevents sending empty
            # ProduceRequests on every idle timeout tick.
            if messages:
                self.client.send_message_set(ProduceRequest(self.topic, -1, messages))
                del messages[:]

        while True:
            # A cleared barrier means the parent asked us to shut down.
            if not self.barrier.is_set():
                log.info("Producer shut down. Flushing messages")
                flush(messages)
                self.client.close()
                break
            # >= so a buffer of N actually flushes at N messages, not N+1.
            if len(messages) >= self.producer_flush_buffer:
                log.debug("Message count threshold reached. Flushing messages")
                flush(messages)
                # Reset the clock so the timeout branch doesn't fire right away.
                last_produce = time.time()
            elif (time.time() - last_produce) > self.producer_flush_timeout:
                log.debug("Producer timeout reached. Flushing messages")
                flush(messages)
                last_produce = time.time()
            try:
                messages.append(KafkaClient.create_message(
                    self.in_queue.get(True, self.producer_timeout)))
            except Empty:
                continue

class KafkaQueue(object):
    """Queue-like producer/consumer interface backed by Kafka.

    Spawns one producer process and one consumer process per partition,
    communicating with them through ``multiprocessing.Queue`` objects.
    """

    def __init__(self, client, topic, partitions):
        self.in_queue = Queue()   # messages consumed from Kafka
        self.out_queue = Queue()  # messages waiting to be produced
        self.consumers = []
        self.barrier = Event()    # set = children run, cleared = shut down

        # Initialize and start one consumer process per partition
        for partition in partitions:
            consumer = KafkaConsumerProcess(client, topic, partition,
                                            self.in_queue, self.barrier)
            consumer.config(consumer_sleep=200)
            consumer.start()
            self.consumers.append(consumer)

        # Initialize and start the producer process
        self.producer = KafkaProducerProcess(client, topic, self.out_queue, self.barrier)
        self.producer.config(producer_flush_buffer=500,
                             producer_flush_timeout=2000,
                             producer_timeout=100)
        self.producer.start()

        # Trigger everything to start
        self.barrier.set()

    def get(self, block=True, timeout=None):
        """Return the payload of the next consumed message.

        Raises ``Queue.Empty`` when non-blocking or the timeout expires.
        """
        return self.in_queue.get(block, timeout).payload

    def put(self, msg, block=True, timeout=None):
        """Enqueue ``msg`` to be produced to Kafka.

        Raises ``Queue.Full`` when non-blocking or the timeout expires.
        """
        return self.out_queue.put(msg, block, timeout)

    def close(self):
        """Shut down the child processes and release the queues.

        Signal shutdown first and join the children before closing the
        parent's queue handles, so the producer can still drain
        ``out_queue`` during its final flush.
        """
        self.barrier.clear()
        self.producer.join()
        for consumer in self.consumers:
            consumer.join()
        self.in_queue.close()
        self.out_queue.close()
26 changes: 26 additions & 0 deletions test/integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import unittest

from kafka.client import KafkaClient, ProduceRequest, FetchRequest, OffsetRequest
from kafka.queue import KafkaQueue

def get_open_port():
sock = socket.socket()
Expand Down Expand Up @@ -231,5 +232,30 @@ def test_10k_messages(self):
#self.assertTrue(self.server.wait_for("Created log for 'test-10k'-1"))
#self.assertTrue(self.server.wait_for("Flushing log 'test-10k-1'"))

def test_queue(self):
    """Round-trip 1000 messages through a KafkaQueue across two partitions."""
    # Send 1000 messages
    q = KafkaQueue(self.kafka, "test-queue", [0, 1])
    for i in range(1000):
        q.put("test %d" % i)

    # Wait for the producer to fully flush
    time.sleep(2)

    # Collect into a set: ordering across two partitions is not
    # deterministic, so only membership is checked, and set lookup keeps
    # the verification loop O(n) instead of O(n^2).
    consumed = set()
    for i in range(1000):
        consumed.add(q.get())

    # Verify everything is there
    for i in range(1000):
        self.assertTrue("test %d" % i in consumed)

    # Shutdown the queue
    q.close()

if __name__ == "__main__":
unittest.main()

0 comments on commit 330ddbc

Please sign in to comment.