From ed4bdee6fa544a2c45a557726723b2a7d7e1958b Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 22 Nov 2019 10:58:11 -0500 Subject: [PATCH] Workaround for missing offsets in Azure's OffsetFetchResponse --- broker.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/broker.go b/broker.go index 81467498c..6a7dbeee1 100644 --- a/broker.go +++ b/broker.go @@ -387,6 +387,23 @@ func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, return nil, err } + // In new consumer group / partition pairs, Microsoft's Azure Event Hubs + // communicates the "no error / new offset" state by omitting the requested + // entries from the OffsetFetchResponse (in contrast to other implementations + // which indicate this by returning an explicit offset of -1). To handle this + // case, we check all entries in the request and add an offset to the response + // table for any that are missing. + for topic, partitions := range request.partitions { + if response.Blocks[topic] == nil { + response.Blocks[topic] = make(map[int32]*OffsetFetchResponseBlock) + } + for _, p := range partitions { + if response.Blocks[topic][p] == nil { + response.Blocks[topic][p] = &OffsetFetchResponseBlock{Offset: -1} + } + } + } + return response, nil }