Skip to content

Commit

Permalink
Add broker transformation test to upgrade tests (#8190)
Browse files Browse the repository at this point in the history
* Add broker transformation test to upgrade tests

* Fix test names that are reported
  • Loading branch information
mgencur authored Sep 16, 2024
1 parent c91a9d8 commit 608e76c
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 49 deletions.
104 changes: 67 additions & 37 deletions test/rekt/features/broker/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/state"

duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
Expand Down Expand Up @@ -382,51 +383,64 @@ Note: the number denotes the sequence of the event that flows in this test case.
*/
func brokerEventTransformationForTrigger() *feature.Feature {
f := feature.NewFeatureNamed("Broker event transformation for trigger")
config := BrokerEventTransformationForTriggerSetup(f)
BrokerEventTransformationForTriggerAssert(f, config)
return f
}

source := feature.MakeRandomK8sName("source")
type brokerEventTransformationConfig struct {
Broker string
Sink1 string
Sink2 string
EventToSend cloudevents.Event
TransformedEvent cloudevents.Event
}

func BrokerEventTransformationForTriggerSetup(f *feature.Feature) brokerEventTransformationConfig {
sink1 := feature.MakeRandomK8sName("sink1")
sink2 := feature.MakeRandomK8sName("sink2")

trigger1 := feature.MakeRandomK8sName("trigger1")
trigger2 := feature.MakeRandomK8sName("trigger2")

// Construct original cloudevent message
eventType := "type1"
eventSource := "http://source1.com"
eventBody := `{"msg":"e2e-brokerchannel-body"}`
// Construct cloudevent message after transformation
transformedEventType := "type2"
transformedEventSource := "http://source2.com"
transformedBody := `{"msg":"transformed body"}`

// Construct eventToSend
eventToSend := cloudevents.NewEvent()
eventToSend.SetID(uuid.New().String())
eventToSend.SetType(eventType)
eventToSend.SetSource(eventSource)
eventToSend.SetData(cloudevents.ApplicationJSON, []byte(eventBody))
eventToSend.SetType("type1")
eventToSend.SetSource("http://source1.com")
eventToSend.SetData(cloudevents.ApplicationJSON, []byte(`{"msg":"e2e-brokerchannel-body"}`))

// Construct cloudevent message after transformation
transformedEvent := cloudevents.NewEvent()
transformedEvent.SetType("type2")
transformedEvent.SetSource("http://source2.com")
transformedEvent.SetData(cloudevents.ApplicationJSON, []byte(`{"msg":"transformed body"}`))

//Install the broker
brokerName := feature.MakeRandomK8sName("broker")
f.Setup("Set context variables", func(ctx context.Context, t feature.T) {
state.SetOrFail(ctx, t, "brokerName", brokerName)
state.SetOrFail(ctx, t, "sink1", sink1)
state.SetOrFail(ctx, t, "sink2", sink2)
})
f.Setup("install broker", broker.Install(brokerName, broker.WithEnvConfig()...))
f.Setup("broker is ready", broker.IsReady(brokerName))
f.Setup("broker is addressable", broker.IsAddressable(brokerName))

f.Setup("install sink1", eventshub.Install(sink1,
eventshub.ReplyWithTransformedEvent(transformedEventType, transformedEventSource, transformedBody),
eventshub.ReplyWithTransformedEvent(transformedEvent.Type(), transformedEvent.Source(), string(transformedEvent.Data())),
eventshub.StartReceiver),
)
f.Setup("install sink2", eventshub.Install(sink2, eventshub.StartReceiver))

// filter1 filters the original events
filter1 := eventingv1.TriggerFilterAttributes{
"type": eventType,
"source": eventSource,
"type": eventToSend.Type(),
"source": eventToSend.Source(),
}
// filter2 filters events after transformation
filter2 := eventingv1.TriggerFilterAttributes{
"type": transformedEventType,
"source": transformedEventSource,
"type": transformedEvent.Type(),
"source": transformedEvent.Source(),
}

// Install the trigger1 point to Broker and transform the original events to new events
Expand All @@ -446,33 +460,49 @@ func brokerEventTransformationForTrigger() *feature.Feature {
))
f.Setup("trigger2 goes ready", trigger.IsReady(trigger2))

return brokerEventTransformationConfig{
Broker: brokerName,
Sink1: sink1,
Sink2: sink2,
EventToSend: eventToSend,
TransformedEvent: transformedEvent,
}
}

func BrokerEventTransformationForTriggerAssert(f *feature.Feature,
cfg brokerEventTransformationConfig) {

source := feature.MakeRandomK8sName("source")

// Set new ID every time we send event to allow calling this function repeatedly
cfg.EventToSend.SetID(uuid.New().String())
f.Requirement("install source", eventshub.Install(
source,
eventshub.StartSenderToResource(broker.GVR(), brokerName),
eventshub.InputEvent(eventToSend),
eventshub.StartSenderToResource(broker.GVR(), cfg.Broker),
eventshub.InputEvent(cfg.EventToSend),
))

eventMatcher := eventasssert.MatchEvent(
test.HasSource(eventSource),
test.HasType(eventType),
test.HasData([]byte(eventBody)),
test.HasId(cfg.EventToSend.ID()),
test.HasSource(cfg.EventToSend.Source()),
test.HasType(cfg.EventToSend.Type()),
test.HasData(cfg.EventToSend.Data()),
)
transformEventMatcher := eventasssert.MatchEvent(
test.HasSource(transformedEventSource),
test.HasType(transformedEventType),
test.HasData([]byte(transformedBody)),
test.HasSource(cfg.TransformedEvent.Source()),
test.HasType(cfg.TransformedEvent.Type()),
test.HasData(cfg.TransformedEvent.Data()),
)

f.Stable("Trigger2 has filtered all transformed events").
Must("delivers original events",
eventasssert.OnStore(sink2).Match(transformEventMatcher).AtLeast(1))

f.Stable("Trigger2 has no original events").
Must("delivers original events",
eventasssert.OnStore(sink2).Match(eventMatcher).Not())

return f

f.Stable("Trigger has filtered all transformed events").
Must("trigger 1 delivers original events",
eventasssert.OnStore(cfg.Sink1).Match(eventMatcher).AtLeast(1)).
Must("trigger 1 does not deliver transformed events",
eventasssert.OnStore(cfg.Sink1).Match(transformEventMatcher).Not()).
Must("trigger 2 delivers transformed events",
eventasssert.OnStore(cfg.Sink2).Match(transformEventMatcher).AtLeast(1)).
Must("trigger 2 does not deliver original events",
eventasssert.OnStore(cfg.Sink2).Match(eventMatcher).Not())
}

func BrokerPreferHeaderCheck() *feature.Feature {
Expand Down
51 changes: 39 additions & 12 deletions test/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import (
"sync"
"testing"

"knative.dev/eventing/pkg/apis/eventing"
brokerfeatures "knative.dev/eventing/test/rekt/features/broker"
"knative.dev/eventing/test/rekt/features/channel"
brokerresources "knative.dev/eventing/test/rekt/resources/broker"
"knative.dev/eventing/test/rekt/resources/channel_impl"
"knative.dev/eventing/test/rekt/resources/subscription"
duckv1 "knative.dev/pkg/apis/duck/v1"
Expand All @@ -37,7 +40,16 @@ import (
"knative.dev/reconciler-test/pkg/manifest"
)

var channelConfigMux = &sync.Mutex{}
var (
channelConfigMux = &sync.Mutex{}
brokerConfigMux = &sync.Mutex{}
opts = []environment.EnvOpts{
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
}
)

// RunMainTest expects flags to be already initialized.
// This function needs to be exposed, so that test cases in other repositories can call the upgrade
Expand All @@ -63,7 +75,7 @@ type DurableFeature struct {
EnvOpts []environment.EnvOpts
setupEnv environment.Environment
setupCtx context.Context
VerifyF *feature.Feature
VerifyF func() *feature.Feature
Global environment.GlobalEnvironment
}

Expand All @@ -83,14 +95,14 @@ func (fe *DurableFeature) Setup(label string) pkgupgrade.Operation {
func (fe *DurableFeature) Verify(label string) pkgupgrade.Operation {
return pkgupgrade.NewOperation(label, func(c pkgupgrade.Context) {
c.T.Parallel()
fe.setupEnv.Test(fe.setupCtx, c.T, fe.VerifyF)
fe.setupEnv.Test(fe.setupCtx, c.T, fe.VerifyF())
})
}

func (fe *DurableFeature) VerifyAndTeardown(label string) pkgupgrade.Operation {
return pkgupgrade.NewOperation(label, func(c pkgupgrade.Context) {
c.T.Parallel()
fe.setupEnv.Test(fe.setupCtx, c.T, fe.VerifyF)
fe.setupEnv.Test(fe.setupCtx, c.T, fe.VerifyF())
// Ensures teardown of resources/namespace.
fe.setupEnv.Finish()
})
Expand All @@ -103,7 +115,7 @@ func (fe *DurableFeature) SetupVerifyAndTeardown(label string) pkgupgrade.Operat
append(fe.EnvOpts, environment.Managed(c.T))...,
)
env.Test(ctx, c.T, fe.SetupF)
env.Test(ctx, c.T, fe.VerifyF)
env.Test(ctx, c.T, fe.VerifyF())
})
}

Expand Down Expand Up @@ -290,14 +302,29 @@ func InMemoryChannelFeature(glob environment.GlobalEnvironment) *DurableFeature
setupF := feature.NewFeature()
sink, ch := channel.ChannelChainSetup(setupF, 1, createSubscriberFn)

verifyF := feature.NewFeature()
channel.ChannelChainAssert(verifyF, sink, ch)
verifyF := func() *feature.Feature {
f := feature.NewFeatureNamed(setupF.Name)
channel.ChannelChainAssert(f, sink, ch)
return f
}

opts := []environment.EnvOpts{
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
return &DurableFeature{SetupF: setupF, VerifyF: verifyF, Global: glob, EnvOpts: opts}
}

func BrokerEventTransformationForTrigger(glob environment.GlobalEnvironment,
) *DurableFeature {
// Prevent race conditions on EnvCfg.BrokerClass when running tests in parallel.
brokerConfigMux.Lock()
defer brokerConfigMux.Unlock()
brokerresources.EnvCfg.BrokerClass = eventing.MTChannelBrokerClassValue

setupF := feature.NewFeature()
cfg := brokerfeatures.BrokerEventTransformationForTriggerSetup(setupF)

verifyF := func() *feature.Feature {
f := feature.NewFeatureNamed(setupF.Name)
brokerfeatures.BrokerEventTransformationForTriggerAssert(f, cfg)
return f
}

return &DurableFeature{SetupF: setupF, VerifyF: verifyF, Global: glob, EnvOpts: opts}
Expand Down
4 changes: 4 additions & 0 deletions test/upgrade/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,16 @@ func TestEventingUpgrades(t *testing.T) {
g := FeatureGroupWithUpgradeTests{
// A feature that will run the same test post-upgrade and post-downgrade.
NewFeatureSmoke(InMemoryChannelFeature(global)),
NewFeatureSmoke(BrokerEventTransformationForTrigger(global)),
// A feature that will be created pre-upgrade and verified/removed post-upgrade.
NewFeatureOnlyUpgrade(InMemoryChannelFeature(global)),
NewFeatureOnlyUpgrade(BrokerEventTransformationForTrigger(global)),
// A feature that will be created pre-upgrade, verified post-upgrade, verified and removed post-downgrade.
NewFeatureUpgradeDowngrade(InMemoryChannelFeature(global)),
NewFeatureUpgradeDowngrade(BrokerEventTransformationForTrigger(global)),
// A feature that will be created post-upgrade, verified and removed post-downgrade.
NewFeatureOnlyDowngrade(InMemoryChannelFeature(global)),
NewFeatureOnlyDowngrade(BrokerEventTransformationForTrigger(global)),
}

suite := pkgupgrade.Suite{
Expand Down

0 comments on commit 608e76c

Please sign in to comment.