Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[azeventhubs] Latest start position can also be inclusive (ie, get the latest message). #20744

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions sdk/messaging/azeventhubs/consumer_client_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,42 +49,46 @@ func TestUnitNewConsumerClient(t *testing.T) {

func TestUnit_getOffsetExpression(t *testing.T) {
t.Run("Valid", func(t *testing.T) {
expr, err := getOffsetExpression(StartPosition{})
expr, err := getStartExpression(StartPosition{})
require.NoError(t, err)
require.Equal(t, "amqp.annotation.x-opt-offset > '@latest'", expr)

expr, err = getOffsetExpression(StartPosition{Earliest: to.Ptr(true)})
expr, err = getStartExpression(StartPosition{Earliest: to.Ptr(true)})
require.NoError(t, err)
require.Equal(t, "amqp.annotation.x-opt-offset > '-1'", expr)

expr, err = getOffsetExpression(StartPosition{Latest: to.Ptr(true)})
expr, err = getStartExpression(StartPosition{Latest: to.Ptr(true)})
require.NoError(t, err)
require.Equal(t, "amqp.annotation.x-opt-offset > '@latest'", expr)

expr, err = getOffsetExpression(StartPosition{Offset: to.Ptr(int64(101))})
expr, err = getStartExpression(StartPosition{Latest: to.Ptr(true), Inclusive: true})
require.NoError(t, err)
require.Equal(t, "amqp.annotation.x-opt-offset >= '@latest'", expr)

expr, err = getStartExpression(StartPosition{Offset: to.Ptr(int64(101))})
require.NoError(t, err)
require.Equal(t, "amqp.annotation.x-opt-offset > '101'", expr)

expr, err = getOffsetExpression(StartPosition{Offset: to.Ptr(int64(101)), Inclusive: true})
expr, err = getStartExpression(StartPosition{Offset: to.Ptr(int64(101)), Inclusive: true})
require.NoError(t, err)
require.Equal(t, "amqp.annotation.x-opt-offset >= '101'", expr)

expr, err = getOffsetExpression(StartPosition{SequenceNumber: to.Ptr(int64(202))})
expr, err = getStartExpression(StartPosition{SequenceNumber: to.Ptr(int64(202))})
require.NoError(t, err)
require.Equal(t, "amqp.annotation.x-opt-sequence-number > '202'", expr)

expr, err = getOffsetExpression(StartPosition{SequenceNumber: to.Ptr(int64(202)), Inclusive: true})
expr, err = getStartExpression(StartPosition{SequenceNumber: to.Ptr(int64(202)), Inclusive: true})
require.NoError(t, err)
require.Equal(t, "amqp.annotation.x-opt-sequence-number >= '202'", expr)

enqueueTime, err := time.Parse(time.RFC3339, "2020-01-01T01:02:03Z")
require.NoError(t, err)

expr, err = getOffsetExpression(StartPosition{EnqueuedTime: &enqueueTime})
expr, err = getStartExpression(StartPosition{EnqueuedTime: &enqueueTime})
require.NoError(t, err)
require.Equal(t, "amqp.annotation.x-opt-enqueued-time > '1577840523000'", expr)

expr, err = getOffsetExpression(StartPosition{EnqueuedTime: &enqueueTime, Inclusive: true})
expr, err = getStartExpression(StartPosition{EnqueuedTime: &enqueueTime, Inclusive: true})
require.NoError(t, err)
require.Equal(t, "amqp.annotation.x-opt-enqueued-time >= '1577840523000'", expr)
})
Expand All @@ -93,28 +97,28 @@ func TestUnit_getOffsetExpression(t *testing.T) {
enqueueTime, err := time.Parse(time.RFC3339, "2020-01-01T01:02:03Z")
require.NoError(t, err)

expr, err := getOffsetExpression(StartPosition{
expr, err := getStartExpression(StartPosition{
EnqueuedTime: &enqueueTime,
Offset: to.Ptr[int64](101),
})
require.EqualError(t, err, "only a single start point can be set: Earliest, EnqueuedTime, Latest, Offset, or SequenceNumber")
require.Empty(t, expr)

expr, err = getOffsetExpression(StartPosition{
expr, err = getStartExpression(StartPosition{
Offset: to.Ptr[int64](202),
Latest: to.Ptr(true),
})
require.EqualError(t, err, "only a single start point can be set: Earliest, EnqueuedTime, Latest, Offset, or SequenceNumber")
require.Empty(t, expr)

expr, err = getOffsetExpression(StartPosition{
expr, err = getStartExpression(StartPosition{
Latest: to.Ptr(true),
SequenceNumber: to.Ptr[int64](202),
})
require.EqualError(t, err, "only a single start point can be set: Earliest, EnqueuedTime, Latest, Offset, or SequenceNumber")
require.Empty(t, expr)

expr, err = getOffsetExpression(StartPosition{
expr, err = getStartExpression(StartPosition{
SequenceNumber: to.Ptr[int64](202),
Earliest: to.Ptr(true),
})
Expand Down
20 changes: 10 additions & 10 deletions sdk/messaging/azeventhubs/partition_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (pc *PartitionClient) ReceiveEvents(ctx context.Context, count int, options
numEvents := len(events)
lastSequenceNumber := events[numEvents-1].SequenceNumber

pc.offsetExpression = formatOffsetExpressionForSequence(">", lastSequenceNumber)
pc.offsetExpression = formatStartExpressionForSequence(">", lastSequenceNumber)
log.Writef(EventConsumer, "%d Events received, moving sequence to %d", numEvents, lastSequenceNumber)
return events, nil
}
Expand Down Expand Up @@ -274,7 +274,7 @@ func newPartitionClient(args partitionClientArgs, options *PartitionClientOption
options = &PartitionClientOptions{}
}

offsetExpr, err := getOffsetExpression(options.StartPosition)
offsetExpr, err := getStartExpression(options.StartPosition)

if err != nil {
return nil, err
Expand Down Expand Up @@ -317,11 +317,11 @@ func getAllPrefetched(receiver amqpwrap.AMQPReceiver, max int) []*amqp.Message {
return messages
}

func getOffsetExpression(startPosition StartPosition) (string, error) {
lt := ">"
func getStartExpression(startPosition StartPosition) (string, error) {
gt := ">"

if startPosition.Inclusive {
lt = ">="
gt = ">="
}

var errMultipleFieldsSet = errors.New("only a single start point can be set: Earliest, EnqueuedTime, Latest, Offset, or SequenceNumber")
Expand All @@ -330,7 +330,7 @@ func getOffsetExpression(startPosition StartPosition) (string, error) {

if startPosition.EnqueuedTime != nil {
// time-based, non-inclusive
offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-enqueued-time %s '%d'", lt, startPosition.EnqueuedTime.UnixMilli())
offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-enqueued-time %s '%d'", gt, startPosition.EnqueuedTime.UnixMilli())
}

if startPosition.Offset != nil {
Expand All @@ -340,23 +340,23 @@ func getOffsetExpression(startPosition StartPosition) (string, error) {
return "", errMultipleFieldsSet
}

offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-offset %s '%d'", lt, *startPosition.Offset)
offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-offset %s '%d'", gt, *startPosition.Offset)
}

if startPosition.Latest != nil && *startPosition.Latest {
if offsetExpr != "" {
return "", errMultipleFieldsSet
}

offsetExpr = "amqp.annotation.x-opt-offset > '@latest'"
offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-offset %s '@latest'", gt)
}

if startPosition.SequenceNumber != nil {
if offsetExpr != "" {
return "", errMultipleFieldsSet
}

offsetExpr = formatOffsetExpressionForSequence(lt, *startPosition.SequenceNumber)
offsetExpr = formatStartExpressionForSequence(gt, *startPosition.SequenceNumber)
}

if startPosition.Earliest != nil && *startPosition.Earliest {
Expand All @@ -375,6 +375,6 @@ func getOffsetExpression(startPosition StartPosition) (string, error) {
return "amqp.annotation.x-opt-offset > '@latest'", nil
}

func formatOffsetExpressionForSequence(op string, sequenceNumber int64) string {
func formatStartExpressionForSequence(op string, sequenceNumber int64) string {
return fmt.Sprintf("amqp.annotation.x-opt-sequence-number %s '%d'", op, sequenceNumber)
}
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/processor_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func TestUnit_Processor_Run_startPosition(t *testing.T) {
fakeConsumerClient := simpleFakeConsumerClient()

fakeConsumerClient.newPartitionClientFn = func(partitionID string, options *PartitionClientOptions) (*PartitionClient, error) {
offsetExpr, err := getOffsetExpression(options.StartPosition)
offsetExpr, err := getStartExpression(options.StartPosition)
require.NoError(t, err)

return newFakePartitionClient(partitionID, offsetExpr), nil
Expand Down