
Consumer async partitions #437

Merged · 2 commits merged into master on Apr 29, 2015
Conversation

@eapache (Contributor) commented Apr 28, 2015

Feed consumer responses to the user asynchronously

Give each PartitionConsumer a goroutine whose job is to feed the user, and have
the brokerConsumer feed each partition with the response, then wait for acks.

This makes the consumer much more resilient to weird user consumption ordering,
as the old "one partition at a time" logic was very unforgiving.

Fixes #325
Replaces #398

@Shopify/kafka careful 👀 on this please
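For context, a minimal self-contained Go sketch of the shape described above, not the actual sarama code; the feeder name, channel types, and batch sizes are illustrative. Each partition gets a goroutine that feeds the user, while the broker loop hands each partition its share of the response and waits for every ack before fetching again.

    package main

    import (
        "fmt"
        "sync"
    )

    type partitionConsumer struct {
        partition int
        input     chan []string // batches handed over by the broker loop
        messages  chan string   // what the user reads
    }

    // feeder is the per-partition goroutine: it drains each batch into the
    // user-facing channel and then acks it, so a slow reader on one partition
    // no longer stalls delivery to the others.
    func (pc *partitionConsumer) feeder(acks *sync.WaitGroup) {
        for batch := range pc.input {
            for _, msg := range batch {
                pc.messages <- msg
            }
            acks.Done()
        }
        close(pc.messages)
    }

    func main() {
        children := []*partitionConsumer{
            {partition: 0, input: make(chan []string), messages: make(chan string, 8)},
            {partition: 1, input: make(chan []string), messages: make(chan string, 8)},
        }

        var acks sync.WaitGroup
        for _, pc := range children {
            go pc.feeder(&acks)
        }

        // the "brokerConsumer" role: fan each fetch response out to every
        // partition, then wait for all acks before issuing the next fetch
        go func() {
            for round := 0; round < 2; round++ {
                acks.Add(len(children))
                for _, pc := range children {
                    pc.input <- []string{fmt.Sprintf("p%d-msg%d", pc.partition, round)}
                }
                acks.Wait()
            }
            for _, pc := range children {
                close(pc.input)
            }
        }()

        // the user consumes each partition independently, at its own pace
        var done sync.WaitGroup
        for _, pc := range children {
            done.Add(1)
            go func(pc *partitionConsumer) {
                defer done.Done()
                for msg := range pc.messages {
                    fmt.Println("partition", pc.partition, ":", msg)
                }
            }(pc)
        }
        done.Wait()
    }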

@eapache force-pushed the consumer-async-partitions branch from fc7feed to d5141e2 on April 28, 2015 17:08
@eapache (Contributor, Author) commented Apr 28, 2015

After several network errors and one (valid) race-condition bug, CI is finally green.

Logger.Printf("consumer/%s/%d abandoned broker %d because %s\n", child.topic, child.partition, child.broker.broker.ID(), err)
child.broker.acks <- child
child.trigger <- none{}
}
Contributor commented:
I personally don't like using fallthrough like this, but I suppose it has always worked like this so no need to change.

Contributor (Author) replied:
The alternative is to duplicate these three lines twice (and the sendError call one additional time). Using fallthrough explicitly (as opposed to the stupid implicit C version) seems much more elegant to me.

I don't want to derail the PR (as you mentioned, this code is just moved, not changed), but you've piqued my curiosity - what bothers you about this pattern?
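For illustration, a small runnable sketch of the explicit-fallthrough shape being discussed; the error values and messages are hypothetical, not the code from this PR. The recoverable case logs, then falls through to share the cleanup lines with the default case instead of duplicating them.

    package main

    import (
        "errors"
        "fmt"
    )

    var errRecoverable = errors.New("recoverable")

    // handle sketches the switch shape under discussion: the recoverable case
    // does its extra work, then reuses the lines below via fallthrough.
    func handle(err error) {
        switch err {
        case nil:
            // healthy response: nothing to do
        case errRecoverable:
            fmt.Println("logging recoverable error:", err)
            fallthrough // reuse the cleanup below rather than copying it
        default:
            fmt.Println("notifying the user")
            fmt.Println("re-dispatching the partition")
        }
    }

    func main() {
        handle(errRecoverable)
        handle(errors.New("fatal"))
    }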

Contributor replied:
It's just way harder to parse in my mind, especially with the default case not being the last one. I'd prefer a bunch of if statements.

@eapache (Contributor, Author) commented Apr 29, 2015

Thoughts on the third commit? It does make the behaviour less subtle, I think, though perhaps at the cost of another moving part...

@wvanbergen (Contributor) commented:

I like this approach better. Using a WaitGroup for waiting for a bunch of goroutines feels much more natural to me.

I am not a big fan of the extra bool field on PartitionConsumer. Maybe we should make the channel hold a wrapper type with a partitionConsumer and a bool, instead of adding a member to the partitionConsumer type?

(Rant: Go really should support multi-value channels so you can simply feed multi-value return values to a channel without having to wrap them in a shitty type.)
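A short sketch of the wrapper-type suggestion (illustrative names, not sarama's actual types): the channel carries a small struct pairing the consumer with the flag, so the bool never needs to live on the partitionConsumer itself.

    package main

    import "fmt"

    type partitionConsumer struct {
        topic     string
        partition int
    }

    // ack pairs a consumer with the bool that would otherwise be a field on it
    type ack struct {
        child         *partitionConsumer
        needsDispatch bool
    }

    func main() {
        acks := make(chan ack, 1)
        acks <- ack{child: &partitionConsumer{topic: "events", partition: 3}, needsDispatch: true}

        a := <-acks
        fmt.Println(a.child.topic, a.child.partition, a.needsDispatch)
    }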

@eapache (Contributor, Author) commented Apr 29, 2015

Maybe we should make the channel hold a wrapper type with a partitionConsumer and a bool, instead of adding a member to the partitionConsumer type?

Which channel?

@wvanbergen (Contributor) commented:

the acks channel

@eapache (Contributor, Author) commented Apr 29, 2015

There is no acks channel anymore, it's a wait group o.o

@wvanbergen (Contributor) commented:

Argh, I am an idiot.

@eapache force-pushed the consumer-async-partitions branch from 41464b3 to 9772a2c on April 29, 2015 17:00
eapache added 2 commits April 29, 2015 17:03
@eapache force-pushed the consumer-async-partitions branch from 9772a2c to 3bd5a52 on April 29, 2015 17:03
@eapache (Contributor, Author) commented Apr 29, 2015

OK, this version removes the needsDispatch flag by switching the dying channel to carry actual useful values. Two side-effects:

  • we can't close dying in the AsyncClose function since we might have an internal thread writing to it concurrently now; fortunately writing a nil value is just as good
  • it simplifies the switch that was bothering you, which means it's now just as easy to avoid using fallthrough, so I did that
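A minimal sketch of the idea in the first bullet (hypothetical types, not the actual sarama code): when another goroutine may still be sending on the channel, AsyncClose cannot safely close it, but sending a sentinel nil value delivers the same shutdown signal without racing the other sender.

    package main

    import (
        "fmt"
        "sync"
    )

    // request is a hypothetical payload, standing in for the "actual useful
    // values" the dying channel carries; nil means "shut down".
    type request struct{ reason string }

    type worker struct {
        dying chan *request
        wg    sync.WaitGroup
    }

    func (w *worker) run() {
        defer w.wg.Done()
        for req := range w.dying {
            if req == nil {
                fmt.Println("shutdown requested")
                return
            }
            fmt.Println("re-dispatching because:", req.reason)
        }
    }

    func main() {
        w := &worker{dying: make(chan *request, 1)}
        w.wg.Add(1)
        go w.run()

        w.dying <- &request{reason: "leader moved"} // an internal sender
        w.dying <- nil                              // AsyncClose equivalent: signal without closing
        w.wg.Wait()
    }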

@wvanbergen (Contributor) commented:

I think this makes sense, but we should stress test this as well.

@shivnagarajan @thegedge, opinions on this?

@eapache (Contributor, Author) commented Apr 29, 2015

I will merge this now to prep master for stress testing, but feel free to continue commenting @shivnagarajan and @thegedge.

eapache added a commit that referenced this pull request Apr 29, 2015
@eapache merged commit 1bd758a into master on Apr 29, 2015
@eapache deleted the consumer-async-partitions branch on April 29, 2015 20:57
Successfully merging this pull request may close these issues: PartitionConsumer.Close() hangs frequently