Offset enhancement for consumer #162

Closed
yongkun wants to merge 3 commits


@yongkun yongkun commented Apr 30, 2014

Offset enhancement for consumer. The user can pass an offset as a parameter to the consumer, with the following values:
Previous (-1): the position stored in ZooKeeper last time;
CurrentBeginning (-2): the current beginning offset (may not be 0 due to TTL);
PreviousOrCurrentBeginning (-3): try the previous offset first; if it is not available, use the current beginning;
Latest (-4): start from the latest offset, like tail;
Any other value >= 0: an actual offset; an exception is raised if the offset is invalid.
Yongkun Wang and KiranRaj Mariyappa
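
A minimal sketch of how the sentinel values above might be resolved to a concrete starting offset for one partition. This is not the code from the PR: `OffsetMode`, `resolve_start_offset`, and its parameters are hypothetical names, and two behaviours are assumptions (that `Previous` raises when nothing is stored, and that an "invalid" absolute offset means one outside the range between the current beginning and the latest offset).

```python
from enum import IntEnum
from typing import Optional


class OffsetMode(IntEnum):
    """Sentinel values, named as in the description above."""
    Previous = -1
    CurrentBeginning = -2
    PreviousOrCurrentBeginning = -3
    Latest = -4


def resolve_start_offset(requested: int, stored: Optional[int],
                         earliest: int, latest: int) -> int:
    """Map a requested offset (sentinel or absolute) to a concrete offset.

    stored   -- offset saved last time (e.g. in ZooKeeper), or None if absent
    earliest -- current beginning offset of the partition
    latest   -- current end offset of the partition
    """
    if requested == OffsetMode.Previous:
        if stored is None:
            # Assumption: the PR text does not say what happens here.
            raise ValueError("no previously stored offset")
        return stored
    if requested == OffsetMode.CurrentBeginning:
        return earliest
    if requested == OffsetMode.PreviousOrCurrentBeginning:
        return stored if stored is not None else earliest
    if requested == OffsetMode.Latest:
        return latest
    # Anything else is treated as an absolute offset and must be in range.
    if earliest <= requested <= latest:
        return requested
    raise ValueError("invalid offset: %d" % requested)
```

For example, `resolve_start_offset(OffsetMode.PreviousOrCurrentBeginning, None, 10, 100)` falls back to the beginning offset because nothing was stored.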

Collaborator

wizzat commented May 1, 2014

Overall: I like the proposal. I think it's a really great way to handle offset management. There was some great commentary on absolute offsets a while back in another issue, and I'm not sure that there is any kind of sensible meaning in seeking to an absolute offset in a multipartition consumer. Absolute seeks really only make sense when consuming from a single partition.

Collaborator

wizzat commented May 1, 2014

Also, the Travis CI build is failing. I think it might be because the Travis build still uses Kafka 0.8.0. Did the tests work for you locally?

Author

yongkun commented May 1, 2014

Mark, thanks for the nice review. I will revise the code.
I forgot to submit the test code, so the CI build failed. I will submit it soon.
I agree with you about the absolute offset. Shall I remove it or leave it as an option for the user?

Collaborator

wizzat commented May 1, 2014

I think a practical solution would be to raise ValueError if len(partitions) > 1. There was talk of breaking seek() up. The previous discussion was here: #146
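
A rough sketch of the guard suggested above, assuming sentinel values are negative and absolute offsets are >= 0 as in the PR description. The function name `check_absolute_seek` and its parameters are hypothetical, not part of the library.

```python
from typing import Sequence


def check_absolute_seek(offset: int, partitions: Sequence[int]) -> None:
    """Reject an absolute offset when more than one partition is consumed."""
    # Negative values are sentinels (Previous, Latest, ...) and apply to
    # every partition, so only absolute offsets are restricted.
    if offset >= 0 and len(partitions) > 1:
        raise ValueError(
            "absolute offset %d is ambiguous across %d partitions"
            % (offset, len(partitions))
        )
```

With this check, `check_absolute_seek(100, [0])` passes silently, while `check_absolute_seek(100, [0, 1])` raises `ValueError`.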

Collaborator

wizzat commented May 1, 2014

FWIW, if you end up having test code that relies on Kafka 0.8.1, I have a branch that allows you to test against it (PR #158).

Author

yongkun commented May 1, 2014

Yes, I have tested it with Kafka 0.8.1, and we are using it. Thanks for pointing me to the test branch.

#Zero = 0 # start from 0, may not be available due
# to ttl

Previous = -1 # position stores in zookeeper last time

Style: I recommend renaming all constants to capitals with underscores, e.g. PREVIOUS

See PEP 8
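
As an illustration of the PEP 8 convention, the constants could look like the following. The values come from the PR description; the exact upper-case names are an assumption and may differ from what was actually committed.

```python
# Module-level constants in UPPER_CASE_WITH_UNDERSCORES, per PEP 8.
PREVIOUS = -1                       # position stored in ZooKeeper last time
CURRENT_BEGINNING = -2              # current beginning offset (may not be 0 due to TTL)
PREVIOUS_OR_CURRENT_BEGINNING = -3  # previous offset if available, else beginning
LATEST = -4                         # start from the latest offset, like tail
```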

Author

Fixed. Thanks @girlprogrammer

Author

yongkun commented May 7, 2014

@wizzat Could you tell me how to use your Kafka 0.8.1 branch?

Collaborator

wizzat commented May 7, 2014

Sure,

So I'd make a branch (of your branch) and merge the 0.8.1 branch in. Then run ./build_integration.sh and:

$ tox
$ KAFKA_VERSION=0.8.1 tox
$ KAFKA_VERSION=0.8.0 tox

Optionally you can pass in the -e py27 flag if you want to restrict the python version.

Owner

dpkp commented Sep 4, 2014

I think something like this is important to merge, but I have a few thoughts on implementation:

(1) I don't like passing a single absolute offset to a consumer that is currently topic-focused. That isn't going to make sense for multi-partition topics.
(2) I think we should probably focus on an API similar to the Java client, which uses the auto.offset.reset = largest|smallest|raise config setting to tell the consumer how to deal with OffsetOutOfRange errors (jump to the largest available, jump to the smallest available, or raise the error).
(3) For absolute offsets we probably want a separate method to set partition-level offsets, perhaps something like goto_offset(offset, partition=None).
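
A sketch of what points (2) and (3) could look like together. Everything here is hypothetical: `handle_out_of_range`, `PartitionOffsets`, and the local `OffsetOutOfRange` exception are illustration names, not the library's API. It is also an assumption that `goto_offset` with `partition=None` is only allowed when exactly one partition is tracked, since the comment leaves that case open.

```python
from typing import Dict, Iterable, Optional


class OffsetOutOfRange(Exception):
    """Stand-in for the broker's out-of-range error (hypothetical)."""


def handle_out_of_range(policy: str, earliest: int, latest: int) -> int:
    """Pick a new offset according to an auto.offset.reset style policy."""
    if policy == "largest":
        return latest
    if policy == "smallest":
        return earliest
    if policy == "raise":
        raise OffsetOutOfRange("offset out of range and policy is 'raise'")
    raise ValueError("unknown reset policy: %r" % policy)


class PartitionOffsets:
    """Tracks one offset per partition (illustration only)."""

    def __init__(self, partitions: Iterable[int]) -> None:
        self.offsets: Dict[int, int] = {p: 0 for p in partitions}

    def goto_offset(self, offset: int, partition: Optional[int] = None) -> None:
        """Set an absolute offset on a single partition."""
        if partition is None:
            # Assumption: omitting the partition is only unambiguous
            # when a single partition is being consumed.
            if len(self.offsets) != 1:
                raise ValueError("partition is required with multiple partitions")
            partition = next(iter(self.offsets))
        if partition not in self.offsets:
            raise ValueError("unknown partition: %r" % partition)
        self.offsets[partition] = offset
```

Keeping the reset policy separate from `goto_offset` means the policy can apply uniformly to every partition, while absolute seeks stay explicit about which partition they target.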

Is this PR still active? It looks like there have been a lot of changes to the master branch that would need to be merged in. If it is not active, it probably makes sense to create a separate PR.

Owner

dpkp commented Jun 12, 2015

I believe this is addressed in 0.9.4 / #296 with SimpleConsumer.reset_partition_offset

@dpkp dpkp closed this Jun 12, 2015
wbarnha added a commit to orange-kao/kafka-python that referenced this pull request Mar 9, 2024
Too many MRs to review... so little time.
bradenneal1 pushed a commit to bradenneal1/kafka-python that referenced this pull request May 16, 2024
Too many MRs to review... so little time.