From b27c0097b170ce2116b3aae91e3298024bd9f33b Mon Sep 17 00:00:00 2001 From: Karen Chen Date: Wed, 7 Aug 2024 15:04:10 -0400 Subject: [PATCH 1/5] add timeout to all settlement calls in go-shuttle --- v2/managedsettling.go | 4 +++- v2/settlehandler.go | 8 +++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/v2/managedsettling.go b/v2/managedsettling.go index f9ef735..bf7088a 100644 --- a/v2/managedsettling.go +++ b/v2/managedsettling.go @@ -38,7 +38,9 @@ func (m *ManagedSettler) Handle(ctx context.Context, settler MessageSettler, mes m.options.OnError(ctx, m.options, settler, message, err) return } - if err := settler.CompleteMessage(ctx, message, nil); err != nil { + settleCtx, cancel := context.WithTimeout(ctx, settlementTimeout) + defer cancel() + if err := settler.CompleteMessage(settleCtx, message, nil); err != nil { logger.Error(fmt.Sprintf("error completing message: %s", err)) m.options.OnAbandoned(ctx, message, err) return diff --git a/v2/settlehandler.go b/v2/settlehandler.go index 320efd0..fdc8022 100644 --- a/v2/settlehandler.go +++ b/v2/settlehandler.go @@ -3,11 +3,14 @@ package shuttle import ( "context" "fmt" + "time" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "github.com/devigned/tab" ) +const settlementTimeout = 30 * time.Second + // Settlement represents an action to take on a message. Abandon, Complete, DeadLetter, Defer, NoOp type Settlement interface { Settle(context.Context, MessageSettler, *azservicebus.ReceivedMessage) @@ -107,7 +110,10 @@ type settlement[T any] struct { func (s settlement[T]) settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage, options T) { span := tab.FromContext(ctx) span.Logger().Info(fmt.Sprintf("%s message", s.name)) - if err := s.settleFunc(ctx, settler, message, options); err != nil { + settleCtx, cancel := context.WithTimeout(ctx, settlementTimeout) + defer cancel() + getLogger(ctx).Info(fmt.Sprintf("%s message with ID: %s", s.name, message.MessageID)) + if err := s.settleFunc(settleCtx, settler, message, options); err != nil { wrapped := fmt.Errorf("%s settlement failed: %w", s.name, err) getLogger(ctx).Error(wrapped.Error()) span.Logger().Error(wrapped) From 21a18797de56660c4c545be20fbbbebd0b5438be Mon Sep 17 00:00:00 2001 From: Karen Chen Date: Mon, 19 Aug 2024 13:32:44 -0400 Subject: [PATCH 2/5] fix sender marshaller nil bug --- v2/sender.go | 5 ++++- v2/sender_test.go | 11 +++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/v2/sender.go b/v2/sender.go index 5fce187..6b8b8ae 100644 --- a/v2/sender.go +++ b/v2/sender.go @@ -53,7 +53,10 @@ type SenderOptions struct { // NewSender takes in a Sender and a Marshaller to create a new object that can send messages to the ServiceBus queue func NewSender(sender AzServiceBusSender, options *SenderOptions) *Sender { if options == nil { - options = &SenderOptions{Marshaller: &DefaultJSONMarshaller{}} + options = &SenderOptions{} + } + if options.Marshaller == nil { + options.Marshaller = &DefaultJSONMarshaller{} } if options.SendTimeout == 0 { options.SendTimeout = defaultSendTimeout diff --git a/v2/sender_test.go b/v2/sender_test.go index 38e8c91..a187610 100644 --- a/v2/sender_test.go +++ b/v2/sender_test.go @@ -22,6 +22,17 @@ func TestFunc_NewSender(t *testing.T) { if sender.options.Marshaller != marshaller { t.Errorf("failed to set marshaller, expected: %s, actual: %s", reflect.TypeOf(marshaller), reflect.TypeOf(sender.options.Marshaller)) } + + sender = NewSender(nil, &SenderOptions{EnableTracingPropagation: true}) + if sender.options.Marshaller == nil { + t.Errorf("failed to set marshaller, expected: %s, actual: %s", reflect.TypeOf(&DefaultJSONMarshaller{}), reflect.TypeOf(sender.options.Marshaller)) + } + if !sender.options.EnableTracingPropagation { + t.Errorf("failed to set EnableTracingPropagation, expected: true, actual: %t", sender.options.EnableTracingPropagation) + } + if sender.options.SendTimeout != defaultSendTimeout { + t.Errorf("failed to set SendTimeout, expected: %s, actual: %s", defaultSendTimeout, sender.options.SendTimeout) + } } func TestHandlers_SetMessageId(t *testing.T) { From b73d27dcf007de0129a6057e5e78d8d176f8e71b Mon Sep 17 00:00:00 2001 From: Karen Chen Date: Mon, 19 Aug 2024 13:35:42 -0400 Subject: [PATCH 3/5] revert --- v2/managedsettling.go | 4 +--- v2/settlehandler.go | 8 +------- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/v2/managedsettling.go b/v2/managedsettling.go index bf7088a..f9ef735 100644 --- a/v2/managedsettling.go +++ b/v2/managedsettling.go @@ -38,9 +38,7 @@ func (m *ManagedSettler) Handle(ctx context.Context, settler MessageSettler, mes m.options.OnError(ctx, m.options, settler, message, err) return } - settleCtx, cancel := context.WithTimeout(ctx, settlementTimeout) - defer cancel() - if err := settler.CompleteMessage(settleCtx, message, nil); err != nil { + if err := settler.CompleteMessage(ctx, message, nil); err != nil { logger.Error(fmt.Sprintf("error completing message: %s", err)) m.options.OnAbandoned(ctx, message, err) return diff --git a/v2/settlehandler.go b/v2/settlehandler.go index fdc8022..320efd0 100644 --- a/v2/settlehandler.go +++ b/v2/settlehandler.go @@ -3,14 +3,11 @@ package shuttle import ( "context" "fmt" - "time" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "github.com/devigned/tab" ) -const settlementTimeout = 30 * time.Second - // Settlement represents an action to take on a message. Abandon, Complete, DeadLetter, Defer, NoOp type Settlement interface { Settle(context.Context, MessageSettler, *azservicebus.ReceivedMessage) @@ -110,10 +107,7 @@ type settlement[T any] struct { func (s settlement[T]) settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage, options T) { span := tab.FromContext(ctx) span.Logger().Info(fmt.Sprintf("%s message", s.name)) - settleCtx, cancel := context.WithTimeout(ctx, settlementTimeout) - defer cancel() - getLogger(ctx).Info(fmt.Sprintf("%s message with ID: %s", s.name, message.MessageID)) - if err := s.settleFunc(settleCtx, settler, message, options); err != nil { + if err := s.settleFunc(ctx, settler, message, options); err != nil { wrapped := fmt.Errorf("%s settlement failed: %w", s.name, err) getLogger(ctx).Error(wrapped.Error()) span.Logger().Error(wrapped) From 1ab6b07b9f8df88c13e7487f5eec82dd06bcd57a Mon Sep 17 00:00:00 2001 From: Karen Chen Date: Mon, 19 Aug 2024 13:37:57 -0400 Subject: [PATCH 4/5] linting --- v2/sender_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v2/sender_test.go b/v2/sender_test.go index a187610..d3eac6d 100644 --- a/v2/sender_test.go +++ b/v2/sender_test.go @@ -25,7 +25,7 @@ func TestFunc_NewSender(t *testing.T) { sender = NewSender(nil, &SenderOptions{EnableTracingPropagation: true}) if sender.options.Marshaller == nil { - t.Errorf("failed to set marshaller, expected: %s, actual: %s", reflect.TypeOf(&DefaultJSONMarshaller{}), reflect.TypeOf(sender.options.Marshaller)) + t.Errorf("failed to set marshaller") } if !sender.options.EnableTracingPropagation { t.Errorf("failed to set EnableTracingPropagation, expected: true, actual: %t", sender.options.EnableTracingPropagation) From bdd1d80524dfbdca71dacd341b546aea593cb0b9 Mon Sep 17 00:00:00 2001 From: Karen Chen Date: Tue, 20 Aug 2024 13:36:33 -0400 Subject: [PATCH 5/5] try to fix linterg --- v2/logging_test.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/v2/logging_test.go b/v2/logging_test.go index 611e5f1..b07ab21 100644 --- a/v2/logging_test.go +++ b/v2/logging_test.go @@ -31,7 +31,7 @@ func TestSetLoggerFunc(t *testing.T) { }) defer SetLoggerFunc(func(ctx context.Context) Logger { return &contextLogger{ctx: ctx, logger: slog.Default()} }) logger := &testLogger{} - ctx := context.WithValue(context.Background(), testlogkey, logger) + ctx := context.WithValue(context.Background(), testLogKey{}, logger) getLogger(ctx).Info("test") g := NewWithT(t) g.Expect(getLogger(ctx)).To(Equal(logger)) @@ -41,7 +41,7 @@ func TestSetLoggerFunc(t *testing.T) { g.Expect(func() { getLogger(ctx).Info("") }).ToNot(Panic()) // getLogger returns nil - nilCtx := context.WithValue(context.Background(), testlogkey, nil) + nilCtx := context.WithValue(context.Background(), testLogKey{}, nil) g.Expect(func() { getLogger(nilCtx).Info("test") }).ToNot(Panic()) //coverage on testlogger @@ -53,12 +53,11 @@ type testLogger struct { entries []string } -var testlogkey = struct{}{} +type testLogKey struct{} func (t *testLogger) Info(s string) { t.entries = append(t.entries, s) } - func (t *testLogger) Warn(s string) { } @@ -66,7 +65,7 @@ func (t *testLogger) Error(s string) { } func getTestLogger(ctx context.Context) Logger { - if l, ok := ctx.Value(testlogkey).(*testLogger); ok { + if l, ok := ctx.Value(testLogKey{}).(*testLogger); ok { return l } return &contextLogger{ctx: ctx, logger: slog.Default()}