diff --git a/consumer_with_idle_trigger_test.go b/consumer_with_idle_trigger_test.go index 26366c4..4594645 100644 --- a/consumer_with_idle_trigger_test.go +++ b/consumer_with_idle_trigger_test.go @@ -19,15 +19,16 @@ import ( type MsgHandlerWithIdleTrigger struct { t *testing.T - msgsReceivedCount int expectedMsg TestMsg expectedMsgAttributes interface{} shutdownReceived bool - idleTimeoutTriggered bool + + msgsReceivedCount int + idleTimeoutTriggeredCount int } const ( - IdleTimeout = 1 * time.Second + IdleTimeout = 500 * time.Millisecond ) func TestConsumeWithIdleTrigger(t *testing.T) { @@ -164,15 +165,70 @@ func TestConsumeWithIdleTimeout_TimesOut(t *testing.T) { // Wait for the timeout time.Sleep(time.Second * 2) - assert.True(t, msgHandler.idleTimeoutTriggered) + // ensure that it gets called multiple times + assert.GreaterOrEqual(t, msgHandler.idleTimeoutTriggeredCount, 2) +} +func TestConsumeWithIdleTimeout_TimesOutAndConsumes(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + awsCfg := loadAWSDefaultConfig(ctx) + + queueName := strings.ToLower(t.Name()) + queueUrl := createQueue(t, ctx, awsCfg, queueName) + + expectedMsg := TestMsg{Name: "TestName"} + expectedMsgAttributes := map[string]types.MessageAttributeValue{ + "TraceID": { + DataType: aws.String("String"), + StringValue: aws.String(traceId), + }, + "SpanID": { + DataType: aws.String("String"), + StringValue: aws.String(spanId), + }, + } + + config := Config{ + QueueURL: *queueUrl, + WorkersNum: workersNum, + VisibilityTimeout: visibilityTimeout, + BatchSize: batchSize, + ExtendEnabled: true, + } + msgHandler := handlerWithIdleTrigger(t, expectedMsg, expectedMsgAttributes) + consumer := NewConsumerWithIdleTrigger(awsCfg, config, msgHandler, IdleTimeout) + go consumer.Consume(ctx) + + t.Cleanup(func() { + _, err := consumer.sqs.PurgeQueue(ctx, &sqs.PurgeQueueInput{QueueUrl: queueUrl}) + if err != nil { + zap.S().Error("failed to purge queue") + t.FailNow() + } + cancel() + }) + time.Sleep(time.Second * 1) + + // ensure that it gets called first before receiving a message + assert.GreaterOrEqual(t, msgHandler.idleTimeoutTriggeredCount, 1) + assert.Equal(t, 0, msgHandler.msgsReceivedCount) + + // Send message to the queue + sendTestMsg(t, ctx, consumer.sqs, queueUrl, expectedMsg) + + time.Sleep(time.Second * 2) + // Check that the message arrived + assert.Equal(t, 1, msgHandler.msgsReceivedCount) + assert.GreaterOrEqual(t, msgHandler.idleTimeoutTriggeredCount, 3) + } func handlerWithIdleTrigger(t *testing.T, expectedMsg TestMsg, expectedMsgAttributes map[string]types.MessageAttributeValue) *MsgHandlerWithIdleTrigger { return &MsgHandlerWithIdleTrigger{ - t: t, - msgsReceivedCount: 0, - expectedMsg: expectedMsg, - expectedMsgAttributes: expectedMsgAttributes, + t: t, + msgsReceivedCount: 0, + expectedMsg: expectedMsg, + expectedMsgAttributes: expectedMsgAttributes, + idleTimeoutTriggeredCount: 0, } } @@ -201,7 +257,7 @@ func (m *MsgHandlerWithIdleTrigger) Shutdown() { } func (m *MsgHandlerWithIdleTrigger) IdleTimeout(ctx context.Context) error { - zap.S().Info("Idle timeout triggered") - m.idleTimeoutTriggered = true + zap.S().Info("Idle timeout triggered: ", m.idleTimeoutTriggeredCount) + m.idleTimeoutTriggeredCount++ return nil }