Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eventing TLS: Test ContainerSource with eventshub TLS receiver as sink #6957

Merged
merged 23 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/core/resources/containersource.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ spec:
uri:
description: URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref.
type: string
CACerts:
description: CACerts is the Certification Authority (CA) certificates in PEM format that the source trusts when sending events to the sink.
type: string
# WARNING: the schema tool can not parse PodTemplateSpec, stub here and redirect to Deployment documentation.
template:
type: object
Expand Down
16 changes: 16 additions & 0 deletions test/rekt/container_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"knative.dev/eventing/test/rekt/features/containersource"
"knative.dev/pkg/system"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/k8s"
"knative.dev/reconciler-test/pkg/knative"
)
Expand Down Expand Up @@ -85,3 +86,18 @@ func TestContainerSourceWithArgs(t *testing.T) {

env.Test(ctx, t, containersource.SendsEventsWithArgs())
}

func TestContainerSourceWithTLS(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
// environment.Managed(t),
vishal-chdhry marked this conversation as resolved.
Show resolved Hide resolved
eventshub.WithTLS(t),
)

env.Test(ctx, t, containersource.SendEventsWithTLSRecieverAsSink())
}
2 changes: 1 addition & 1 deletion test/rekt/features/channel/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func ChannelChain(length int, createSubscriberFn func(ref *duckv1.KReference, ur
}

// attach the first channel to the source
f.Requirement("install containersource", containersource.Install(cs, containersource.WithSink(channel_impl.AsRef(channels[0]), "")))
f.Requirement("install containersource", containersource.Install(cs, containersource.WithSink(channel_impl.AsDest(channels[0]))))
f.Requirement("containersource goes ready", containersource.IsReady(cs))

f.Assert("chained channels relay events", assert.OnStore(sink).MatchEvent(test.HasType("dev.knative.eventing.samples.heartbeat")).AtLeast(1))
Expand Down
29 changes: 27 additions & 2 deletions test/rekt/features/containersource/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package containersource

import (
"context"
"fmt"

"github.com/cloudevents/sdk-go/v2/test"
Expand Down Expand Up @@ -56,7 +57,7 @@ func SendsEventsWithSinkURI() *feature.Feature {

f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver))

f.Requirement("install containersource", containersource.Install(source, containersource.WithSink(service.AsKReference(sink), "")))
f.Requirement("install containersource", containersource.Install(source, containersource.WithSink(service.AsDestinationRef(sink))))
f.Requirement("containersource goes ready", containersource.IsReady(source))

f.Stable("containersource as event source").
Expand Down Expand Up @@ -105,7 +106,7 @@ func SendsEventsWithArgs() *feature.Feature {
f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver))

f.Requirement("install containersource", containersource.Install(source,
containersource.WithSink(service.AsKReference(sink), ""),
containersource.WithSink(service.AsDestinationRef(sink)),
containersource.WithArgs(args),
))
f.Requirement("containersource goes ready", containersource.IsReady(source))
Expand All @@ -118,3 +119,27 @@ func SendsEventsWithArgs() *feature.Feature {

return f
}

func SendEventsWithTLSRecieverAsSink() *feature.Feature {
source := feature.MakeRandomK8sName("containersource")
sink := feature.MakeRandomK8sName("sink")
f := feature.NewFeature()

f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiverTLS))

f.Requirement("install ContainerSource", func(ctx context.Context, t feature.T) {
d := service.AsDestinationRef(sink)
d.CACerts = eventshub.GetCaCerts(ctx)

containersource.Install(source, containersource.WithSink(d))(ctx, t)
})
f.Requirement("containersource goes ready", containersource.IsReady(source))

f.Stable("containersource as event source").
Must("delivers events",
assert.OnStore(sink).MatchEvent(
test.HasType("dev.knative.eventing.samples.heartbeat"),
Copy link
Member

@pierDipi pierDipi May 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TLS tests and your PR made me realize this: knative-extensions/reconciler-test#535, so technically, this match might match events that are "rejected" so travelling in plaintext on non TLS connection, therefore, for the time being, we need the MatchKind call, similar to APIServerSource TLS test:

eventasssert.OnStore(sink).
Match(eventasssert.MatchKind(eventshub.EventReceived)).
MatchEvent(test.HasType("dev.knative.apiserver.resource.update")).
AtLeast(1),

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the matching, all the events were of type "rejected"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, that's a nice finding, I believe we need to add CACerts to the SinkBinding resources (similar to changes in config/core/resources/containersource.yaml) since ContainerSource uses SinkBinding under the hood, so if CACerts is not propagated to it, it won't work

).AtLeast(1))

return f
}
8 changes: 1 addition & 7 deletions test/rekt/features/containersource/readyness.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package containersource

import (
"knative.dev/eventing/test/rekt/resources/containersource"
"knative.dev/eventing/test/rekt/resources/pingsource"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/manifest"
"knative.dev/reconciler-test/pkg/resources/service"
Expand All @@ -33,11 +31,7 @@ func GoesReady(name string, cfg ...manifest.CfgFn) *feature.Feature {
f.Setup("install a service", service.Install(sink,
service.WithSelectors(map[string]string{"app": "rekt"})))

cfg = append(cfg, pingsource.WithSink(&duckv1.KReference{
Kind: "Service",
Name: sink,
APIVersion: "v1",
}, ""))
cfg = append(cfg, containersource.WithSink(service.AsDestinationRef(sink)))

f.Setup("install a ContainerSource", containersource.Install(name, cfg...))

Expand Down
12 changes: 12 additions & 0 deletions test/rekt/resources/channel_impl/channel_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,5 +159,17 @@ func AsRef(name string) *duckv1.KReference {
}
}

// AsRef returns a KRef for a Channel without namespace.
func AsDest(name string) *duckv1.Destination {
apiVersion, kind := GVK().ToAPIVersionAndKind()
return &duckv1.Destination{
Ref: &duckv1.KReference{
Kind: kind,
APIVersion: apiVersion,
Name: name,
},
}
}

// WithDeadLetterSink adds the dead letter sink related config to a Subscription spec.
var WithDeadLetterSink = delivery.WithDeadLetterSink
36 changes: 33 additions & 3 deletions test/rekt/resources/containersource/containersource.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ package containersource
import (
"context"
"embed"
"strings"
"time"

"k8s.io/apimachinery/pkg/runtime/schema"

"knative.dev/eventing/test/rekt/resources/source"

duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/k8s"
Expand Down Expand Up @@ -67,7 +67,37 @@ func Install(name string, opts ...manifest.CfgFn) feature.StepFn {
}

// WithSink adds the sink related config to a ContainerSource spec.
var WithSink = source.WithSink
func WithSink(d *duckv1.Destination) manifest.CfgFn {
return func(cfg map[string]interface{}) {
if _, set := cfg["sink"]; !set {
cfg["sink"] = map[string]interface{}{}
}
sink := cfg["sink"].(map[string]interface{})

ref := d.Ref
uri := d.URI

if d.CACerts != nil {
// This is a multi-line string and should be indented accordingly.
// Replace "new line" with "new line + spaces".
sink["CACerts"] = strings.ReplaceAll(*d.CACerts, "\n", "\n ")
}

if uri != nil {
sink["uri"] = uri.String()
}
if ref != nil {
if _, set := sink["ref"]; !set {
sink["ref"] = map[string]interface{}{}
}
sref := sink["ref"].(map[string]interface{})
sref["apiVersion"] = ref.APIVersion
sref["kind"] = ref.Kind
// skip namespace
sref["name"] = ref.Name
}
}
}

// WithExtensions adds the ceOverrides related config to a ContainerSource spec.
func WithExtensions(extensions map[string]interface{}) manifest.CfgFn {
Expand Down
4 changes: 4 additions & 0 deletions test/rekt/resources/containersource/containersource.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ spec:
name: {{ .sink.ref.name }}
apiVersion: {{ .sink.ref.apiVersion }}
{{ end }}
{{ if .sink.CACerts }}
CACerts: |-
{{ .sink.CACerts }}
{{ end }}
{{ if .sink.uri }}
uri: {{ .sink.uri }}
{{ end }}
Expand Down
68 changes: 68 additions & 0 deletions test/rekt/resources/containersource/containersource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"embed"
"os"

"knative.dev/eventing/test/rekt/resources/containersource"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
testlog "knative.dev/reconciler-test/pkg/logging"
"knative.dev/reconciler-test/pkg/manifest"
)
Expand Down Expand Up @@ -126,3 +129,68 @@ func Example_full() {
// - name: POD_NAMESPACE
// value: bar
}

func Example_sink() {
ctx := testlog.NewContext()
images := map[string]string{}
cfg := map[string]interface{}{
"name": "foo",
"namespace": "bar",
"args": "--period=1",
"ceOverrides": map[string]interface{}{
"extensions": map[string]string{
"ext1": "val1",
"ext2": "val2",
},
},
}

sinkRef := &duckv1.Destination{
Ref: &duckv1.KReference{
Kind: "AKind",
// Namespace: "sinknamespace",
vishal-chdhry marked this conversation as resolved.
Show resolved Hide resolved
Name: "thesink",
APIVersion: "something.valid/v1",
},
URI: &apis.URL{Path: "uri/parts"},
}
containersource.WithSink(sinkRef)(cfg)

files, err := manifest.ExecuteYAML(ctx, yaml, images, cfg)
if err != nil {
panic(err)
}

manifest.OutputYAML(os.Stdout, files)
// Output:
// apiVersion: sources.knative.dev/v1
// kind: ContainerSource
// metadata:
// name: foo
// namespace: bar
// spec:
// ceOverrides:
// extensions:
// ext1: val1
// ext2: val2
// sink:
// ref:
// kind: AKind
// namespace: bar
// name: thesink
// apiVersion: something.valid/v1
// uri: uri/parts
// template:
// spec:
// containers:
// - name: heartbeats
// image: ko://knative.dev/eventing/cmd/heartbeats
// imagePullPolicy: IfNotPresent
// args:
// - --period=1
// env:
// - name: POD_NAME
// value: heartbeats
// - name: POD_NAMESPACE
// value: bar
}