Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MultiProcess consumer #35

Merged
merged 20 commits into from
Jul 26, 2013
Merged

MultiProcess consumer #35

merged 20 commits into from
Jul 26, 2013

Conversation

mahendra
Copy link
Collaborator

kafka-python consumers

mahendra added 19 commits June 12, 2013 14:56
Currently the kafka SimpleConsumer consumes messages from all
partitions. This commit will ensure that data is consumed only
from partitions specified during init
The implementation is done by using simple options to
Kafka Fetch Request

Also in the SimpleConsumer iterator, update the offset before the
message is yielded. This is so that the consumer state is not lost
if certain cases.
For eg: the message is yielded and consumed by the caller,
but the caller does not come back into the generator again.
The message will be consumed but the status is not updated in
the consumer
Other changes
* Put a message size restriction on the shared queue
  - to prevent message overload
* Wait for a while after each process is started (in constructor)
* Wait for a while in each child if the consumer does not return any messages
  Just to be nice to the CPU.
* Control the start event more granularly - this prevents infinite loops
  if the control does not return to the generator. For eg:
  for msg in consumer:
      assert False
* Update message status before yield
Conflicts:
	kafka/consumer.py
This was hidden because of another bug in offset management
@mahendra
Copy link
Collaborator Author

@mumrah Hey, once this, #33 and #36 and are done, we can easily add gevent support. Matter of 20-30 lines of code. Basically, if you see, we are avoiding use of threads completely.

With these three, we are using Queue() and Event() and Pool() exclusively.
Python threading, multiprocess and gevent provide support for this. We can easily provide multi-threaded, mult-process and gevent-async based consumers and producers. I tried some sample code. It is very very easy.

@mahendra
Copy link
Collaborator Author

This will take care of #30 also

In the current patch get_messages(count=1) would return zero messages
the first time it is invoked after a consumer was initialized.
@mumrah mumrah merged commit 1d278f0 into dpkp:master Jul 26, 2013
@mumrah
Copy link
Collaborator

mumrah commented Jul 26, 2013

Merged manually, tests passing.

Thanks again, @mahendra!

@mahendra
Copy link
Collaborator Author

cool. thanks for reviewing @mumrah . just got back to office. will work on updating other pull requests.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants