Skip to content

Commit

Permalink
Set check retry function
Browse files Browse the repository at this point in the history
Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
pierDipi committed Sep 25, 2020
1 parent 724ff78 commit 84c5f88
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 5 deletions.
6 changes: 6 additions & 0 deletions pkg/kncloudevents/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ func RetryConfigFromDeliverySpec(spec duckv1.DeliverySpec) (RetryConfig, error)

retryConfig := NoRetries()

retryConfig.CheckRetry = checkRetry

if spec.Retry != nil {
retryConfig.RetryMax = int(*spec.Retry)
}
Expand All @@ -176,3 +178,7 @@ func RetryConfigFromDeliverySpec(spec duckv1.DeliverySpec) (RetryConfig, error)

return retryConfig, nil
}

func checkRetry(_ context.Context, resp *nethttp.Response, _ error) (bool, error) {
return resp != nil && resp.StatusCode >= 300, nil
}
154 changes: 154 additions & 0 deletions pkg/kncloudevents/message_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,18 @@ limitations under the License.
package kncloudevents

import (
"context"
nethttp "net/http"
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"k8s.io/utils/pointer"

duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
)

// Test The RetryConfigFromDeliverySpec() Functionality
Expand Down Expand Up @@ -120,3 +127,150 @@ func TestRetryConfigFromDeliverySpec(t *testing.T) {
})
}
}

func TestHttpMessageSenderSendWithRetries(t *testing.T) {
t.Parallel()

tests := []struct {
name string
config *RetryConfig
wantStatus int
wantDispatch int
wantErr bool
}{
{
name: "5 max retry",
config: &RetryConfig{
RetryMax: 5,
CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) {
return true, nil
},
Backoff: func(attemptNum int, resp *nethttp.Response) time.Duration {
return time.Millisecond
},
},
wantStatus: nethttp.StatusServiceUnavailable,
wantDispatch: 6,
wantErr: false,
},
{
name: "1 max retry",
config: &RetryConfig{
RetryMax: 1,
CheckRetry: func(ctx context.Context, resp *nethttp.Response, err error) (bool, error) {
return true, nil
},
Backoff: func(attemptNum int, resp *nethttp.Response) time.Duration {
return time.Millisecond
},
},
wantStatus: nethttp.StatusServiceUnavailable,
wantDispatch: 2,
wantErr: false,
},
{
name: "with no retryConfig",
wantStatus: nethttp.StatusServiceUnavailable,
wantDispatch: 1,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

n := 0
var mu sync.Mutex
server := httptest.NewServer(nethttp.HandlerFunc(func(writer nethttp.ResponseWriter, request *nethttp.Request) {
mu.Lock()
n++
mu.Unlock()

writer.WriteHeader(tt.wantStatus)
}))

sender := &HttpMessageSender{
Client: nethttp.DefaultClient,
}

request, err := nethttp.NewRequest("POST", server.URL, nil)
assert.Nil(t, err)
got, err := sender.SendWithRetries(request, tt.config)
if (err != nil) != tt.wantErr || got == nil {
t.Errorf("SendWithRetries() error = %v, wantErr %v or got nil", err, tt.wantErr)
return
}
if got.StatusCode != nethttp.StatusServiceUnavailable {
t.Errorf("SendWithRetries() got = %v, want %v", got.StatusCode, nethttp.StatusServiceUnavailable)
return
}
if n != tt.wantDispatch {
t.Errorf("expected %d retries got %d", tt.config.RetryMax, n)
return
}
})
}
}

func TestRetryConfigFromDeliverySpecCheckDelivery(t *testing.T) {
linear := eventingduck.BackoffPolicyLinear
tests := []struct {
name string
spec eventingduck.DeliverySpec
retryMax int
wantErr bool
}{
{
name: "full delivery",
spec: eventingduck.DeliverySpec{
Retry: pointer.Int32Ptr(10),
BackoffPolicy: &linear,
BackoffDelay: pointer.StringPtr("PT1S"),
},
retryMax: 10,
wantErr: false,
},
{
name: "only retry",
spec: eventingduck.DeliverySpec{
Retry: pointer.Int32Ptr(10),
BackoffPolicy: &linear,
},
retryMax: 10,
wantErr: false,
},
{
name: "not ISO8601",
spec: eventingduck.DeliverySpec{
Retry: pointer.Int32Ptr(10),
BackoffDelay: pointer.StringPtr("PP1"),
BackoffPolicy: &linear,
},
retryMax: 10,
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := RetryConfigFromDeliverySpec(tt.spec)
if (err != nil) != tt.wantErr {
t.Errorf("RetryConfigFromDeliverySpec() error = %v, wantErr %v", err, tt.wantErr)
return
}
if err != nil {
return
}

if got.CheckRetry == nil {
t.Errorf("CheckRetry must not be nil")
return
}
if got.Backoff == nil {
t.Errorf("Backoff must not be nil")
}
if got.RetryMax != tt.retryMax {
t.Errorf("retryMax want %d got %d", tt.retryMax, got.RetryMax)
}
})
}
}
8 changes: 3 additions & 5 deletions test/e2e/helpers/broker_redelivery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ func BrokerRedelivery(t *testing.T, creator BrokerCreatorWithRetries) {

t.Run(dropevents.Fibonacci, func(t *testing.T) {
brokerRedelivery(t, creator, numRetries, func(pod *corev1.Pod, client *testlib.Client) error {
container := pod.Spec.Containers[0]
container.Env = append(container.Env,
pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env,
corev1.EnvVar{
Name: dropevents.SkipAlgorithmKey,
Value: dropevents.Fibonacci,
Expand All @@ -53,15 +52,14 @@ func BrokerRedelivery(t *testing.T, creator BrokerCreatorWithRetries) {

t.Run(dropevents.Sequence, func(t *testing.T) {
brokerRedelivery(t, creator, numRetries, func(pod *corev1.Pod, client *testlib.Client) error {
container := pod.Spec.Containers[0]
container.Env = append(container.Env,
pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env,
corev1.EnvVar{
Name: dropevents.SkipAlgorithmKey,
Value: dropevents.Sequence,
},
corev1.EnvVar{
Name: dropevents.NumberKey,
Value: fmt.Sprintf("%d", numRetries-1),
Value: fmt.Sprintf("%d", numRetries),
},
)
return nil
Expand Down

0 comments on commit 84c5f88

Please sign in to comment.