Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

preallocate message slice in consumer.go and random fixes #1298

Merged
merged 3 commits into from
Mar 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 29 additions & 33 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ import (

// ConsumerMessage encapsulates a Kafka message returned by the consumer.
type ConsumerMessage struct {
Key, Value []byte
Topic string
Partition int32
Offset int64
Headers []*RecordHeader // only set if kafka is version 0.11+
Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp
BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
Headers []*RecordHeader // only set if kafka is version 0.11+

Key, Value []byte
Topic string
Partition int32
Offset int64
}

// ConsumerError is what is provided to the user when an error occurs.
Expand Down Expand Up @@ -75,12 +76,11 @@ type Consumer interface {
}

type consumer struct {
client Client
conf *Config

lock sync.Mutex
conf *Config
children map[string]map[int32]*partitionConsumer
brokerConsumers map[*Broker]*brokerConsumer
client Client
lock sync.Mutex
}

// NewConsumer creates a new consumer using the given broker addresses and configuration.
Expand Down Expand Up @@ -258,7 +258,7 @@ func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
//
// To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of
// consumer tear-down & return imediately. Continue to loop, servicing the Messages channel until the teardown process
// consumer tear-down & return immediately. Continue to loop, servicing the Messages channel until the teardown process
// AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call
// Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will
// also drain the Messages channel, harvest all errors & return them once cleanup has completed.
Expand Down Expand Up @@ -295,24 +295,22 @@ type PartitionConsumer interface {

type partitionConsumer struct {
highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG
consumer *consumer
conf *Config
topic string
partition int32

consumer *consumer
conf *Config
broker *brokerConsumer
messages chan *ConsumerMessage
errors chan *ConsumerError
feeder chan *FetchResponse

trigger, dying chan none
responseResult error
closeOnce sync.Once

fetchSize int32
offset int64

retries int32
topic string
partition int32
responseResult error
fetchSize int32
offset int64
retries int32
}

var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
Expand All @@ -335,9 +333,8 @@ func (child *partitionConsumer) computeBackoff() time.Duration {
if child.conf.Consumer.Retry.BackoffFunc != nil {
retries := atomic.AddInt32(&child.retries, 1)
return child.conf.Consumer.Retry.BackoffFunc(int(retries))
} else {
return child.conf.Consumer.Retry.Backoff
}
return child.conf.Consumer.Retry.Backoff
}

func (child *partitionConsumer) dispatcher() {
Expand Down Expand Up @@ -529,7 +526,8 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
}

func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
var messages []*ConsumerMessage
messages := make([]*ConsumerMessage, 0, len(batch.Records))

for _, rec := range batch.Records {
offset := batch.FirstOffset + rec.OffsetDelta
if offset < child.offset {
Expand Down Expand Up @@ -625,15 +623,13 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
return messages, nil
}

// brokerConsumer

type brokerConsumer struct {
consumer *consumer
broker *Broker
input chan *partitionConsumer
newSubscriptions chan []*partitionConsumer
wait chan none
subscriptions map[*partitionConsumer]none
wait chan none
acks sync.WaitGroup
refs int
}
Expand All @@ -655,14 +651,14 @@ func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
return bc
}

// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
// goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
// so the main goroutine can block waiting for work if it has none.
func (bc *brokerConsumer) subscriptionManager() {
var buffer []*partitionConsumer

// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
// goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
// so the main goroutine can block waiting for work if it has none.
for {
if len(buffer) > 0 {
select {
Expand Down Expand Up @@ -695,10 +691,10 @@ done:
close(bc.newSubscriptions)
}

//subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
func (bc *brokerConsumer) subscriptionConsumer() {
<-bc.wait // wait for our first piece of work

// the subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
for newSubscriptions := range bc.newSubscriptions {
bc.updateSubscriptions(newSubscriptions)

Expand Down Expand Up @@ -744,8 +740,8 @@ func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsu
}
}

//handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed
func (bc *brokerConsumer) handleResponses() {
// handles the response codes left for us by our subscriptions, and abandons ones that have been closed
for child := range bc.subscriptions {
result := child.responseResult
child.responseResult = nil
Expand Down
8 changes: 5 additions & 3 deletions record.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const (
maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1
)

//RecordHeader stores key and value for a record header
type RecordHeader struct {
Key []byte
Value []byte
Expand All @@ -33,15 +34,16 @@ func (h *RecordHeader) decode(pd packetDecoder) (err error) {
return nil
}

//Record is kafka record type
type Record struct {
Headers []*RecordHeader

Attributes int8
TimestampDelta time.Duration
OffsetDelta int64
Key []byte
Value []byte
Headers []*RecordHeader

length varintLengthField
length varintLengthField
}

func (r *Record) encode(pe packetEncoder) error {
Expand Down