
Commit

Adding dataplane conformance tests for Delivery Specs for Brokers. (#5262)

* stub the tests

* testing tracking

* fill out topology strawman

* stepfn -> feature

* stuff

* add more pseudo code

* add expected events helper

* add unit tests, fix some logic errors

* update reconciler-test

* move to unit

* init the same as the expected, pass

* almost working

* simplify

* rebase and small changes

* wire retry through

* merge / cut-paste fixes

* fix off-by-one error + tests. remove unused pseudo code

* cleanup

* adding back the other spec language

* make dlq not nil in config

* skip the trigger ds tests for now

* fix lint issues

Co-authored-by: Ville Aikas <vaikas@vmware.com>
Scott Nichols and vaikas authored Apr 15, 2021
1 parent 92fb0f4 commit 71153e6
Showing 9 changed files with 784 additions and 39 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -42,7 +42,7 @@ require (
knative.dev/hack v0.0.0-20210325223819-b6ab329907d3
knative.dev/hack/schema v0.0.0-20210325223819-b6ab329907d3
knative.dev/pkg v0.0.0-20210412173742-b51994e3b312
knative.dev/reconciler-test v0.0.0-20210412220859-730c55592c9b
knative.dev/reconciler-test v0.0.0-20210414181401-b0c3de288f3b
sigs.k8s.io/yaml v1.2.0
)

6 changes: 2 additions & 4 deletions go.sum
@@ -125,7 +125,6 @@ github.com/cloudevents/conformance v0.2.0 h1:NvSXOKlagcsOWMEbi8U7Ex/0oQ4JZE1HQ45
github.com/cloudevents/conformance v0.2.0/go.mod h1:rHKDwylBH89Rns6U3wL9ww8bg9/4GbwRCDNuyoC6bcc=
github.com/cloudevents/sdk-go/observability/opencensus/v2 v2.4.1 h1:UHjY9+DJyjELyFA8vU/KHHXix1F1z7QLFskzdJZkP+0=
github.com/cloudevents/sdk-go/observability/opencensus/v2 v2.4.1/go.mod h1:lhEpxMrIUkeu9rVRgoAbyqZ8GR8Hd3DUy+thHUxAHoI=
github.com/cloudevents/sdk-go/v2 v2.4.0/go.mod h1:MZiMwmAh5tGj+fPFvtHv9hKurKqXtdB9haJYMJ/7GJY=
github.com/cloudevents/sdk-go/v2 v2.4.1 h1:rZJoz9QVLbWQmnvLPDFEmv17Czu+CfSPwMO6lhJ72xQ=
github.com/cloudevents/sdk-go/v2 v2.4.1/go.mod h1:MZiMwmAh5tGj+fPFvtHv9hKurKqXtdB9haJYMJ/7GJY=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
@@ -1116,11 +1115,10 @@ knative.dev/hack v0.0.0-20210325223819-b6ab329907d3 h1:km0Rrh0T9/wA2pivQm1hqSPVw
knative.dev/hack v0.0.0-20210325223819-b6ab329907d3/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/hack/schema v0.0.0-20210325223819-b6ab329907d3 h1:F/pVm+rB+WpyVhH9cmVn3Lh53+UI24qlnjaYiqaw1pw=
knative.dev/hack/schema v0.0.0-20210325223819-b6ab329907d3/go.mod h1:ffjwmdcrH5vN3mPhO8RrF2KfNnbHeCE2C60A+2cv3U0=
knative.dev/pkg v0.0.0-20210409203851-3a2ae6db7097/go.mod h1:V/yjYpwRpIoUCavOoF8plCw72kF7rMjWPms5v2QqxA4=
knative.dev/pkg v0.0.0-20210412173742-b51994e3b312 h1:tE80vxKw9ENrLRe+U9BvLAcJ5UYpDc40r5hFoRFUXh0=
knative.dev/pkg v0.0.0-20210412173742-b51994e3b312/go.mod h1:V/yjYpwRpIoUCavOoF8plCw72kF7rMjWPms5v2QqxA4=
knative.dev/reconciler-test v0.0.0-20210412220859-730c55592c9b h1:iZmF4VAtAVmgroOTeZkcEMADn2hTvJ/x462s+WJe7Mc=
knative.dev/reconciler-test v0.0.0-20210412220859-730c55592c9b/go.mod h1:25eIFZKVo0CgZ8hGCK9Lxv2sH9VIZGQyCX64RQnP9Uk=
knative.dev/reconciler-test v0.0.0-20210414181401-b0c3de288f3b h1:7doQCjkEY0Zg7H5Wa5Bqt0azIL4tjlvJK6qoKRjWjFc=
knative.dev/reconciler-test v0.0.0-20210414181401-b0c3de288f3b/go.mod h1:lo1LZBzDHGKn2KXxEbZfmrmEH03RkC4uC9nBlMvwyz4=
pgregory.net/rapid v0.3.3 h1:jCjBsY4ln4Atz78QoBWxUEvAHaFyNDQg9+WU62aCn1U=
pgregory.net/rapid v0.3.3/go.mod h1:UYpPVyjFHzYBGHIxLFoupi8vwk6rXNzRY9OMvVxFIOU=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
335 changes: 328 additions & 7 deletions test/rekt/features/broker/control_plane.go
@@ -19,20 +19,26 @@ package broker
import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
v1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
eventingclientsetv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
"knative.dev/eventing/test/rekt/features/knconf"
brokerresources "knative.dev/eventing/test/rekt/resources/broker"
"knative.dev/eventing/test/rekt/resources/delivery"
triggerresources "knative.dev/eventing/test/rekt/resources/trigger"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/manifest"
"knative.dev/reconciler-test/pkg/state"
"knative.dev/reconciler-test/resources/svc"
)
@@ -281,13 +287,79 @@ func ControlPlaneDelivery(brokerName string) *feature.Feature {

f.Setup("Set Broker Name", setBrokerName(brokerName))

f.Stable("Conformance").
Should("When `BrokerSpec.Delivery` and `TriggerSpec.Delivery` are both not configured, no delivery spec SHOULD be used.",
todo).
Should("When `BrokerSpec.Delivery` is configured, but not the specific `TriggerSpec.Delivery`, then the `BrokerSpec.Delivery` SHOULD be used.",
todo).
Should("When `TriggerSpec.Delivery` is configured, then `TriggerSpec.Delivery` SHOULD be used.",
todo)
for i, tt := range []struct {
name string
brokerDS *v1.DeliverySpec
// Trigger 1 Delivery spec
t1DS *v1.DeliverySpec
// How many events to fail before succeeding
t1FailCount uint
// Trigger 2 Delivery spec
t2DS *v1.DeliverySpec
// How many events to fail before succeeding
t2FailCount uint
}{{
name: "When `BrokerSpec.Delivery` and `TriggerSpec.Delivery` are both not configured, no delivery spec SHOULD be used.",
// TODO: save these for a follow-up; just the Trigger delivery spec cases seem to be having issues. Might be a bug in eventing?
//}, {
// name: "When `BrokerSpec.Delivery` is configured, but not the specific `TriggerSpec.Delivery`, then the `BrokerSpec.Delivery` SHOULD be used. (Retry)",
// brokerDS: &v1.DeliverySpec{
// DeadLetterSink: new(duckv1.Destination),
// Retry: ptr.Int32(3),
// },
// t1FailCount: 3, // Should get event.
// t2FailCount: 4, // Should end up in DLQ.
//}, {
// name: "When `TriggerSpec.Delivery` is configured, then `TriggerSpec.Delivery` SHOULD be used. (Retry)",
// t1DS: &v1.DeliverySpec{
// DeadLetterSink: new(duckv1.Destination),
// Retry: ptr.Int32(3),
// },
// t1FailCount: 3, // Should get event.
// t2FailCount: 1, // Should be dropped.
//}, {
// name: "When both `BrokerSpec.Delivery` and `TriggerSpec.Delivery` is configured, then `TriggerSpec.Delivery` SHOULD be used. (Retry)",
// brokerDS: &v1.DeliverySpec{
// DeadLetterSink: new(duckv1.Destination),
// Retry: ptr.Int32(1),
// },
// t1DS: &v1.DeliverySpec{
// DeadLetterSink: new(duckv1.Destination),
// Retry: ptr.Int32(3),
// },
// t1FailCount: 3, // Should get event.
// t2FailCount: 2, // Should end up in DLQ.
}} {
brokerName := fmt.Sprintf("dlq-test-%d", i)
prober := createBrokerTriggerDeliveryTopology(f, brokerName, tt.brokerDS, tt.t1DS, tt.t2DS, tt.t1FailCount, tt.t2FailCount)

// Send an event into the matrix and hope for the best
prober.SenderFullEvents(1)
f.Setup("install source", prober.SenderInstall("source"))
f.Requirement("sender is finished", prober.SenderDone("source"))

// All events have been sent, time to look at the specs and confirm we got them.
expectedEvents := createExpectedEventMap(tt.brokerDS, tt.t1DS, tt.t2DS, tt.t1FailCount, tt.t2FailCount)

f.Requirement("wait until done", func(ctx context.Context, t feature.T) {
interval, timeout := environment.PollTimingsFromContext(ctx)
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
gtg := true
for prefix, want := range expectedEvents {
events := prober.ReceivedOrRejectedBy(ctx, prefix)
if len(events) != len(want.eventSuccess) {
gtg = false
}
}
return gtg, nil
})
if err != nil {
t.Error("timed out waiting for the expected events: ", err)
}
})

f.Stable("Conformance").Should(tt.name, assertExpectedEvents(prober, expectedEvents))
}

return f
}
@@ -428,3 +500,252 @@ func triggerSpecBrokerIsImmutable(ctx context.Context, t feature.T) {
t.Errorf("Trigger spec.broker is mutable")
}
}

//
// createBrokerTriggerDeliveryTopology creates a topology that allows us to test the various
// delivery configurations.
//
// source ---> [broker (brokerDS)] --+--[trigger1 (t1ds)]--> "t1"
//                                   |                    |
//                                   |                    +--> "t1dlq" (optional)
//                                   |
//                                   +--[trigger2 (t2ds)]--> "t2"
//                                   |                    |
//                                   |                    +--> "t2dlq" (optional)
//                                   |
//                                   +--[DLQ]--> "dlq" (optional)
//
func createBrokerTriggerDeliveryTopology(f *feature.Feature, brokerName string, brokerDS, t1DS, t2DS *v1.DeliverySpec, t1FailCount, t2FailCount uint) *eventshub.EventProber {
prober := eventshub.NewProber()
// This will set or clear the broker delivery spec settings.
// Make trigger with delivery settings.
// Make a trigger with no delivery spec.

// TODO: Optimize these to only install things required. For example, if there's no t2 dlq, no point creating a prober for it.
f.Setup("install recorder for t1", prober.ReceiverInstall("t1", eventshub.DropFirstN(t1FailCount)))
f.Setup("install recorder for t1dlq", prober.ReceiverInstall("t1dlq"))
f.Setup("install recorder for t2", prober.ReceiverInstall("t2", eventshub.DropFirstN(t2FailCount)))
f.Setup("install recorder for t2dlq", prober.ReceiverInstall("t2dlq"))
f.Setup("install recorder for broker dlq", prober.ReceiverInstall("brokerdlq"))

brokerOpts := brokerresources.WithEnvConfig()

if brokerDS != nil {
if brokerDS.DeadLetterSink != nil {
brokerOpts = append(brokerOpts, delivery.WithDeadLetterSink(prober.AsKReference("brokerdlq"), ""))
}
if brokerDS.Retry != nil {
brokerOpts = append(brokerOpts, delivery.WithRetry(*brokerDS.Retry, brokerDS.BackoffPolicy, brokerDS.BackoffDelay))
}
}
f.Setup("Create Broker", brokerresources.Install(brokerName, brokerOpts...))
f.Setup("Broker is Ready", brokerresources.IsReady(brokerName)) // We want to block until broker is ready to go.

prober.SetTargetResource(brokerresources.GVR(), brokerName)
t1Opts := []manifest.CfgFn{triggerresources.WithSubscriber(prober.AsKReference("t1"), "")}

if t1DS != nil {
if t1DS.DeadLetterSink != nil {
t1Opts = append(t1Opts, delivery.WithDeadLetterSink(prober.AsKReference("t1dlq"), ""))
}
if t1DS.Retry != nil {
t1Opts = append(t1Opts, delivery.WithRetry(*t1DS.Retry, t1DS.BackoffPolicy, t1DS.BackoffDelay))
}
}
f.Setup("Create Trigger1 with recorder", triggerresources.Install(feature.MakeRandomK8sName("t1"), brokerName, t1Opts...))

t2Opts := []manifest.CfgFn{triggerresources.WithSubscriber(prober.AsKReference("t2"), "")}
if t2DS != nil {
if t2DS.DeadLetterSink != nil {
t2Opts = append(t2Opts, delivery.WithDeadLetterSink(prober.AsKReference("t2dlq"), ""))
}
if t2DS.Retry != nil {
t2Opts = append(t2Opts, delivery.WithRetry(*t2DS.Retry, t2DS.BackoffPolicy, t2DS.BackoffDelay))
}
}
f.Setup("Create Trigger2 with recorder", triggerresources.Install(feature.MakeRandomK8sName("t2"), brokerName, t2Opts...))
return prober
}
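
// Illustrative sketch (not part of this change): wiring the topology for a case where the
// Broker-level delivery spec retries three times into a dead letter sink, while both Triggers
// rely on the default delivery settings. The broker name and fail counts here are hypothetical.
//
//   brokerDS := &v1.DeliverySpec{
//       DeadLetterSink: new(duckv1.Destination),
//       Retry:          ptr.Int32(3),
//   }
//   // Trigger 1 accepts on the third attempt; Trigger 2 keeps rejecting and should dead-letter.
//   prober := createBrokerTriggerDeliveryTopology(f, "dlq-example", brokerDS, nil, nil, 2, 4)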

// createExpectedEventMap creates a data structure for a given test topology created by the `createBrokerTriggerDeliveryTopology` function.
// From the DeliverySpecs passed in we know where failed events from t1 and t2 should land.
// We also know how many events are incoming, as well as how many failures each trigger subscriber is supposed to see.
// Note there are a lot of baked-in assumptions and very tight coupling between this and the `createBrokerTriggerDeliveryTopology` function.
type expectedEvents struct {
eventSuccess []bool // events and their outcomes (succeeded or failed) in order received by the Receiver
// eventInterval is the minimum time, in seconds, between the events above. With only one entry it is irrelevant; with, say,
// two entries, the second entry is the time between the first and second event (so there should then be two events in the array above).
eventInterval []uint
}

func retryCount(r *int32) uint {
if r == nil {
return 0
}
return uint(*r)
}

func createExpectedEventMap(brokerDS, t1DS, t2DS *v1.DeliverySpec, t1FailCount, t2FailCount uint) map[string]expectedEvents {
// By default, assume that nothing gets anything.
r := map[string]expectedEvents{
"t1": {
eventSuccess: []bool{},
eventInterval: []uint{},
},
"t2": {
eventSuccess: []bool{},
eventInterval: []uint{},
},
"t1dlq": {
eventSuccess: []bool{},
eventInterval: []uint{},
},
"t2dlq": {
eventSuccess: []bool{},
eventInterval: []uint{},
},
"brokerdlq": {
eventSuccess: []bool{},
eventInterval: []uint{},
},
}

// For now we assume that there is only one incoming event that will then get retried at respective triggers according
// to their Delivery policy. Also depending on how the Delivery is configured, it may be delivered to triggers DLQ or
// the Broker DLQ.
if t1DS != nil && t1DS.DeadLetterSink != nil {
// There's a dead letter sink specified. Events can end up here if t1FailCount is greater than or equal to the retry count.
retryCount := retryCount(t1DS.Retry)
if t1FailCount >= retryCount {
// Ok, so we have at least as many failures as retries => one event in the t1dlq.
r["t1dlq"] = expectedEvents{
eventSuccess: []bool{true},
eventInterval: []uint{0},
}
}
}

if t2DS != nil && t2DS.DeadLetterSink != nil {
// There's a dead letter sink specified. Events can end up here if t2FailCount is greater than or equal to the retry count.
retryCount := retryCount(t2DS.Retry)
if t2FailCount >= retryCount {
// Ok, so we have at least as many failures as retries => one event in the t2dlq.
r["t2dlq"] = expectedEvents{
eventSuccess: []bool{true},
eventInterval: []uint{0},
}
}
}

if brokerDS != nil && brokerDS.DeadLetterSink != nil {
// There's a dead letter sink specified. Events can end up here if t1FailCount or t2FailCount is greater than or equal to the retry count.
retryCount := retryCount(brokerDS.Retry)
if t2FailCount >= retryCount || t1FailCount >= retryCount {
// Ok, so we have at least as many failures as retries => one event in the brokerdlq.
r["brokerdlq"] = expectedEvents{
eventSuccess: []bool{true},
eventInterval: []uint{0},
}
}
}

// Ok, that should have taken care of whether any of the DLQs get events.

// The default is that there are no retries (retry outcomes get constructed below if there are), so assume
// no retries and determine failure or success based on t1FailCount.
r["t1"] = expectedEvents{
eventSuccess: []bool{t1FailCount == 0},
eventInterval: []uint{0},
}

if t1DS != nil || brokerDS != nil {
// Check to see which DeliverySpec applies to Trigger
effectiveT1DS := t1DS
if t1DS == nil {
effectiveT1DS = brokerDS
}
r["t1"] = helper(retryCount(effectiveT1DS.Retry), t1FailCount, true)
}
r["t2"] = expectedEvents{
eventSuccess: []bool{t2FailCount == 0},
eventInterval: []uint{0},
}
if t2DS != nil || brokerDS != nil {
// Check to see which DeliverySpec applies to Trigger
effectiveT2DS := t2DS
if t2DS == nil {
effectiveT2DS = brokerDS
}
r["t2"] = helper(retryCount(effectiveT2DS.Retry), t2FailCount, true)
}
return r
}
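
// Illustrative sketch: with no delivery specs and no induced failures,
// createExpectedEventMap(nil, nil, nil, 0, 0) expects exactly one successful delivery to each
// trigger recorder and nothing in any DLQ:
//
//   r["t1"] == expectedEvents{eventSuccess: []bool{true}, eventInterval: []uint{0}}
//   r["t2"] == expectedEvents{eventSuccess: []bool{true}, eventInterval: []uint{0}}
//   // "t1dlq", "t2dlq" and "brokerdlq" keep their empty defaults.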

func helper(retry, failures uint, isLinear bool) expectedEvents {
if retry == 0 {
return expectedEvents{
eventSuccess: []bool{failures == 0},
eventInterval: []uint{0},
}

}
r := expectedEvents{
eventSuccess: make([]bool, 0),
eventInterval: make([]uint, 0),
}
for i := uint(0); i <= retry; i++ {
if failures == i {
r.eventSuccess = append(r.eventSuccess, true)
r.eventInterval = append(r.eventInterval, 0)
break
}
r.eventSuccess = append(r.eventSuccess, false)
r.eventInterval = append(r.eventInterval, 0)
}
return r
}
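
// For example (illustrative), helper(3, 2, true) models a subscriber that rejects the first
// two attempts and accepts the third:
//
//   expectedEvents{eventSuccess: []bool{false, false, true}, eventInterval: []uint{0, 0, 0}}
//
// while helper(1, 3, true) models exhausting the single retry without ever succeeding:
//
//   expectedEvents{eventSuccess: []bool{false, false}, eventInterval: []uint{0, 0}}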

func assertExpectedEvents(prober *eventshub.EventProber, expected map[string]expectedEvents) feature.StepFn {
return func(ctx context.Context, t feature.T) {
for prefix, want := range expected {
got := happened(ctx, prober, prefix)

t.Logf("Expected Events %s; \nGot: %#v\n Want: %#v", prefix, got, want)

// Check event acceptance.
if len(want.eventSuccess) != 0 && len(got.eventSuccess) != 0 {
if diff := cmp.Diff(want.eventSuccess, got.eventSuccess); diff != "" {
t.Error("unexpected event acceptance behaviour (-want, +got) =", diff)
}
}
// Check timing.
if len(want.eventInterval) != 0 && len(got.eventInterval) != 0 {
if diff := cmp.Diff(want.eventInterval, got.eventInterval); diff != "" {
t.Error("unexpected event interval behaviour (-want, +got) =", diff)
}
}
}
}
}

// TODO: this function could be moved to the prober directly.
func happened(ctx context.Context, prober *eventshub.EventProber, prefix string) expectedEvents {
events := prober.ReceivedOrRejectedBy(ctx, prefix)
sort.Slice(events, func(i, j int) bool {
return events[i].Time.Before(events[j].Time)
})
got := expectedEvents{
eventSuccess: make([]bool, 0),
eventInterval: make([]uint, 0),
}
for i, event := range events {
got.eventSuccess = append(got.eventSuccess, event.Kind == eventshub.EventReceived)
if i == 0 {
got.eventInterval = []uint{0}
} else {
// Events are sorted oldest first, so the interval is this event's time minus the previous event's time.
diff := event.Time.Unix() - events[i-1].Time.Unix()
got.eventInterval = append(got.eventInterval, uint(diff))
}
}
return got
}
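
// For example (illustrative), two events received at t=10s and t=13s, the first rejected and the
// second accepted, yield expectedEvents{eventSuccess: []bool{false, true}, eventInterval: []uint{0, 3}}.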
