Skip to content

Commit

Permalink
Retry on network failures (knative#4454)
Browse files Browse the repository at this point in the history
* Retry on network failure

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* Use a fixed port number

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
pierDipi authored and matzew committed Nov 3, 2020
1 parent 56c41d2 commit bda7294
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 2 deletions.
4 changes: 2 additions & 2 deletions pkg/kncloudevents/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,6 @@ func RetryConfigFromDeliverySpec(spec duckv1.DeliverySpec) (RetryConfig, error)
return retryConfig, nil
}

func checkRetry(_ context.Context, resp *nethttp.Response, _ error) (bool, error) {
return resp != nil && resp.StatusCode >= 300, nil
func checkRetry(_ context.Context, resp *nethttp.Response, err error) (bool, error) {
return !(resp != nil && resp.StatusCode < 300), err
}
80 changes: 80 additions & 0 deletions pkg/kncloudevents/message_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package kncloudevents

import (
"context"
"net"
"net/http"
nethttp "net/http"
"net/http/httptest"
"sync/atomic"
Expand Down Expand Up @@ -209,6 +211,84 @@ func TestHttpMessageSenderSendWithRetries(t *testing.T) {
}
}

func TestRetriesOnNetworkErrors(t *testing.T) {

n := int32(10)
linear := duckv1.BackoffPolicyLinear
target := "127.0.0.1:63468"

calls := make(chan struct{})
defer close(calls)

nCalls := int32(0)

cont := make(chan struct{})
defer close(cont)

go func() {
for range calls {

nCalls++
// Simulate that the target service is back up.
//
// First n/2-1 calls we get connection refused since there is no server running.
// Now we start a server that responds with a retryable error, so we expect that
// the client continues to retry for a different reason.
//
// The last time we return 200, so we don't expect a new retry.
if n/2 == nCalls {

l, err := net.Listen("tcp", target)
assert.Nil(t, err)

s := httptest.NewUnstartedServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
if n-1 != nCalls {
writer.WriteHeader(http.StatusServiceUnavailable)
return
}
}))
defer s.Close() //nolint // defers in this range loop won't run unless the channel gets closed

assert.Nil(t, s.Listener.Close())

s.Listener = l

s.Start()
}
cont <- struct{}{}
}
}()

r, err := RetryConfigFromDeliverySpec(duckv1.DeliverySpec{
Retry: pointer.Int32Ptr(n),
BackoffPolicy: &linear,
BackoffDelay: pointer.StringPtr("PT0.1S"),
})
assert.Nil(t, err)

checkRetry := r.CheckRetry

r.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) {
calls <- struct{}{}
<-cont

return checkRetry(ctx, resp, err)
}

req, err := http.NewRequest("POST", "http://"+target, nil)
assert.Nil(t, err)

sender, err := NewHttpMessageSender(nil, "")
assert.Nil(t, err)

_, err = sender.SendWithRetries(req, &r)
assert.Nil(t, err)

// nCalls keeps track of how many times a call to check retry occurs.
// Since the number of request are n + 1 and the last one is successful the expected number of calls are n.
assert.Equal(t, n, nCalls, "expected %d got %d", n, nCalls)
}

func TestHTTPMessageSenderSendWithRetriesWithBufferedMessage(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit bda7294

Please sign in to comment.