diff --git a/pkg/kncloudevents/message_sender.go b/pkg/kncloudevents/message_sender.go index 96506b28aef..aa31b734177 100644 --- a/pkg/kncloudevents/message_sender.go +++ b/pkg/kncloudevents/message_sender.go @@ -151,6 +151,7 @@ func RetryConfigFromDeliverySpec(spec duckv1.DeliverySpec) (RetryConfig, error) retryConfig := NoRetries() if spec.Retry != nil { + retryConfig.CheckRetry = checkRetry retryConfig.RetryMax = int(*spec.Retry) } @@ -176,3 +177,7 @@ func RetryConfigFromDeliverySpec(spec duckv1.DeliverySpec) (RetryConfig, error) return retryConfig, nil } + +func checkRetry(_ context.Context, resp *nethttp.Response, _ error) (bool, error) { + return resp.StatusCode >= 300, nil +} diff --git a/pkg/kncloudevents/message_sender_test.go b/pkg/kncloudevents/message_sender_test.go new file mode 100644 index 00000000000..bb344e890d8 --- /dev/null +++ b/pkg/kncloudevents/message_sender_test.go @@ -0,0 +1,178 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kncloudevents + +import ( + "context" + nethttp "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "k8s.io/utils/pointer" + + eventingduck "knative.dev/eventing/pkg/apis/duck/v1" +) + +func TestHttpMessageSenderSendWithRetries(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + config *RetryConfig + wantStatus int + wantDispatch int + wantErr bool + }{ + { + name: "5 max retry", + config: &RetryConfig{ + RetryMax: 5, + CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) { + return true, nil + }, + Backoff: func(attemptNum int, resp *nethttp.Response) time.Duration { + return time.Millisecond + }, + }, + wantStatus: nethttp.StatusServiceUnavailable, + wantDispatch: 6, + wantErr: false, + }, + { + name: "1 max retry", + config: &RetryConfig{ + RetryMax: 1, + CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) { + return true, nil + }, + Backoff: func(attemptNum int, resp *nethttp.Response) time.Duration { + return time.Millisecond + }, + }, + wantStatus: nethttp.StatusServiceUnavailable, + wantDispatch: 2, + wantErr: false, + }, + { + name: "with no retryConfig", + wantStatus: nethttp.StatusServiceUnavailable, + wantDispatch: 1, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + n := 0 + var mu sync.Mutex + server := httptest.NewServer(nethttp.HandlerFunc(func(writer nethttp.ResponseWriter, request *nethttp.Request) { + mu.Lock() + n++ + mu.Unlock() + + writer.WriteHeader(tt.wantStatus) + })) + + sender := &HttpMessageSender{ + Client: nethttp.DefaultClient, + } + + request, err := nethttp.NewRequest("POST", server.URL, nil) + assert.Nil(t, err) + got, err := sender.SendWithRetries(request, tt.config) + if (err != nil) != tt.wantErr || got == nil { + t.Errorf("SendWithRetries() error = %v, wantErr %v or got nil", err, tt.wantErr) + return + } + if got.StatusCode != nethttp.StatusServiceUnavailable { + t.Errorf("SendWithRetries() got = %v, want %v", got.StatusCode, nethttp.StatusServiceUnavailable) + return + } + if n != tt.wantDispatch { + t.Errorf("expected %d retries got %d", tt.config.RetryMax, n) + return + } + }) + } +} + +func TestRetryConfigFromDeliverySpec(t *testing.T) { + linear := eventingduck.BackoffPolicyLinear + tests := []struct { + name string + spec eventingduck.DeliverySpec + retryMax int + wantErr bool + }{ + { + name: "full delivery", + spec: eventingduck.DeliverySpec{ + Retry: pointer.Int32Ptr(10), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("PT1S"), + }, + retryMax: 10, + wantErr: false, + }, + { + name: "only retry", + spec: eventingduck.DeliverySpec{ + Retry: pointer.Int32Ptr(10), + BackoffPolicy: &linear, + }, + retryMax: 10, + wantErr: false, + }, + { + name: "not ISO8601", + spec: eventingduck.DeliverySpec{ + Retry: pointer.Int32Ptr(10), + BackoffDelay: pointer.StringPtr("PP1"), + BackoffPolicy: &linear, + }, + retryMax: 10, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := RetryConfigFromDeliverySpec(tt.spec) + if (err != nil) != tt.wantErr { + t.Errorf("RetryConfigFromDeliverySpec() error = %v, wantErr %v", err, tt.wantErr) + return + } + if err != nil { + return + } + + if got.CheckRetry == nil { + t.Errorf("CheckRetry must not be nil") + return + } + if got.Backoff == nil { + t.Errorf("Backoff must not be nil") + } + if got.RetryMax != tt.retryMax { + t.Errorf("retryMax want %d got %d", tt.retryMax, got.RetryMax) + } + }) + } +} diff --git a/test/e2e/helpers/broker_redelivery_helper.go b/test/e2e/helpers/broker_redelivery_helper.go index 5437a9639c1..d01e7ec0be8 100644 --- a/test/e2e/helpers/broker_redelivery_helper.go +++ b/test/e2e/helpers/broker_redelivery_helper.go @@ -40,8 +40,7 @@ func BrokerRedelivery(t *testing.T, creator BrokerCreatorWithRetries) { t.Run(dropevents.Fibonacci, func(t *testing.T) { brokerRedelivery(t, creator, numRetries, func(pod *corev1.Pod, client *testlib.Client) error { - container := pod.Spec.Containers[0] - container.Env = append(container.Env, + pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, corev1.EnvVar{ Name: dropevents.SkipAlgorithmKey, Value: dropevents.Fibonacci, @@ -53,15 +52,14 @@ func BrokerRedelivery(t *testing.T, creator BrokerCreatorWithRetries) { t.Run(dropevents.Sequence, func(t *testing.T) { brokerRedelivery(t, creator, numRetries, func(pod *corev1.Pod, client *testlib.Client) error { - container := pod.Spec.Containers[0] - container.Env = append(container.Env, + pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, corev1.EnvVar{ Name: dropevents.SkipAlgorithmKey, Value: dropevents.Sequence, }, corev1.EnvVar{ Name: dropevents.NumberKey, - Value: fmt.Sprintf("%d", numRetries-1), + Value: fmt.Sprintf("%d", numRetries), }, ) return nil