diff --git a/go.mod b/go.mod index 77d99b1d855..17a6fc509f8 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( knative.dev/hack v0.0.0-20210325223819-b6ab329907d3 knative.dev/hack/schema v0.0.0-20210325223819-b6ab329907d3 knative.dev/pkg v0.0.0-20210412173742-b51994e3b312 - knative.dev/reconciler-test v0.0.0-20210412220859-730c55592c9b + knative.dev/reconciler-test v0.0.0-20210414181401-b0c3de288f3b sigs.k8s.io/yaml v1.2.0 ) diff --git a/go.sum b/go.sum index c573bf373f6..b77ccf9c088 100644 --- a/go.sum +++ b/go.sum @@ -125,7 +125,6 @@ github.com/cloudevents/conformance v0.2.0 h1:NvSXOKlagcsOWMEbi8U7Ex/0oQ4JZE1HQ45 github.com/cloudevents/conformance v0.2.0/go.mod h1:rHKDwylBH89Rns6U3wL9ww8bg9/4GbwRCDNuyoC6bcc= github.com/cloudevents/sdk-go/observability/opencensus/v2 v2.4.1 h1:UHjY9+DJyjELyFA8vU/KHHXix1F1z7QLFskzdJZkP+0= github.com/cloudevents/sdk-go/observability/opencensus/v2 v2.4.1/go.mod h1:lhEpxMrIUkeu9rVRgoAbyqZ8GR8Hd3DUy+thHUxAHoI= -github.com/cloudevents/sdk-go/v2 v2.4.0/go.mod h1:MZiMwmAh5tGj+fPFvtHv9hKurKqXtdB9haJYMJ/7GJY= github.com/cloudevents/sdk-go/v2 v2.4.1 h1:rZJoz9QVLbWQmnvLPDFEmv17Czu+CfSPwMO6lhJ72xQ= github.com/cloudevents/sdk-go/v2 v2.4.1/go.mod h1:MZiMwmAh5tGj+fPFvtHv9hKurKqXtdB9haJYMJ/7GJY= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= @@ -1116,11 +1115,10 @@ knative.dev/hack v0.0.0-20210325223819-b6ab329907d3 h1:km0Rrh0T9/wA2pivQm1hqSPVw knative.dev/hack v0.0.0-20210325223819-b6ab329907d3/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI= knative.dev/hack/schema v0.0.0-20210325223819-b6ab329907d3 h1:F/pVm+rB+WpyVhH9cmVn3Lh53+UI24qlnjaYiqaw1pw= knative.dev/hack/schema v0.0.0-20210325223819-b6ab329907d3/go.mod h1:ffjwmdcrH5vN3mPhO8RrF2KfNnbHeCE2C60A+2cv3U0= -knative.dev/pkg v0.0.0-20210409203851-3a2ae6db7097/go.mod h1:V/yjYpwRpIoUCavOoF8plCw72kF7rMjWPms5v2QqxA4= knative.dev/pkg v0.0.0-20210412173742-b51994e3b312 h1:tE80vxKw9ENrLRe+U9BvLAcJ5UYpDc40r5hFoRFUXh0= knative.dev/pkg v0.0.0-20210412173742-b51994e3b312/go.mod h1:V/yjYpwRpIoUCavOoF8plCw72kF7rMjWPms5v2QqxA4= -knative.dev/reconciler-test v0.0.0-20210412220859-730c55592c9b h1:iZmF4VAtAVmgroOTeZkcEMADn2hTvJ/x462s+WJe7Mc= -knative.dev/reconciler-test v0.0.0-20210412220859-730c55592c9b/go.mod h1:25eIFZKVo0CgZ8hGCK9Lxv2sH9VIZGQyCX64RQnP9Uk= +knative.dev/reconciler-test v0.0.0-20210414181401-b0c3de288f3b h1:7doQCjkEY0Zg7H5Wa5Bqt0azIL4tjlvJK6qoKRjWjFc= +knative.dev/reconciler-test v0.0.0-20210414181401-b0c3de288f3b/go.mod h1:lo1LZBzDHGKn2KXxEbZfmrmEH03RkC4uC9nBlMvwyz4= pgregory.net/rapid v0.3.3 h1:jCjBsY4ln4Atz78QoBWxUEvAHaFyNDQg9+WU62aCn1U= pgregory.net/rapid v0.3.3/go.mod h1:UYpPVyjFHzYBGHIxLFoupi8vwk6rXNzRY9OMvVxFIOU= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= diff --git a/test/rekt/features/broker/control_plane.go b/test/rekt/features/broker/control_plane.go index db6a5347c6b..fa6cf157adf 100644 --- a/test/rekt/features/broker/control_plane.go +++ b/test/rekt/features/broker/control_plane.go @@ -19,20 +19,26 @@ package broker import ( "context" "encoding/json" + "fmt" + "sort" "strings" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + v1 "knative.dev/eventing/pkg/apis/duck/v1" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" eventingclientsetv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1" eventingclient "knative.dev/eventing/pkg/client/injection/client" "knative.dev/eventing/test/rekt/features/knconf" brokerresources "knative.dev/eventing/test/rekt/resources/broker" + "knative.dev/eventing/test/rekt/resources/delivery" triggerresources "knative.dev/eventing/test/rekt/resources/trigger" "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/eventshub" "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/manifest" "knative.dev/reconciler-test/pkg/state" "knative.dev/reconciler-test/resources/svc" ) @@ -281,13 +287,79 @@ func ControlPlaneDelivery(brokerName string) *feature.Feature { f.Setup("Set Broker Name", setBrokerName(brokerName)) - f.Stable("Conformance"). - Should("When `BrokerSpec.Delivery` and `TriggerSpec.Delivery` are both not configured, no delivery spec SHOULD be used.", - todo). - Should("When `BrokerSpec.Delivery` is configured, but not the specific `TriggerSpec.Delivery`, then the `BrokerSpec.Delivery` SHOULD be used.", - todo). - Should("When `TriggerSpec.Delivery` is configured, then `TriggerSpec.Delivery` SHOULD be used.", - todo) + for i, tt := range []struct { + name string + brokerDS *v1.DeliverySpec + // Trigger 1 Delivery spec + t1DS *v1.DeliverySpec + // How many events to fail before succeeding + t1FailCount uint + // Trigger 2 Delivery spec + t2DS *v1.DeliverySpec + // How many events to fail before succeeding + t2FailCount uint + }{{ + name: "When `BrokerSpec.Delivery` and `TriggerSpec.Delivery` are both not configured, no delivery spec SHOULD be used.", + // TODO: save these for a followup, just trigger spec seems to be having issues. Might be a bug in eventing? + //}, { + // name: "When `BrokerSpec.Delivery` is configured, but not the specific `TriggerSpec.Delivery`, then the `BrokerSpec.Delivery` SHOULD be used. (Retry)", + // brokerDS: &v1.DeliverySpec{ + // DeadLetterSink: new(duckv1.Destination), + // Retry: ptr.Int32(3), + // }, + // t1FailCount: 3, // Should get event. + // t2FailCount: 4, // Should end up in DLQ. + //}, { + // name: "When `TriggerSpec.Delivery` is configured, then `TriggerSpec.Delivery` SHOULD be used. (Retry)", + // t1DS: &v1.DeliverySpec{ + // DeadLetterSink: new(duckv1.Destination), + // Retry: ptr.Int32(3), + // }, + // t1FailCount: 3, // Should get event. + // t2FailCount: 1, // Should be dropped. + //}, { + // name: "When both `BrokerSpec.Delivery` and `TriggerSpec.Delivery` is configured, then `TriggerSpec.Delivery` SHOULD be used. (Retry)", + // brokerDS: &v1.DeliverySpec{ + // DeadLetterSink: new(duckv1.Destination), + // Retry: ptr.Int32(1), + // }, + // t1DS: &v1.DeliverySpec{ + // DeadLetterSink: new(duckv1.Destination), + // Retry: ptr.Int32(3), + // }, + // t1FailCount: 3, // Should get event. + // t2FailCount: 2, // Should end up in DLQ. + }} { + brokerName := fmt.Sprintf("dlq-test-%d", i) + prober := createBrokerTriggerDeliveryTopology(f, brokerName, tt.brokerDS, tt.t1DS, tt.t2DS, tt.t1FailCount, tt.t2FailCount) + + // Send an event into the matrix and hope for the best + prober.SenderFullEvents(1) + f.Setup("install source", prober.SenderInstall("source")) + f.Requirement("sender is finished", prober.SenderDone("source")) + + // All events have been sent, time to look at the specs and confirm we got them. + expectedEvents := createExpectedEventMap(tt.brokerDS, tt.t1DS, tt.t2DS, tt.t1FailCount, tt.t2FailCount) + + f.Requirement("wait until done", func(ctx context.Context, t feature.T) { + interval, timeout := environment.PollTimingsFromContext(ctx) + err := wait.PollImmediate(interval, timeout, func() (bool, error) { + gtg := true + for prefix, want := range expectedEvents { + events := prober.ReceivedOrRejectedBy(ctx, prefix) + if len(events) != len(want.eventSuccess) { + gtg = false + } + } + return gtg, nil + }) + if err != nil { + t.Failed() + } + }) + + f.Stable("Conformance").Should(tt.name, assertExpectedEvents(prober, expectedEvents)) + } return f } @@ -428,3 +500,252 @@ func triggerSpecBrokerIsImmutable(ctx context.Context, t feature.T) { t.Errorf("Trigger spec.broker is mutable") } } + +// +// createBrokerTriggerDeliveryTopology creates a topology that allows us to test the various +// delivery configurations. +// +// source ---> [broker (brokerDS)] --+--[trigger1 (t1ds)]--> "t1" +// | | | +// | | +--> "t1dlq" (optional) +// | | +// | +-[trigger2 (t2ds)]--> "t2" +// | | +// | +--> "t2dlq" (optional) +// | +// +--[DLQ]--> "dlq" (optional) +// +func createBrokerTriggerDeliveryTopology(f *feature.Feature, brokerName string, brokerDS, t1DS, t2DS *v1.DeliverySpec, t1FailCount, t2FailCount uint) *eventshub.EventProber { + prober := eventshub.NewProber() + // This will set or clear the broker delivery spec settings. + // Make trigger with delivery settings. + // Make a trigger with no delivery spec. + + // TODO: Optimize these to only install things required. For example, if there's no t2 dlq, no point creating a prober for it. + f.Setup("install recorder for t1", prober.ReceiverInstall("t1", eventshub.DropFirstN(t1FailCount))) + f.Setup("install recorder for t1dlq", prober.ReceiverInstall("t1dlq")) + f.Setup("install recorder for t2", prober.ReceiverInstall("t2", eventshub.DropFirstN(t2FailCount))) + f.Setup("install recorder for t2dlq", prober.ReceiverInstall("t2dlq")) + f.Setup("install recorder for broker dlq", prober.ReceiverInstall("brokerdlq")) + + brokerOpts := brokerresources.WithEnvConfig() + + if brokerDS != nil { + if brokerDS.DeadLetterSink != nil { + brokerOpts = append(brokerOpts, delivery.WithDeadLetterSink(prober.AsKReference("brokerdlq"), "")) + } + if brokerDS.Retry != nil { + brokerOpts = append(brokerOpts, delivery.WithRetry(*brokerDS.Retry, brokerDS.BackoffPolicy, brokerDS.BackoffDelay)) + } + } + f.Setup("Create Broker", brokerresources.Install(brokerName, brokerOpts...)) + f.Setup("Broker is Ready", brokerresources.IsReady(brokerName)) // We want to block until broker is ready to go. + + prober.SetTargetResource(brokerresources.GVR(), brokerName) + t1Opts := []manifest.CfgFn{triggerresources.WithSubscriber(prober.AsKReference("t1"), "")} + + if t1DS != nil { + if t1DS.DeadLetterSink != nil { + t1Opts = append(t1Opts, delivery.WithDeadLetterSink(prober.AsKReference("t1dlq"), "")) + } + if t1DS.Retry != nil { + t1Opts = append(t1Opts, delivery.WithRetry(*t1DS.Retry, t1DS.BackoffPolicy, t1DS.BackoffDelay)) + } + } + f.Setup("Create Trigger1 with recorder", triggerresources.Install(feature.MakeRandomK8sName("t1"), brokerName, t1Opts...)) + + t2Opts := []manifest.CfgFn{triggerresources.WithSubscriber(prober.AsKReference("t2"), "")} + if t2DS != nil { + if t2DS.DeadLetterSink != nil { + t2Opts = append(t2Opts, delivery.WithDeadLetterSink(prober.AsKReference("t2dlq"), "")) + } + if t2DS.Retry != nil { + t2Opts = append(t2Opts, delivery.WithRetry(*t2DS.Retry, t2DS.BackoffPolicy, t2DS.BackoffDelay)) + } + } + f.Setup("Create Trigger2 with recorder", triggerresources.Install(feature.MakeRandomK8sName("t2"), brokerName, t2Opts...)) + return prober +} + +// createExpectedEventMap creates a datastructure for a given test topology created by `createBrokerTriggerDeliveryTopology` function. +// Things we know from the DeliverySpecs passed in are where failed events from both t1 and t2 should land in. +// We also know how many events (incoming as well as how many failures the trigger subscriber is supposed to see). +// Note there are lot of baked assumptions and very tight coupling between this and `createBrokerTriggerDeliveryTopology` function. +type expectedEvents struct { + eventSuccess []bool // events and their outcomes (succeeded or failed) in order received by the Receiver + // What is the minimum time between events above. If there's only one entry, it's irrelevant and will be useless. If there's, say + // two entries, the second entry is the time between the first and second event. And yeah, there should be 2 events in the above array. + eventInterval []uint +} + +func retryCount(r *int32) uint { + if r == nil { + return 0 + } + return uint(*r) +} + +func createExpectedEventMap(brokerDS, t1DS, t2DS *v1.DeliverySpec, t1FailCount, t2FailCount uint) map[string]expectedEvents { + // By default, assume that nothing gets anything. + r := map[string]expectedEvents{ + "t1": { + eventSuccess: []bool{}, + eventInterval: []uint{}, + }, + "t2": { + eventSuccess: []bool{}, + eventInterval: []uint{}, + }, + "t1dlq": { + eventSuccess: []bool{}, + eventInterval: []uint{}, + }, + "t2dlq": { + eventSuccess: []bool{}, + eventInterval: []uint{}, + }, + "brokerdlq": { + eventSuccess: []bool{}, + eventInterval: []uint{}, + }, + } + + // For now we assume that there is only one incoming event that will then get retried at respective triggers according + // to their Delivery policy. Also depending on how the Delivery is configured, it may be delivered to triggers DLQ or + // the Broker DLQ. + if t1DS != nil && t1DS.DeadLetterSink != nil { + // There's a dead letter sink specified. Events can end up here if t1FailCount is greater than retry count + retryCount := retryCount(t1DS.Retry) + if t1FailCount >= retryCount { + // Ok, so we should have more failures than retries => one event in the t1dlq + r["t1dlq"] = expectedEvents{ + eventSuccess: []bool{true}, + eventInterval: []uint{0}, + } + } + } + + if t2DS != nil && t2DS.DeadLetterSink != nil { + // There's a dead letter sink specified. Events can end up here if t1FailCount is greater than retry count + retryCount := retryCount(t2DS.Retry) + if t2FailCount >= retryCount { + // Ok, so we should have more failures than retries => one event in the t1dlq + r["t2dlq"] = expectedEvents{ + eventSuccess: []bool{true}, + eventInterval: []uint{0}, + } + } + } + + if brokerDS != nil && brokerDS.DeadLetterSink != nil { + // There's a dead letter sink specified. Events can end up here if t1FailCount or t2FailCount is greater than retry count + retryCount := retryCount(brokerDS.Retry) + if t2FailCount >= retryCount || t1FailCount >= retryCount { + // Ok, so we should have more failures than retries => one event in the t1dlq + r["brokerdlq"] = expectedEvents{ + eventSuccess: []bool{true}, + eventInterval: []uint{0}, + } + } + } + + // Ok, so that basically hopefully took care of if any of the DLQs should get events. + + // Default is that there are no retries (they will get constructed below if there are), so assume + // no retries and failure or success based on the t1FailCount + r["t1"] = expectedEvents{ + eventSuccess: []bool{t1FailCount == 0}, + eventInterval: []uint{0}, + } + + if t1DS != nil || brokerDS != nil { + // Check to see which DeliverySpec applies to Trigger + effectiveT1DS := t1DS + if t1DS == nil { + effectiveT1DS = brokerDS + } + r["t1"] = helper(retryCount(effectiveT1DS.Retry), t1FailCount, true) + } + r["t2"] = expectedEvents{ + eventSuccess: []bool{t2FailCount == 0}, + eventInterval: []uint{0}, + } + if t2DS != nil || brokerDS != nil { + // Check to see which DeliverySpec applies to Trigger + effectiveT2DS := t2DS + if t2DS == nil { + effectiveT2DS = brokerDS + } + r["t2"] = helper(retryCount(effectiveT2DS.Retry), t2FailCount, true) + } + return r +} + +func helper(retry, failures uint, isLinear bool) expectedEvents { + if retry == 0 { + return expectedEvents{ + eventSuccess: []bool{failures == 0}, + eventInterval: []uint{0}, + } + + } + r := expectedEvents{ + eventSuccess: make([]bool, 0), + eventInterval: make([]uint, 0), + } + for i := uint(0); i <= retry; i++ { + if failures == i { + r.eventSuccess = append(r.eventSuccess, true) + r.eventInterval = append(r.eventInterval, 0) + break + } + r.eventSuccess = append(r.eventSuccess, false) + r.eventInterval = append(r.eventInterval, 0) + } + return r +} + +func assertExpectedEvents(prober *eventshub.EventProber, expected map[string]expectedEvents) feature.StepFn { + return func(ctx context.Context, t feature.T) { + for prefix, want := range expected { + got := happened(ctx, prober, prefix) + + t.Logf("Expected Events %s; \nGot: %#v\n Want: %#v", prefix, got, want) + + // Check event acceptance. + if len(want.eventSuccess) != 0 && len(got.eventSuccess) != 0 { + if diff := cmp.Diff(want.eventSuccess, got.eventSuccess); diff != "" { + t.Error("unexpected event acceptance behaviour (-want, +got) =", diff) + } + } + // Check timing. + if len(want.eventInterval) != 0 && len(got.eventInterval) != 0 { + if diff := cmp.Diff(want.eventInterval, got.eventInterval); diff != "" { + t.Error("unexpected event interval behaviour (-want, +got) =", diff) + } + } + } + } +} + +// TODO: this function could be moved to the prober directly. +func happened(ctx context.Context, prober *eventshub.EventProber, prefix string) expectedEvents { + events := prober.ReceivedOrRejectedBy(ctx, prefix) + sort.Slice(events, func(i, j int) bool { + return events[i].Time.Before(events[j].Time) + }) + got := expectedEvents{ + eventSuccess: make([]bool, 0), + eventInterval: make([]uint, 0), + } + for i, event := range events { + got.eventSuccess = append(got.eventSuccess, event.Kind == eventshub.EventReceived) + if i == 0 { + got.eventInterval = []uint{0} + } else { + diff := events[i-1].Time.Unix() - event.Time.Unix() + got.eventInterval = append(got.eventInterval, uint(diff)) + } + } + return got +} diff --git a/test/rekt/features/broker/control_plane_test.go b/test/rekt/features/broker/control_plane_test.go new file mode 100644 index 00000000000..a2fa3a984a5 --- /dev/null +++ b/test/rekt/features/broker/control_plane_test.go @@ -0,0 +1,278 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package broker + +import ( + "reflect" + "testing" + + "github.com/google/go-cmp/cmp" + + v1 "knative.dev/eventing/pkg/apis/duck/v1" + pkgduckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/ptr" +) + +var noEvents = expectedEvents{ + eventSuccess: []bool{}, + eventInterval: []uint{}, +} + +var oneSuccessfulEvent = expectedEvents{ + eventSuccess: []bool{true}, + eventInterval: []uint{0}, +} + +var dlqSink = &pkgduckv1.Destination{ + Ref: &pkgduckv1.KReference{ + Kind: "doesnotmatter", + Namespace: "yah", + Name: "sumtin", + }, +} + +func TestHelper(t *testing.T) { + for _, tt := range []struct { + retry uint + failures uint + want expectedEvents + }{{ + retry: 0, + failures: 1, + want: expectedEvents{ + eventSuccess: []bool{false}, + eventInterval: []uint{0}, + }, + }, { + retry: 0, + failures: 0, + want: expectedEvents{ + eventSuccess: []bool{true}, + eventInterval: []uint{0}, + }, + }, { + retry: 0, + failures: 5, + want: expectedEvents{ + eventSuccess: []bool{false}, + eventInterval: []uint{0}, + }, + }, { + retry: 3, + failures: 0, + want: expectedEvents{ + eventSuccess: []bool{true}, + eventInterval: []uint{0}, + }, + }, { + retry: 3, + failures: 2, + want: expectedEvents{ + eventSuccess: []bool{false, false, true}, + eventInterval: []uint{0, 0, 0}, + }, + }, { + // Test with 3 retries => 4 sends altogether(one initial, plus 3 retries) + retry: 3, + failures: 4, + want: expectedEvents{ + eventSuccess: []bool{false, false, false, false}, + eventInterval: []uint{0, 0, 0, 0}, + }, + }} { + got := helper(tt.retry, tt.failures, false) + if diff := cmp.Diff(tt.want, got, cmp.AllowUnexported(expectedEvents{})); diff != "" { + t.Log("Unexpected out: ", diff) + } + } +} + +func TestCreateExpectedEventMap(t *testing.T) { + // two := int32(2) + + for _, tt := range []struct { + name string + brokerDS *v1.DeliverySpec + t1DS *v1.DeliverySpec + t2DS *v1.DeliverySpec + t1FailCount uint + t2FailCount uint + want map[string]expectedEvents + }{{ + name: "no retries, no failures, both t1 / t2 get it, nothing else", + brokerDS: nil, + t1DS: nil, + t2DS: nil, + t1FailCount: 0, + t2FailCount: 0, + want: map[string]expectedEvents{ + "t1": { + eventSuccess: []bool{true}, + eventInterval: []uint{0}, + }, + "t2": { + eventSuccess: []bool{true}, + eventInterval: []uint{0}, + }, + "t1dlq": noEvents, + "t2dlq": noEvents, + "brokerdlq": noEvents, + }, + }, { + name: "t1, one retry, no failures, both t1 / t2 get it, nothing else", + brokerDS: nil, + t1DS: &v1.DeliverySpec{ + Retry: ptr.Int32(1), + }, + t2DS: nil, + t1FailCount: 0, + t2FailCount: 0, + want: map[string]expectedEvents{ + "t1": { + eventSuccess: []bool{true}, + eventInterval: []uint{0}, + }, + "t2": { + eventSuccess: []bool{true}, + eventInterval: []uint{0}, + }, + "t1dlq": noEvents, + "t2dlq": noEvents, + "brokerdlq": noEvents, + }, + }, { + name: "t1, one retry, one failure, both t1 / t2 get it, t1dlq gets it too", + brokerDS: nil, + t1DS: &v1.DeliverySpec{ + Retry: ptr.Int32(1), + DeadLetterSink: dlqSink, + }, + t2DS: nil, + t1FailCount: 2, + t2FailCount: 0, + want: map[string]expectedEvents{ + "t1": { + eventSuccess: []bool{false, false}, + eventInterval: []uint{0, 0}, + }, + "t2": { + eventSuccess: []bool{true}, + eventInterval: []uint{0}, + }, + "t1dlq": oneSuccessfulEvent, + "t2dlq": noEvents, + "brokerdlq": noEvents, + }, + }, { + name: "t1, four retries with linear, 5 seconds apart, 3 failures, both t1 / t2 get it, no dlq gets it because succeeds on 4th try", + brokerDS: nil, + t1DS: &v1.DeliverySpec{ + Retry: ptr.Int32(4), + DeadLetterSink: dlqSink, + }, + t2DS: nil, + t1FailCount: 3, + t2FailCount: 0, + want: map[string]expectedEvents{ + "t1": { + eventSuccess: []bool{false, false, false, true}, + eventInterval: []uint{0, 0, 0, 0}, + }, + "t2": { + eventSuccess: []bool{true}, + eventInterval: []uint{0}, + }, + "t1dlq": noEvents, + "t2dlq": noEvents, + "brokerdlq": noEvents, + }, + }, { + name: "t1, three retries with linear, 5 seconds apart, 4 failures, both t1 / t2 get it, t1dlq gets it because fails all 4 tries", + brokerDS: nil, + t1DS: &v1.DeliverySpec{ + Retry: ptr.Int32(3), + DeadLetterSink: dlqSink, + }, + t2DS: nil, + t1FailCount: 4, + t2FailCount: 0, + want: map[string]expectedEvents{ + "t1": { + eventSuccess: []bool{false, false, false, false}, + eventInterval: []uint{0, 0, 0, 0}, + }, + "t2": { + eventSuccess: []bool{true}, + eventInterval: []uint{0}, + }, + "t1dlq": oneSuccessfulEvent, + "t2dlq": noEvents, + "brokerdlq": noEvents, + }, + }, { + name: "t2, one retry, one failure, both t1 / t2 get it, t2dlq gets it too", + brokerDS: nil, + t1DS: nil, + t2DS: &v1.DeliverySpec{ + Retry: ptr.Int32(1), + DeadLetterSink: dlqSink, + }, + t1FailCount: 0, + t2FailCount: 1, + want: map[string]expectedEvents{ + "t1": { + eventSuccess: []bool{true}, + eventInterval: []uint{0}, + }, + "t2": { + eventSuccess: []bool{false, true}, + eventInterval: []uint{0, 0}, + }, + "t1dlq": noEvents, + "t2dlq": oneSuccessfulEvent, + "brokerdlq": noEvents, + }, + }, { + name: "t1, one retry, one failure, both t1 / t2 get it, brokerdlq gets it too", + brokerDS: &v1.DeliverySpec{ + Retry: ptr.Int32(1), + DeadLetterSink: dlqSink, + }, + t1DS: nil, + t2DS: nil, + t1FailCount: 1, + t2FailCount: 0, + want: map[string]expectedEvents{ + "t1": { + eventSuccess: []bool{false, true}, + eventInterval: []uint{0, 0}, + }, + "t2": { + eventSuccess: []bool{true}, + eventInterval: []uint{0}, + }, + "t1dlq": noEvents, + "t2dlq": noEvents, + "brokerdlq": oneSuccessfulEvent, + }, + }} { + got := createExpectedEventMap(tt.brokerDS, tt.t1DS, tt.t2DS, tt.t1FailCount, tt.t2FailCount) + if !reflect.DeepEqual(tt.want, got) { + t.Logf("%s: Maps unequal: want:\n%+v\ngot:\n%+v", tt.name, tt.want, got) + } + } +} diff --git a/test/rekt/features/broker/data_plane.go b/test/rekt/features/broker/data_plane.go index 06fd6359af1..25f4738d21f 100644 --- a/test/rekt/features/broker/data_plane.go +++ b/test/rekt/features/broker/data_plane.go @@ -53,7 +53,7 @@ func DataPlaneIngress(brokerName string) *feature.Feature { May("Other versions MAY be rejected.", brokerRejectsUnknownCEVersion). ShouldNot("The Broker SHOULD NOT perform an upgrade of the produced event's CloudEvents version.", - todo). + brokerEventVersionNotUpgraded). Should("It SHOULD support both Binary Content Mode and Structured Content Mode of the HTTP Protocol Binding for CloudEvents.", todo). May("The HTTP(S) endpoint MAY be on any port, not just the standard 80 and 443.", @@ -275,3 +275,10 @@ func brokerRejectsMalformedCE(ctx context.Context, t feature.T) { } } } + +// source ---> [broker] ---[trigger]--> recorder +func brokerEventVersionNotUpgraded(ctx context.Context, t feature.T) { + // brokerName := state.GetStringOrFail(ctx, t, "brokerName") + + // Create a trigger, +} diff --git a/test/rekt/resources/trigger/trigger.go b/test/rekt/resources/trigger/trigger.go index 5fdac6c2a20..4f2bed9be46 100644 --- a/test/rekt/resources/trigger/trigger.go +++ b/test/rekt/resources/trigger/trigger.go @@ -21,6 +21,7 @@ import ( "time" "k8s.io/apimachinery/pkg/runtime/schema" + "knative.dev/eventing/test/rekt/resources/delivery" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/reconciler-test/pkg/feature" "knative.dev/reconciler-test/pkg/k8s" @@ -73,6 +74,12 @@ func WithSubscriber(ref *duckv1.KReference, uri string) manifest.CfgFn { } } +// WithDeadLetterSink adds the dead letter sink related config to a Trigger spec. +var WithDeadLetterSink = delivery.WithDeadLetterSink + +// WithRetry adds the retry related config to a Trigger spec. +var WithRetry = delivery.WithRetry + // Install will create a Trigger resource, augmented with the config fn options. func Install(name, brokerName string, opts ...manifest.CfgFn) feature.StepFn { cfg := map[string]interface{}{ diff --git a/test/rekt/resources/trigger/trigger.yaml b/test/rekt/resources/trigger/trigger.yaml index 40accda18a8..5a13576b4fd 100644 --- a/test/rekt/resources/trigger/trigger.yaml +++ b/test/rekt/resources/trigger/trigger.yaml @@ -27,8 +27,8 @@ spec: {{ range $key, $value := .filter.attributes }} {{ $key }}: "{{ $value }}" {{ end }} - {{end}} - {{if .subscriber }} + {{ end }} + {{ if .subscriber }} subscriber: {{ if .subscriber.ref }} ref: @@ -41,3 +41,28 @@ spec: uri: {{ .subscriber.uri }} {{ end }} {{ end }} + {{ if .delivery }} + delivery: + {{ if .delivery.deadLetterSink }} + deadLetterSink: + {{ if .delivery.deadLetterSink.ref }} + ref: + kind: {{ .delivery.deadLetterSink.ref.kind }} + namespace: {{ .namespace }} + name: {{ .delivery.deadLetterSink.ref.name }} + apiVersion: {{ .delivery.deadLetterSink.ref.apiVersion }} + {{ end }} + {{ if .delivery.deadLetterSink.uri }} + uri: {{ .delivery.deadLetterSink.uri }} + {{ end }} + {{ end }} + {{ if .delivery.retry }} + retry: {{ .delivery.retry}} + {{ end }} + {{ if .delivery.backoffPolicy }} + backoffPolicy: {{ .delivery.backoffPolicy}} + {{ end }} + {{ if .delivery.backoffDelay }} + backoffDelay: "{{ .delivery.backoffDelay}}" + {{ end }} + {{ end }} diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/prober.go b/vendor/knative.dev/reconciler-test/pkg/eventshub/prober.go index 9492f292f1e..d816eba882c 100644 --- a/vendor/knative.dev/reconciler-test/pkg/eventshub/prober.go +++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/prober.go @@ -20,6 +20,8 @@ import ( "context" "errors" "fmt" + "strconv" + "time" conformanceevent "github.com/cloudevents/conformance/pkg/event" cetest "github.com/cloudevents/sdk-go/v2/test" @@ -45,6 +47,7 @@ type EventProber struct { shortNameToName map[string]string ids []string senderOptions []EventsHubOption + receiverOptions []EventsHubOption } type target struct { @@ -94,23 +97,36 @@ func (p *EventProber) SetTargetURI(targetURI string) { } } +// ReceiversRejectFirstN adds DropFirstN to the default config for new receivers. +func (p *EventProber) ReceiversRejectFirstN(n uint) { + p.receiverOptions = append(p.receiverOptions, DropFirstN(n)) +} + +// ReceiversHaveResponseDelay adds ResponseWaitTime to the default config for +// new receivers. +func (p *EventProber) ReceiversHaveResponseDelay(delay time.Duration) { + p.receiverOptions = append(p.receiverOptions, ResponseWaitTime(delay)) +} + // ReceiverInstall installs an eventshub receiver into the test env. -func (p *EventProber) ReceiverInstall(prefix string) feature.StepFn { +func (p *EventProber) ReceiverInstall(prefix string, opts ...EventsHubOption) feature.StepFn { name := feature.MakeRandomK8sName(prefix) p.shortNameToName[prefix] = name - return Install(name, StartReceiver) + opts = append(p.receiverOptions, opts...) + opts = append(opts, StartReceiver) + return Install(name, opts...) } // SenderInstall installs an eventshub sender resource into the test env. -func (p *EventProber) SenderInstall(prefix string) feature.StepFn { +func (p *EventProber) SenderInstall(prefix string, opts ...EventsHubOption) feature.StepFn { name := feature.MakeRandomK8sName(prefix) p.shortNameToName[prefix] = name return func(ctx context.Context, t feature.T) { - var opts []EventsHubOption + opts := append(opts, p.senderOptions...) if len(p.target.uri) > 0 { - opts = append(p.senderOptions, StartSenderURL(p.target.uri)) + opts = append(opts, StartSenderURL(p.target.uri)) } else if !p.target.gvr.Empty() { - opts = append(p.senderOptions, StartSenderToResource(p.target.gvr, p.target.name)) + opts = append(opts, StartSenderToResource(p.target.gvr, p.target.name)) } else { t.Fatal("no target is configured for event loop") } @@ -125,6 +141,7 @@ func (p *EventProber) SenderDone(prefix string) feature.StepFn { interval, timeout := environment.PollTimingsFromContext(ctx) err := wait.PollImmediate(interval, timeout, func() (bool, error) { events := p.SentBy(ctx, prefix) + fmt.Println(prefix, "has sent", len(events)) if len(events) == len(p.ids) { return true, nil } @@ -136,6 +153,31 @@ func (p *EventProber) SenderDone(prefix string) feature.StepFn { } } +// ReceiverDone will poll until the receiver has received all expected events. +func (p *EventProber) ReceiverDone(from, to string) feature.StepFn { + return func(ctx context.Context, t feature.T) { + interval, timeout := environment.PollTimingsFromContext(ctx) + err := wait.PollImmediate(interval, timeout, func() (bool, error) { + sent := p.SentBy(ctx, from) + fmt.Println(from, "has sent", len(sent)) + + received := p.ReceivedBy(ctx, to) + fmt.Println(to, "has received", len(received)) + + rejected := p.RejectedBy(ctx, to) + fmt.Println(to, "has rejected", len(rejected)) + + if len(sent) == len(received)+len(rejected) { + return true, nil + } + return false, nil + }) + if err != nil { + t.Failed() + } + } +} + // CorrelateSent takes in an array of mixed Sent / Response events (matched with sentEventMatcher for example) // and correlates them based on the sequence into a pair. func CorrelateSent(origin string, in []EventInfo) []EventInfoCombined { @@ -159,10 +201,6 @@ func (p *EventProber) SentBy(ctx context.Context, prefix string) []EventInfoComb name := p.shortNameToName[prefix] store := StoreFromContext(ctx, name) - for _, c := range store.Collected() { - fmt.Printf("%#v\n", c) - } - return CorrelateSent(name, store.Collected()) } @@ -171,10 +209,6 @@ func (p *EventProber) ReceivedBy(ctx context.Context, prefix string) []EventInfo name := p.shortNameToName[prefix] store := StoreFromContext(ctx, name) - for _, c := range store.Collected() { - fmt.Printf("%#v\n", c) - } - events, _, _, _ := store.Find(func(info EventInfo) error { if info.Observer == name && info.Kind == EventReceived { return nil @@ -185,6 +219,36 @@ func (p *EventProber) ReceivedBy(ctx context.Context, prefix string) []EventInfo return events } +// RejectedBy returns events rejected by the named receiver. +func (p *EventProber) RejectedBy(ctx context.Context, prefix string) []EventInfo { + name := p.shortNameToName[prefix] + store := StoreFromContext(ctx, name) + + events, _, _, _ := store.Find(func(info EventInfo) error { + if info.Observer == name && info.Kind == EventRejected { + return nil + } + return errors.New("not a match") + }) + + return events +} + +// ReceivedOrRejectedBy returns events received or rejected by the named receiver. +func (p *EventProber) ReceivedOrRejectedBy(ctx context.Context, prefix string) []EventInfo { + name := p.shortNameToName[prefix] + store := StoreFromContext(ctx, name) + + events, _, _, _ := store.Find(func(info EventInfo) error { + if info.Observer == name && (info.Kind == EventReceived || info.Kind == EventRejected) { + return nil + } + return errors.New("not a match") + }) + + return events +} + // ExpectYAMLEvents registered expected events into the prober. func (p *EventProber) ExpectYAMLEvents(path string) error { events, err := conformanceevent.FromYaml(path, true) @@ -221,25 +285,40 @@ func (p *EventProber) SenderEventsFromSVC(svcName, path string) feature.StepFn { // SenderFullEvents creates `count` cloudevents.FullEvent events with new IDs into a // sender and registers them for the prober. +// Warning: only call once. func (p *EventProber) SenderFullEvents(count int) { - for i := 0; i < count; i++ { + event := cetest.FullEvent() + if count == 1 { id := uuid.New().String() - event := cetest.FullEvent() event.SetID(id) - p.ids = append(p.ids, id) + p.ids = []string{id} p.senderOptions = append(p.senderOptions, InputEvent(event)) + } else { + p.senderOptions = append(p.senderOptions, + InputEvent(event), SendMultipleEvents(count, 10*time.Millisecond), EnableIncrementalId) // TODO: make configurable. + p.ids = []string{} + for i := 1; i <= count; i++ { + p.ids = append(p.ids, strconv.Itoa(i)) + } } } // SenderMinEvents creates `count` cloudevents.MinEvent events with new IDs into a // sender and registers them for the prober. +// Warning: only call once. func (p *EventProber) SenderMinEvents(count int) { - for i := 0; i < count; i++ { + event := cetest.MinEvent() + if count == 1 { id := uuid.New().String() - event := cetest.MinEvent() event.SetID(id) - p.ids = append(p.ids, id) + p.ids = []string{id} p.senderOptions = append(p.senderOptions, InputEvent(event)) + } else { + p.senderOptions = append(p.senderOptions, InputEvent(event), SendMultipleEvents(count, 10*time.Millisecond)) // TODO: make configurable. + p.ids = []string{} + for i := 1; i <= count; i++ { + p.ids = append(p.ids, strconv.Itoa(i)) + } } } @@ -277,13 +356,43 @@ func (p *EventProber) AssertSentAll(fromPrefix string) feature.StepFn { // AssertReceivedAll tests that all events sent by `fromPrefix` were received by `toPrefix`. func (p *EventProber) AssertReceivedAll(fromPrefix, toPrefix string) feature.StepFn { return func(ctx context.Context, t feature.T) { - sent := p.SentBy(ctx, toPrefix) + sent := p.SentBy(ctx, fromPrefix) ids := make([]string, len(sent)) for i, s := range sent { ids[i] = s.Sent.SentId } - events := p.ReceivedBy(ctx, fromPrefix) + events := p.ReceivedBy(ctx, toPrefix) + if len(ids) != len(events) { + t.Errorf("expected %q to have received %d events, actually received %d", + toPrefix, len(ids), len(events)) + } + for _, id := range ids { + found := false + for _, event := range events { + if event.Event != nil && id == event.Event.ID() { + found = true + break + } + } + if !found { + t.Errorf("Failed to receive event id=%s", id) + } + } + } +} + +// AssertReceivedAll tests that all events sent by `fromPrefix` were received by `toPrefix`. +func (p *EventProber) AssertReceivedOrRejectedAll(fromPrefix, toPrefix string) feature.StepFn { + return func(ctx context.Context, t feature.T) { + sent := p.SentBy(ctx, fromPrefix) + ids := make([]string, len(sent)) + for i, s := range sent { + ids[i] = s.Sent.SentId + } + + events := p.ReceivedOrRejectedBy(ctx, toPrefix) + if len(ids) != len(events) { t.Errorf("expected %q to have received %d events, actually received %d", fromPrefix, len(ids), len(events)) @@ -291,7 +400,7 @@ func (p *EventProber) AssertReceivedAll(fromPrefix, toPrefix string) feature.Ste for _, id := range ids { found := false for _, event := range events { - if id == event.SentId { + if event.Event != nil && id == event.Event.ID() { found = true break } diff --git a/vendor/modules.txt b/vendor/modules.txt index 7894b9a2f4c..743c297aecf 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1104,7 +1104,7 @@ knative.dev/pkg/webhook/resourcesemantics knative.dev/pkg/webhook/resourcesemantics/conversion knative.dev/pkg/webhook/resourcesemantics/defaulting knative.dev/pkg/webhook/resourcesemantics/validation -# knative.dev/reconciler-test v0.0.0-20210412220859-730c55592c9b +# knative.dev/reconciler-test v0.0.0-20210414181401-b0c3de288f3b ## explicit knative.dev/reconciler-test/cmd/eventshub knative.dev/reconciler-test/pkg/environment