From 767d5917cf13fa93308f3311642f84ed5895b2c3 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Tue, 3 Nov 2020 19:54:35 +0100 Subject: [PATCH 1/2] Retry on network failures (#4454) Signed-off-by: Pierangelo Di Pilato --- pkg/kncloudevents/message_sender.go | 4 +- pkg/kncloudevents/message_sender_test.go | 80 ++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 2 deletions(-) diff --git a/pkg/kncloudevents/message_sender.go b/pkg/kncloudevents/message_sender.go index e87705cec84..cca0de5074d 100644 --- a/pkg/kncloudevents/message_sender.go +++ b/pkg/kncloudevents/message_sender.go @@ -184,6 +184,6 @@ func RetryConfigFromDeliverySpec(spec duckv1.DeliverySpec) (RetryConfig, error) return retryConfig, nil } -func checkRetry(_ context.Context, resp *nethttp.Response, _ error) (bool, error) { - return resp != nil && resp.StatusCode >= 300, nil +func checkRetry(_ context.Context, resp *nethttp.Response, err error) (bool, error) { + return !(resp != nil && resp.StatusCode < 300), err } diff --git a/pkg/kncloudevents/message_sender_test.go b/pkg/kncloudevents/message_sender_test.go index 5bd033455fe..fde35f17691 100644 --- a/pkg/kncloudevents/message_sender_test.go +++ b/pkg/kncloudevents/message_sender_test.go @@ -18,6 +18,8 @@ package kncloudevents import ( "context" + "net" + "net/http" nethttp "net/http" "net/http/httptest" "sync/atomic" @@ -209,6 +211,84 @@ func TestHttpMessageSenderSendWithRetries(t *testing.T) { } } +func TestRetriesOnNetworkErrors(t *testing.T) { + + n := int32(10) + linear := duckv1.BackoffPolicyLinear + target := "127.0.0.1:63468" + + calls := make(chan struct{}) + defer close(calls) + + nCalls := int32(0) + + cont := make(chan struct{}) + defer close(cont) + + go func() { + for range calls { + + nCalls++ + // Simulate that the target service is back up. + // + // First n/2-1 calls we get connection refused since there is no server running. + // Now we start a server that responds with a retryable error, so we expect that + // the client continues to retry for a different reason. + // + // The last time we return 200, so we don't expect a new retry. + if n/2 == nCalls { + + l, err := net.Listen("tcp", target) + assert.Nil(t, err) + + s := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + if n-1 != nCalls { + writer.WriteHeader(http.StatusServiceUnavailable) + return + } + })) + defer s.Close() //nolint // defers in this range loop won't run unless the channel gets closed + + assert.Nil(t, s.Listener.Close()) + + s.Listener = l + + s.Start() + } + cont <- struct{}{} + } + }() + + r, err := RetryConfigFromDeliverySpec(duckv1.DeliverySpec{ + Retry: pointer.Int32Ptr(n), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("PT0.1S"), + }) + assert.Nil(t, err) + + checkRetry := r.CheckRetry + + r.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) { + calls <- struct{}{} + <-cont + + return checkRetry(ctx, resp, err) + } + + req, err := http.NewRequest("POST", "http://"+target, nil) + assert.Nil(t, err) + + sender, err := NewHttpMessageSender(nil, "") + assert.Nil(t, err) + + _, err = sender.SendWithRetries(req, &r) + assert.Nil(t, err) + + // nCalls keeps track of how many times a call to check retry occurs. + // Since the number of request are n + 1 and the last one is successful the expected number of calls are n. + assert.Equal(t, n, nCalls, "expected %d got %d", n, nCalls) +} + func TestHTTPMessageSenderSendWithRetriesWithBufferedMessage(t *testing.T) { t.Parallel() From 249e7d62559b36c08457b8a3960588703287111b Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Tue, 3 Nov 2020 21:41:42 +0100 Subject: [PATCH 2/2] nethttp -> http Signed-off-by: Pierangelo Di Pilato --- pkg/kncloudevents/message_sender_test.go | 43 ++++++++++++------------ 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/pkg/kncloudevents/message_sender_test.go b/pkg/kncloudevents/message_sender_test.go index fde35f17691..e04a1c41cc2 100644 --- a/pkg/kncloudevents/message_sender_test.go +++ b/pkg/kncloudevents/message_sender_test.go @@ -20,7 +20,6 @@ import ( "context" "net" "net/http" - nethttp "net/http" "net/http/httptest" "sync/atomic" "testing" @@ -148,14 +147,14 @@ func TestHttpMessageSenderSendWithRetries(t *testing.T) { name: "5 max retry", config: &RetryConfig{ RetryMax: 5, - CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) { + CheckRetry: func(ctx context.Context, resp *http.Response, err error) (bool, error) { return true, nil }, - Backoff: func(attemptNum int, resp *nethttp.Response) time.Duration { + Backoff: func(attemptNum int, resp *http.Response) time.Duration { return time.Millisecond }, }, - wantStatus: nethttp.StatusServiceUnavailable, + wantStatus: http.StatusServiceUnavailable, wantDispatch: 6, wantErr: false, }, @@ -163,20 +162,20 @@ func TestHttpMessageSenderSendWithRetries(t *testing.T) { name: "1 max retry", config: &RetryConfig{ RetryMax: 1, - CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) { + CheckRetry: func(ctx context.Context, resp *http.Response, err error) (bool, error) { return true, nil }, - Backoff: func(attemptNum int, resp *nethttp.Response) time.Duration { + Backoff: func(attemptNum int, resp *http.Response) time.Duration { return time.Millisecond }, }, - wantStatus: nethttp.StatusServiceUnavailable, + wantStatus: http.StatusServiceUnavailable, wantDispatch: 2, wantErr: false, }, { name: "with no retryConfig", - wantStatus: nethttp.StatusServiceUnavailable, + wantStatus: http.StatusServiceUnavailable, wantDispatch: 1, wantErr: false, }, @@ -184,24 +183,24 @@ func TestHttpMessageSenderSendWithRetries(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { var n int32 - server := httptest.NewServer(nethttp.HandlerFunc(func(writer nethttp.ResponseWriter, request *nethttp.Request) { + server := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { atomic.AddInt32(&n, 1) writer.WriteHeader(tt.wantStatus) })) sender := &HttpMessageSender{ - Client: nethttp.DefaultClient, + Client: http.DefaultClient, } - request, err := nethttp.NewRequest("POST", server.URL, nil) + request, err := http.NewRequest("POST", server.URL, nil) assert.Nil(t, err) got, err := sender.SendWithRetries(request, tt.config) if (err != nil) != tt.wantErr || got == nil { t.Errorf("SendWithRetries() error = %v, wantErr %v or got nil", err, tt.wantErr) return } - if got.StatusCode != nethttp.StatusServiceUnavailable { - t.Errorf("SendWithRetries() got = %v, want %v", got.StatusCode, nethttp.StatusServiceUnavailable) + if got.StatusCode != http.StatusServiceUnavailable { + t.Errorf("SendWithRetries() got = %v, want %v", got.StatusCode, http.StatusServiceUnavailable) return } if count := int(atomic.LoadInt32(&n)); count != tt.wantDispatch { @@ -295,29 +294,29 @@ func TestHTTPMessageSenderSendWithRetriesWithBufferedMessage(t *testing.T) { const wantToSkip = 9 config := &RetryConfig{ RetryMax: wantToSkip, - CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) { + CheckRetry: func(ctx context.Context, resp *http.Response, err error) (bool, error) { return true, nil }, - Backoff: func(attemptNum int, resp *nethttp.Response) time.Duration { + Backoff: func(attemptNum int, resp *http.Response) time.Duration { return time.Millisecond * 50 * time.Duration(attemptNum) }, } var n uint32 - server := httptest.NewServer(nethttp.HandlerFunc(func(writer nethttp.ResponseWriter, request *nethttp.Request) { + server := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { thisReqN := atomic.AddUint32(&n, 1) if thisReqN <= wantToSkip { - writer.WriteHeader(nethttp.StatusServiceUnavailable) + writer.WriteHeader(http.StatusServiceUnavailable) } else { - writer.WriteHeader(nethttp.StatusAccepted) + writer.WriteHeader(http.StatusAccepted) } })) sender := &HttpMessageSender{ - Client: nethttp.DefaultClient, + Client: http.DefaultClient, } - request, err := nethttp.NewRequest("POST", server.URL, nil) + request, err := http.NewRequest("POST", server.URL, nil) assert.Nil(t, err) // Create a message similar to the one we send with channels @@ -332,8 +331,8 @@ func TestHTTPMessageSenderSendWithRetriesWithBufferedMessage(t *testing.T) { if err != nil { t.Fatalf("SendWithRetries() error = %v, wantErr nil", err) } - if got.StatusCode != nethttp.StatusAccepted { - t.Fatalf("SendWithRetries() got = %v, want %v", got.StatusCode, nethttp.StatusAccepted) + if got.StatusCode != http.StatusAccepted { + t.Fatalf("SendWithRetries() got = %v, want %v", got.StatusCode, http.StatusAccepted) } if count := atomic.LoadUint32(&n); count != wantToSkip+1 { t.Fatalf("expected %d count got %d", wantToSkip+1, count)