From e344acd29d5b342147e7c7754ee3e7cb3b1f48aa Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Thu, 24 Sep 2020 22:34:28 +0200 Subject: [PATCH] Set check retry function Signed-off-by: Pierangelo Di Pilato --- pkg/kncloudevents/message_sender.go | 5 + pkg/kncloudevents/message_sender_test.go | 178 +++++++++++++++++++ test/e2e/helpers/broker_redelivery_helper.go | 8 +- 3 files changed, 186 insertions(+), 5 deletions(-) create mode 100644 pkg/kncloudevents/message_sender_test.go diff --git a/pkg/kncloudevents/message_sender.go b/pkg/kncloudevents/message_sender.go index 96506b28aef..aa31b734177 100644 --- a/pkg/kncloudevents/message_sender.go +++ b/pkg/kncloudevents/message_sender.go @@ -151,6 +151,7 @@ func RetryConfigFromDeliverySpec(spec duckv1.DeliverySpec) (RetryConfig, error) retryConfig := NoRetries() if spec.Retry != nil { + retryConfig.CheckRetry = checkRetry retryConfig.RetryMax = int(*spec.Retry) } @@ -176,3 +177,7 @@ func RetryConfigFromDeliverySpec(spec duckv1.DeliverySpec) (RetryConfig, error) return retryConfig, nil } + +func checkRetry(_ context.Context, resp *nethttp.Response, _ error) (bool, error) { + return resp.StatusCode >= 300, nil +} diff --git a/pkg/kncloudevents/message_sender_test.go b/pkg/kncloudevents/message_sender_test.go new file mode 100644 index 00000000000..bb344e890d8 --- /dev/null +++ b/pkg/kncloudevents/message_sender_test.go @@ -0,0 +1,178 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kncloudevents + +import ( + "context" + nethttp "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "k8s.io/utils/pointer" + + eventingduck "knative.dev/eventing/pkg/apis/duck/v1" +) + +func TestHttpMessageSenderSendWithRetries(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + config *RetryConfig + wantStatus int + wantDispatch int + wantErr bool + }{ + { + name: "5 max retry", + config: &RetryConfig{ + RetryMax: 5, + CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) { + return true, nil + }, + Backoff: func(attemptNum int, resp *nethttp.Response) time.Duration { + return time.Millisecond + }, + }, + wantStatus: nethttp.StatusServiceUnavailable, + wantDispatch: 6, + wantErr: false, + }, + { + name: "1 max retry", + config: &RetryConfig{ + RetryMax: 1, + CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) { + return true, nil + }, + Backoff: func(attemptNum int, resp *nethttp.Response) time.Duration { + return time.Millisecond + }, + }, + wantStatus: nethttp.StatusServiceUnavailable, + wantDispatch: 2, + wantErr: false, + }, + { + name: "with no retryConfig", + wantStatus: nethttp.StatusServiceUnavailable, + wantDispatch: 1, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + n := 0 + var mu sync.Mutex + server := httptest.NewServer(nethttp.HandlerFunc(func(writer nethttp.ResponseWriter, request *nethttp.Request) { + mu.Lock() + n++ + mu.Unlock() + + writer.WriteHeader(tt.wantStatus) + })) + + sender := &HttpMessageSender{ + Client: nethttp.DefaultClient, + } + + request, err := nethttp.NewRequest("POST", server.URL, nil) + assert.Nil(t, err) + got, err := sender.SendWithRetries(request, tt.config) + if (err != nil) != tt.wantErr || got == nil { + t.Errorf("SendWithRetries() error = %v, wantErr %v or got nil", err, tt.wantErr) + return + } + if got.StatusCode != nethttp.StatusServiceUnavailable { + t.Errorf("SendWithRetries() got = %v, want %v", got.StatusCode, nethttp.StatusServiceUnavailable) + return + } + if n != tt.wantDispatch { + t.Errorf("expected %d retries got %d", tt.config.RetryMax, n) + return + } + }) + } +} + +func TestRetryConfigFromDeliverySpec(t *testing.T) { + linear := eventingduck.BackoffPolicyLinear + tests := []struct { + name string + spec eventingduck.DeliverySpec + retryMax int + wantErr bool + }{ + { + name: "full delivery", + spec: eventingduck.DeliverySpec{ + Retry: pointer.Int32Ptr(10), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("PT1S"), + }, + retryMax: 10, + wantErr: false, + }, + { + name: "only retry", + spec: eventingduck.DeliverySpec{ + Retry: pointer.Int32Ptr(10), + BackoffPolicy: &linear, + }, + retryMax: 10, + wantErr: false, + }, + { + name: "not ISO8601", + spec: eventingduck.DeliverySpec{ + Retry: pointer.Int32Ptr(10), + BackoffDelay: pointer.StringPtr("PP1"), + BackoffPolicy: &linear, + }, + retryMax: 10, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := RetryConfigFromDeliverySpec(tt.spec) + if (err != nil) != tt.wantErr { + t.Errorf("RetryConfigFromDeliverySpec() error = %v, wantErr %v", err, tt.wantErr) + return + } + if err != nil { + return + } + + if got.CheckRetry == nil { + t.Errorf("CheckRetry must not be nil") + return + } + if got.Backoff == nil { + t.Errorf("Backoff must not be nil") + } + if got.RetryMax != tt.retryMax { + t.Errorf("retryMax want %d got %d", tt.retryMax, got.RetryMax) + } + }) + } +} diff --git a/test/e2e/helpers/broker_redelivery_helper.go b/test/e2e/helpers/broker_redelivery_helper.go index fab308dd3cb..0d8a1ca751a 100644 --- a/test/e2e/helpers/broker_redelivery_helper.go +++ b/test/e2e/helpers/broker_redelivery_helper.go @@ -41,8 +41,7 @@ func BrokerRedelivery(ctx context.Context, t *testing.T, creator BrokerCreatorWi t.Run(dropevents.Fibonacci, func(t *testing.T) { brokerRedelivery(ctx, t, creator, numRetries, func(pod *corev1.Pod, client *testlib.Client) error { - container := pod.Spec.Containers[0] - container.Env = append(container.Env, + pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, corev1.EnvVar{ Name: dropevents.SkipAlgorithmKey, Value: dropevents.Fibonacci, @@ -54,15 +53,14 @@ func BrokerRedelivery(ctx context.Context, t *testing.T, creator BrokerCreatorWi t.Run(dropevents.Sequence, func(t *testing.T) { brokerRedelivery(ctx, t, creator, numRetries, func(pod *corev1.Pod, client *testlib.Client) error { - container := pod.Spec.Containers[0] - container.Env = append(container.Env, + pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, corev1.EnvVar{ Name: dropevents.SkipAlgorithmKey, Value: dropevents.Sequence, }, corev1.EnvVar{ Name: dropevents.NumberKey, - Value: fmt.Sprintf("%d", numRetries-1), + Value: fmt.Sprintf("%d", numRetries), }, ) return nil