Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-0.18] Retry on network failures (#4454) #4457

Merged
merged 2 commits into from
Nov 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/kncloudevents/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,6 @@ func RetryConfigFromDeliverySpec(spec duckv1.DeliverySpec) (RetryConfig, error)
return retryConfig, nil
}

func checkRetry(_ context.Context, resp *nethttp.Response, _ error) (bool, error) {
return resp != nil && resp.StatusCode >= 300, nil
func checkRetry(_ context.Context, resp *nethttp.Response, err error) (bool, error) {
return !(resp != nil && resp.StatusCode < 300), err
}
123 changes: 101 additions & 22 deletions pkg/kncloudevents/message_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ package kncloudevents

import (
"context"
nethttp "net/http"
"net"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -146,60 +147,60 @@ func TestHttpMessageSenderSendWithRetries(t *testing.T) {
name: "5 max retry",
config: &RetryConfig{
RetryMax: 5,
CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) {
CheckRetry: func(ctx context.Context, resp *http.Response, err error) (bool, error) {
return true, nil
},
Backoff: func(attemptNum int, resp *nethttp.Response) time.Duration {
Backoff: func(attemptNum int, resp *http.Response) time.Duration {
return time.Millisecond
},
},
wantStatus: nethttp.StatusServiceUnavailable,
wantStatus: http.StatusServiceUnavailable,
wantDispatch: 6,
wantErr: false,
},
{
name: "1 max retry",
config: &RetryConfig{
RetryMax: 1,
CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) {
CheckRetry: func(ctx context.Context, resp *http.Response, err error) (bool, error) {
return true, nil
},
Backoff: func(attemptNum int, resp *nethttp.Response) time.Duration {
Backoff: func(attemptNum int, resp *http.Response) time.Duration {
return time.Millisecond
},
},
wantStatus: nethttp.StatusServiceUnavailable,
wantStatus: http.StatusServiceUnavailable,
wantDispatch: 2,
wantErr: false,
},
{
name: "with no retryConfig",
wantStatus: nethttp.StatusServiceUnavailable,
wantStatus: http.StatusServiceUnavailable,
wantDispatch: 1,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var n int32
server := httptest.NewServer(nethttp.HandlerFunc(func(writer nethttp.ResponseWriter, request *nethttp.Request) {
server := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
atomic.AddInt32(&n, 1)
writer.WriteHeader(tt.wantStatus)
}))

sender := &HttpMessageSender{
Client: nethttp.DefaultClient,
Client: http.DefaultClient,
}

request, err := nethttp.NewRequest("POST", server.URL, nil)
request, err := http.NewRequest("POST", server.URL, nil)
assert.Nil(t, err)
got, err := sender.SendWithRetries(request, tt.config)
if (err != nil) != tt.wantErr || got == nil {
t.Errorf("SendWithRetries() error = %v, wantErr %v or got nil", err, tt.wantErr)
return
}
if got.StatusCode != nethttp.StatusServiceUnavailable {
t.Errorf("SendWithRetries() got = %v, want %v", got.StatusCode, nethttp.StatusServiceUnavailable)
if got.StatusCode != http.StatusServiceUnavailable {
t.Errorf("SendWithRetries() got = %v, want %v", got.StatusCode, http.StatusServiceUnavailable)
return
}
if count := int(atomic.LoadInt32(&n)); count != tt.wantDispatch {
Expand All @@ -209,35 +210,113 @@ func TestHttpMessageSenderSendWithRetries(t *testing.T) {
}
}

// TestRetriesOnNetworkErrors checks that SendWithRetries keeps retrying when
// the request fails at the network level (nothing is listening on target yet)
// and continues to retry once a server appears and answers with a retryable
// status, stopping only after a successful response.
//
// The test wraps the CheckRetry produced by RetryConfigFromDeliverySpec so
// that every invocation is reported to a helper goroutine over the calls
// channel; the wrapper then blocks on cont until the goroutine has finished
// its bookkeeping. Both channels are unbuffered, so the goroutine's work is
// strictly interleaved with the sender's retry decisions.
func TestRetriesOnNetworkErrors(t *testing.T) {

// n is both the retry budget handed to the delivery spec and the expected
// number of CheckRetry invocations asserted at the end.
n := int32(10)
linear := duckv1.BackoffPolicyLinear
// NOTE(review): fixed loopback port; the test presumably fails or behaves
// differently if something else is already bound to it — confirm whether an
// ephemeral port could be used instead.
target := "127.0.0.1:63468"

// calls carries one signal per CheckRetry invocation to the helper goroutine.
// Closing it on return ends the goroutine's range loop.
calls := make(chan struct{})
defer close(calls)

// nCalls is written by the helper goroutine and read by the HTTP handler and
// by the final assertion. It is a plain int32 without atomics; ordering
// relies on the calls/cont handshake.
nCalls := int32(0)

// cont is the acknowledgement channel: the goroutine sends on it once it has
// processed a call, releasing the blocked CheckRetry wrapper.
cont := make(chan struct{})
defer close(cont)

go func() {
for range calls {

nCalls++
// Simulate that the target service is back up.
//
// Until nCalls reaches n/2 nothing is listening on target, so the preceding
// requests fail with a connection error.
// Now we start a server that responds with a retryable error, so we expect that
// the client continues to retry for a different reason.
//
// The last time we return 200, so we don't expect a new retry.
if n/2 == nCalls {

// Bind the fixed target address ourselves so the server below can be reached
// at the URL the request was built with.
// NOTE(review): assert is called here from a non-test goroutine — confirm this
// is acceptable for the assertion library in use.
l, err := net.Listen("tcp", target)
assert.Nil(t, err)

s := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
// Answer 503 for every request except the one that arrives when nCalls equals
// n-1; in that case the handler returns without writing a status, which
// net/http reports as 200 OK.
if n-1 != nCalls {
writer.WriteHeader(http.StatusServiceUnavailable)
return
}
}))
defer s.Close() //nolint // defers in this range loop won't run unless the channel gets closed

// Discard the listener httptest allocated and substitute the one bound to
// target before starting the server.
assert.Nil(t, s.Listener.Close())

s.Listener = l

s.Start()
}
// Release the CheckRetry wrapper that is waiting on this call.
cont <- struct{}{}
}
}()

// Build the retry configuration under test: n retries with a linear backoff
// derived from the ISO-8601 duration "PT0.1S".
r, err := RetryConfigFromDeliverySpec(duckv1.DeliverySpec{
Retry: pointer.Int32Ptr(n),
BackoffPolicy: &linear,
BackoffDelay: pointer.StringPtr("PT0.1S"),
})
assert.Nil(t, err)

// Keep the original decision function so the wrapper can delegate to it.
checkRetry := r.CheckRetry

// Instrumented CheckRetry: notify the goroutine, wait for it to finish (which
// may include starting the server), then return the real decision.
r.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) {
calls <- struct{}{}
<-cont

return checkRetry(ctx, resp, err)
}

req, err := http.NewRequest("POST", "http://"+target, nil)
assert.Nil(t, err)

sender, err := NewHttpMessageSender(nil, "")
assert.Nil(t, err)

// The overall send is expected to succeed because the final attempt gets a
// non-error response from the server started above.
_, err = sender.SendWithRetries(req, &r)
assert.Nil(t, err)

// nCalls tracks how many times the wrapped CheckRetry was invoked.
// The handler stops returning 503 once nCalls has reached n-1, so the request
// that follows succeeds and its CheckRetry invocation is the n-th and last one.
// NOTE(review): this assumes CheckRetry is also invoked for the successful
// response — confirm against SendWithRetries.
assert.Equal(t, n, nCalls, "expected %d got %d", n, nCalls)
}

func TestHTTPMessageSenderSendWithRetriesWithBufferedMessage(t *testing.T) {
t.Parallel()

const wantToSkip = 9
config := &RetryConfig{
RetryMax: wantToSkip,
CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) {
CheckRetry: func(ctx context.Context, resp *http.Response, err error) (bool, error) {
return true, nil
},
Backoff: func(attemptNum int, resp *nethttp.Response) time.Duration {
Backoff: func(attemptNum int, resp *http.Response) time.Duration {
return time.Millisecond * 50 * time.Duration(attemptNum)
},
}

var n uint32
server := httptest.NewServer(nethttp.HandlerFunc(func(writer nethttp.ResponseWriter, request *nethttp.Request) {
server := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
thisReqN := atomic.AddUint32(&n, 1)
if thisReqN <= wantToSkip {
writer.WriteHeader(nethttp.StatusServiceUnavailable)
writer.WriteHeader(http.StatusServiceUnavailable)
} else {
writer.WriteHeader(nethttp.StatusAccepted)
writer.WriteHeader(http.StatusAccepted)
}
}))

sender := &HttpMessageSender{
Client: nethttp.DefaultClient,
Client: http.DefaultClient,
}

request, err := nethttp.NewRequest("POST", server.URL, nil)
request, err := http.NewRequest("POST", server.URL, nil)
assert.Nil(t, err)

// Create a message similar to the one we send with channels
Expand All @@ -252,8 +331,8 @@ func TestHTTPMessageSenderSendWithRetriesWithBufferedMessage(t *testing.T) {
if err != nil {
t.Fatalf("SendWithRetries() error = %v, wantErr nil", err)
}
if got.StatusCode != nethttp.StatusAccepted {
t.Fatalf("SendWithRetries() got = %v, want %v", got.StatusCode, nethttp.StatusAccepted)
if got.StatusCode != http.StatusAccepted {
t.Fatalf("SendWithRetries() got = %v, want %v", got.StatusCode, http.StatusAccepted)
}
if count := atomic.LoadUint32(&n); count != wantToSkip+1 {
t.Fatalf("expected %d count got %d", wantToSkip+1, count)
Expand Down