diff --git a/pkg/kncloudevents/message_sender.go b/pkg/kncloudevents/message_sender.go index b9fc4f70f15..1a425cd601b 100644 --- a/pkg/kncloudevents/message_sender.go +++ b/pkg/kncloudevents/message_sender.go @@ -150,6 +150,8 @@ func RetryConfigFromDeliverySpec(spec duckv1.DeliverySpec) (RetryConfig, error) retryConfig := NoRetries() + retryConfig.CheckRetry = checkRetry + if spec.Retry != nil { retryConfig.RetryMax = int(*spec.Retry) } @@ -176,3 +178,7 @@ func RetryConfigFromDeliverySpec(spec duckv1.DeliverySpec) (RetryConfig, error) return retryConfig, nil } + +func checkRetry(_ context.Context, resp *nethttp.Response, _ error) (bool, error) { + return resp != nil && resp.StatusCode >= 300, nil +} diff --git a/pkg/kncloudevents/message_sender_test.go b/pkg/kncloudevents/message_sender_test.go index aa6e4545b62..a6793c44823 100644 --- a/pkg/kncloudevents/message_sender_test.go +++ b/pkg/kncloudevents/message_sender_test.go @@ -17,11 +17,18 @@ limitations under the License. package kncloudevents import ( + "context" + nethttp "net/http" + "net/http/httptest" + "sync" "testing" "time" "github.com/stretchr/testify/assert" + "k8s.io/utils/pointer" + duckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingduck "knative.dev/eventing/pkg/apis/duck/v1" ) // Test The RetryConfigFromDeliverySpec() Functionality @@ -120,3 +127,150 @@ func TestRetryConfigFromDeliverySpec(t *testing.T) { }) } } + +func TestHttpMessageSenderSendWithRetries(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + config *RetryConfig + wantStatus int + wantDispatch int + wantErr bool + }{ + { + name: "5 max retry", + config: &RetryConfig{ + RetryMax: 5, + CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) { + return true, nil + }, + Backoff: func(attemptNum int, resp *nethttp.Response) time.Duration { + return time.Millisecond + }, + }, + wantStatus: nethttp.StatusServiceUnavailable, + wantDispatch: 6, + wantErr: false, + }, + { + name: "1 max retry", + config: &RetryConfig{ + RetryMax: 1, + CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) { + return true, nil + }, + Backoff: func(attemptNum int, resp *nethttp.Response) time.Duration { + return time.Millisecond + }, + }, + wantStatus: nethttp.StatusServiceUnavailable, + wantDispatch: 2, + wantErr: false, + }, + { + name: "with no retryConfig", + wantStatus: nethttp.StatusServiceUnavailable, + wantDispatch: 1, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + n := 0 + var mu sync.Mutex + server := httptest.NewServer(nethttp.HandlerFunc(func(writer nethttp.ResponseWriter, request *nethttp.Request) { + mu.Lock() + n++ + mu.Unlock() + + writer.WriteHeader(tt.wantStatus) + })) + + sender := &HttpMessageSender{ + Client: nethttp.DefaultClient, + } + + request, err := nethttp.NewRequest("POST", server.URL, nil) + assert.Nil(t, err) + got, err := sender.SendWithRetries(request, tt.config) + if (err != nil) != tt.wantErr || got == nil { + t.Errorf("SendWithRetries() error = %v, wantErr %v or got nil", err, tt.wantErr) + return + } + if got.StatusCode != nethttp.StatusServiceUnavailable { + t.Errorf("SendWithRetries() got = %v, want %v", got.StatusCode, nethttp.StatusServiceUnavailable) + return + } + if n != tt.wantDispatch { + t.Errorf("expected %d retries got %d", tt.config.RetryMax, n) + return + } + }) + } +} + +func TestRetryConfigFromDeliverySpecCheckDelivery(t *testing.T) { + linear := eventingduck.BackoffPolicyLinear + tests := []struct { + name string + spec eventingduck.DeliverySpec + retryMax int + wantErr bool + }{ + { + name: "full delivery", + spec: eventingduck.DeliverySpec{ + Retry: pointer.Int32Ptr(10), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("PT1S"), + }, + retryMax: 10, + wantErr: false, + }, + { + name: "only retry", + spec: eventingduck.DeliverySpec{ + Retry: pointer.Int32Ptr(10), + BackoffPolicy: &linear, + }, + retryMax: 10, + wantErr: false, + }, + { + name: "not ISO8601", + spec: eventingduck.DeliverySpec{ + Retry: pointer.Int32Ptr(10), + BackoffDelay: pointer.StringPtr("PP1"), + BackoffPolicy: &linear, + }, + retryMax: 10, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := RetryConfigFromDeliverySpec(tt.spec) + if (err != nil) != tt.wantErr { + t.Errorf("RetryConfigFromDeliverySpec() error = %v, wantErr %v", err, tt.wantErr) + return + } + if err != nil { + return + } + + if got.CheckRetry == nil { + t.Errorf("CheckRetry must not be nil") + return + } + if got.Backoff == nil { + t.Errorf("Backoff must not be nil") + } + if got.RetryMax != tt.retryMax { + t.Errorf("retryMax want %d got %d", tt.retryMax, got.RetryMax) + } + }) + } +} diff --git a/test/e2e/helpers/broker_redelivery_helper.go b/test/e2e/helpers/broker_redelivery_helper.go index 5437a9639c1..d01e7ec0be8 100644 --- a/test/e2e/helpers/broker_redelivery_helper.go +++ b/test/e2e/helpers/broker_redelivery_helper.go @@ -40,8 +40,7 @@ func BrokerRedelivery(t *testing.T, creator BrokerCreatorWithRetries) { t.Run(dropevents.Fibonacci, func(t *testing.T) { brokerRedelivery(t, creator, numRetries, func(pod *corev1.Pod, client *testlib.Client) error { - container := pod.Spec.Containers[0] - container.Env = append(container.Env, + pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, corev1.EnvVar{ Name: dropevents.SkipAlgorithmKey, Value: dropevents.Fibonacci, @@ -53,15 +52,14 @@ func BrokerRedelivery(t *testing.T, creator BrokerCreatorWithRetries) { t.Run(dropevents.Sequence, func(t *testing.T) { brokerRedelivery(t, creator, numRetries, func(pod *corev1.Pod, client *testlib.Client) error { - container := pod.Spec.Containers[0] - container.Env = append(container.Env, + pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, corev1.EnvVar{ Name: dropevents.SkipAlgorithmKey, Value: dropevents.Sequence, }, corev1.EnvVar{ Name: dropevents.NumberKey, - Value: fmt.Sprintf("%d", numRetries-1), + Value: fmt.Sprintf("%d", numRetries), }, ) return nil