Skip to content

Commit

Permalink
[release-v1.9] Send namespace header in MT components (#265) (#278)
Browse files Browse the repository at this point in the history
* Refactor PingSource adapter client creation (knative#6880)

This is just a refactoring to make it easier to implement Eventing TLS

Part of knative#6879

---------



* Send namespace header in MT components (knative#7048)

When running MT components [1] in mesh mode with Istio,
we lose the ability to define fine grained policies since we
don't know the resource namespace that originated such
request, therefore, by having a `Kn-Namespace` header,
in mesh mode, users case define fine-grained policies and
isolate namespaces.

[1] IMC, MTChannelBasedBroker, and PingSource



* Fix compile error



---------

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi authored Jul 20, 2023
1 parent 7d0c427 commit 1eb2f7a
Show file tree
Hide file tree
Showing 16 changed files with 310 additions and 100 deletions.
4 changes: 2 additions & 2 deletions pkg/adapter/mtping/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ func NewEnvConfig() adapter.EnvConfigAccessor {
return &adapter.EnvConfig{}
}

func NewAdapter(ctx context.Context, _ adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {
func NewAdapter(ctx context.Context, env adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {
logger := logging.FromContext(ctx)
runner := NewCronJobsRunner(ceClient, kubeclient.Get(ctx), logging.FromContext(ctx))
runner := NewCronJobsRunner(adapter.GetClientConfig(ctx), kubeclient.Get(ctx), logging.FromContext(ctx))

return &mtpingAdapter{
logger: logger,
Expand Down
59 changes: 48 additions & 11 deletions pkg/adapter/mtping/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"

"knative.dev/eventing/pkg/adapter/v2"
kncloudevents "knative.dev/eventing/pkg/adapter/v2"
"knative.dev/eventing/pkg/adapter/v2/util/crstatusevent"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
Expand All @@ -50,26 +51,25 @@ type cronJobsRunner struct {
// The cron job runner
cron cron.Cron

// client sends cloudevents.
Client cloudevents.Client

// Where to send logs
Logger *zap.SugaredLogger

// kubeClient for sending k8s events
kubeClient kubernetes.Interface

clientConfig kncloudevents.ClientConfig
}

const (
resourceGroup = "pingsources.sources.knative.dev"
)

func NewCronJobsRunner(ceClient cloudevents.Client, kubeClient kubernetes.Interface, logger *zap.SugaredLogger, opts ...cron.Option) *cronJobsRunner {
func NewCronJobsRunner(cfg adapter.ClientConfig, kubeClient kubernetes.Interface, logger *zap.SugaredLogger, opts ...cron.Option) *cronJobsRunner {
return &cronJobsRunner{
cron: *cron.New(opts...),
Client: ceClient,
Logger: logger,
kubeClient: kubeClient,
cron: *cron.New(opts...),
Logger: logger,
kubeClient: kubeClient,
clientConfig: cfg,
}
}

Expand Down Expand Up @@ -107,7 +107,18 @@ func (a *cronJobsRunner) AddSchedule(source *sourcesv1.PingSource) cron.EntryID
}

ctx = kncloudevents.ContextWithMetricTag(ctx, metricTag)
id, _ := a.cron.AddFunc(schedule, a.cronTick(ctx, event))

client, err := a.newPingSourceClient(source)
if err != nil {
a.Logger.Desugar().Error("Failed to create client",
zap.String("name", source.GetName()),
zap.String("namespace", source.GetNamespace()),
zap.Error(err),
)
return -1
}

id, _ := a.cron.AddFunc(schedule, a.cronTick(ctx, client, event))
return id
}

Expand All @@ -128,7 +139,7 @@ func (a *cronJobsRunner) Stop() {
}
}

func (a *cronJobsRunner) cronTick(ctx context.Context, event cloudevents.Event) func() {
func (a *cronJobsRunner) cronTick(ctx context.Context, client adapter.Client, event cloudevents.Event) func() {
return func() {
event := event.Clone()
event.SetID(uuid.New().String()) // provide an ID here so we can track it with logging
Expand All @@ -141,11 +152,13 @@ func (a *cronJobsRunner) cronTick(ctx context.Context, event cloudevents.Event)

a.Logger.Debugf("sending cloudevent id: %s, source: %s, target: %s", event.ID(), source, target)

if result := a.Client.Send(ctx, event); !cloudevents.IsACK(result) {
if result := client.Send(ctx, event); !cloudevents.IsACK(result) {
// Exhausted number of retries. Event is lost.
a.Logger.Error("failed to send cloudevent result: ", zap.Any("result", result),
zap.String("source", source), zap.String("target", target), zap.String("id", event.ID()))
}

client.CloseIdleConnections()
}
}

Expand Down Expand Up @@ -174,3 +187,27 @@ func makeEvent(source *sourcesv1.PingSource) (cloudevents.Event, error) {

return event, nil
}

func (a *cronJobsRunner) newPingSourceClient(source *sourcesv1.PingSource) (adapter.Client, error) {
var env adapter.EnvConfig
if a.clientConfig.Env != nil {
env = adapter.EnvConfig{
Namespace: source.GetNamespace(),
Name: a.clientConfig.Env.GetName(),
EnvSinkTimeout: fmt.Sprintf("%d", a.clientConfig.Env.GetSinktimeout()),
}
}

env.Sink = source.Status.SinkURI.String()

cfg := adapter.ClientConfig{
Env: &env,
CeOverrides: source.Spec.CloudEventOverrides,
Reporter: a.clientConfig.Reporter,
CrStatusEventClient: a.clientConfig.CrStatusEventClient,
Options: a.clientConfig.Options,
Client: a.clientConfig.Client,
}

return adapter.NewClient(cfg)
}
7 changes: 4 additions & 3 deletions pkg/adapter/mtping/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"knative.dev/pkg/logging"
rectesting "knative.dev/pkg/reconciler/testing"

"knative.dev/eventing/pkg/adapter/v2"
adaptertesting "knative.dev/eventing/pkg/adapter/v2/test"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
)
Expand Down Expand Up @@ -201,7 +202,7 @@ func TestAddRunRemoveSchedules(t *testing.T) {
logger := logging.FromContext(ctx)
ce := adaptertesting.NewTestClient()

runner := NewCronJobsRunner(ce, kubeclient.Get(ctx), logger)
runner := NewCronJobsRunner(adapter.ClientConfig{Client: ce}, kubeclient.Get(ctx), logger)
entryId := runner.AddSchedule(tc.src)

entry := runner.cron.Entry(entryId)
Expand All @@ -228,7 +229,7 @@ func TestStartStopCron(t *testing.T) {
logger := logging.FromContext(ctx)
ce := adaptertesting.NewTestClient()

runner := NewCronJobsRunner(ce, kubeclient.Get(ctx), logger)
runner := NewCronJobsRunner(adapter.ClientConfig{Client: ce}, kubeclient.Get(ctx), logger)

ctx, cancel := context.WithCancel(context.Background())
wctx, wcancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -257,7 +258,7 @@ func TestStartStopCronDelayWait(t *testing.T) {
logger := logging.FromContext(ctx)
ce := adaptertesting.NewTestClientWithDelay(time.Second * 5)

runner := NewCronJobsRunner(ce, kubeclient.Get(ctx), logger)
runner := NewCronJobsRunner(adapter.ClientConfig{Client: ce}, kubeclient.Get(ctx), logger)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
129 changes: 89 additions & 40 deletions pkg/adapter/v2/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,23 @@ import (
"knative.dev/pkg/tracing/propagation/tracecontextb3"

"knative.dev/eventing/pkg/adapter/v2/util/crstatusevent"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/eventing/pkg/apis"
"knative.dev/eventing/pkg/metrics/source"
obsclient "knative.dev/eventing/pkg/observability/client"
)

type closeIdler interface {
CloseIdleConnections()
}

type Client interface {
cloudevents.Client
closeIdler
}

var newClientHTTPObserved = NewClientHTTPObserved

func NewClientHTTPObserved(topt []http.Option, copt []ceclient.Option) (ceclient.Client, error) {
func NewClientHTTPObserved(topt []http.Option, copt []ceclient.Option) (Client, error) {
t, err := obshttp.NewObservedHTTP(topt...)
if err != nil {
return nil, err
Expand All @@ -55,85 +64,117 @@ func NewClientHTTPObserved(topt []http.Option, copt []ceclient.Option) (ceclient
return nil, err
}

return c, nil
return &client{
ceClient: c,
}, nil
}

// NewCloudEventsClient returns a client that will apply the ceOverrides to
// outbound events and report outbound event counts.
func NewCloudEventsClient(target string, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter) (cloudevents.Client, error) {
func NewCloudEventsClient(target string, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter) (Client, error) {
opts := make([]http.Option, 0)
if len(target) > 0 {
opts = append(opts, cloudevents.WithTarget(target))
}
return newCloudEventsClientCRStatus(nil, ceOverrides, reporter, nil, opts...)
return NewClient(ClientConfig{
CeOverrides: ceOverrides,
Reporter: reporter,
Options: opts,
})
}

// NewCloudEventsClientWithOptions returns a client created with provided options
func NewCloudEventsClientWithOptions(ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter, opts ...http.Option) (cloudevents.Client, error) {
return newCloudEventsClientCRStatus(nil, ceOverrides, reporter, nil, opts...)
func NewCloudEventsClientWithOptions(ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter, opts ...http.Option) (Client, error) {
return NewClient(ClientConfig{
CeOverrides: ceOverrides,
Reporter: reporter,
Options: opts,
})
}

// NewCloudEventsClientCRStatus returns a client CR status
func NewCloudEventsClientCRStatus(env EnvConfigAccessor, reporter source.StatsReporter, crStatusEventClient *crstatusevent.CRStatusEventClient) (cloudevents.Client, error) {
return newCloudEventsClientCRStatus(env, nil, reporter, crStatusEventClient)
func NewCloudEventsClientCRStatus(env EnvConfigAccessor, reporter source.StatsReporter, crStatusEventClient *crstatusevent.CRStatusEventClient) (Client, error) {
return NewClient(ClientConfig{
Env: env,
Reporter: reporter,
CrStatusEventClient: crStatusEventClient,
})
}
func newCloudEventsClientCRStatus(env EnvConfigAccessor, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter,
crStatusEventClient *crstatusevent.CRStatusEventClient, opts ...http.Option) (cloudevents.Client, error) {

pOpts := make([]http.Option, 0)
pOpts = append(pOpts, cloudevents.WithRoundTripper(&ochttp.Transport{
type ClientConfig struct {
Env EnvConfigAccessor
CeOverrides *duckv1.CloudEventOverrides
Reporter source.StatsReporter
CrStatusEventClient *crstatusevent.CRStatusEventClient
Options []http.Option

Client Client
}

type clientConfigKey struct{}

func withClientConfig(ctx context.Context, r ClientConfig) context.Context {
return context.WithValue(ctx, clientConfigKey{}, r)
}

func GetClientConfig(ctx context.Context) ClientConfig {
val := ctx.Value(clientConfigKey{})
if val == nil {
return ClientConfig{}
}
return val.(ClientConfig)
}

func NewClient(cfg ClientConfig) (Client, error) {
if cfg.Client != nil {
return cfg.Client, nil
}

transport := &ochttp.Transport{
Propagation: tracecontextb3.TraceContextEgress,
}))
}

if env != nil {
if target := env.GetSink(); len(target) > 0 {
pOpts := make([]http.Option, 0)
var closeIdler closeIdler = nethttp.DefaultTransport.(*nethttp.Transport)

ceOverrides := cfg.CeOverrides
if cfg.Env != nil {
if target := cfg.Env.GetSink(); len(target) > 0 {
pOpts = append(pOpts, cloudevents.WithTarget(target))
}
if sinkWait := env.GetSinktimeout(); sinkWait > 0 {
if sinkWait := cfg.Env.GetSinktimeout(); sinkWait > 0 {
pOpts = append(pOpts, setTimeOut(time.Duration(sinkWait)*time.Second))
}
if caCerts := env.GetCACerts(); (caCerts != nil && *caCerts != "") && eventingtls.IsHttpsSink(env.GetSink()) {
var err error

clientConfig := eventingtls.NewDefaultClientConfig()
clientConfig.CACerts = caCerts

transport := nethttp.DefaultTransport.(*nethttp.Transport).Clone()
transport.TLSClientConfig, err = eventingtls.GetTLSClientConfig(clientConfig)
if err != nil {
return nil, err
}

pOpts = append(pOpts, http.WithRoundTripper(&ochttp.Transport{
Base: transport,
Propagation: tracecontextb3.TraceContextEgress,
}))
}
if ceOverrides == nil {
var err error
ceOverrides, err = env.GetCloudEventOverrides()
ceOverrides, err = cfg.Env.GetCloudEventOverrides()
if err != nil {
return nil, err
}
}

pOpts = append(pOpts, http.WithHeader(apis.KnNamespaceHeader, cfg.Env.GetNamespace()))
}

pOpts = append(pOpts, http.WithRoundTripper(transport))

// Make sure that explicitly set options have priority
opts = append(pOpts, opts...)
opts := append(pOpts, cfg.Options...)

ceClient, err := newClientHTTPObserved(opts, nil)

if crStatusEventClient == nil {
crStatusEventClient = crstatusevent.GetDefaultClient()
if cfg.CrStatusEventClient == nil {
cfg.CrStatusEventClient = crstatusevent.GetDefaultClient()
}
if err != nil {
return nil, err
}
return &client{
ceClient: ceClient,
closeIdler: closeIdler,
ceOverrides: ceOverrides,
reporter: reporter,
crStatusEventClient: crStatusEventClient,
reporter: cfg.Reporter,
crStatusEventClient: cfg.CrStatusEventClient,
}, nil
}

Expand All @@ -155,6 +196,11 @@ type client struct {
ceOverrides *duckv1.CloudEventOverrides
reporter source.StatsReporter
crStatusEventClient *crstatusevent.CRStatusEventClient
closeIdler closeIdler
}

func (c *client) CloseIdleConnections() {
c.closeIdler.CloseIdleConnections()
}

var _ cloudevents.Client = (*client)(nil)
Expand Down Expand Up @@ -189,6 +235,9 @@ func (c *client) applyOverrides(event *cloudevents.Event) {
}

func (c *client) reportMetrics(ctx context.Context, event cloudevents.Event, result protocol.Result) {
if c.reporter == nil {
return
}
tags := MetricTagFromContext(ctx)
reportArgs := &source.ReportArgs{
Namespace: tags.Namespace,
Expand Down
4 changes: 2 additions & 2 deletions pkg/adapter/v2/cloudevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestNewCloudEventsClient_send(t *testing.T) {
}
}

defer func(restoreHTTP func(topt []http.Option, copt []v2client.Option) (v2client.Client, error), restoreEnv string, setEnv bool) {
defer func(restoreHTTP func(topt []http.Option, copt []v2client.Option) (Client, error), restoreEnv string, setEnv bool) {
newClientHTTPObserved = restoreHTTP
if setEnv {
if err := os.Setenv("K_SINK_TIMEOUT", restoreEnv); err != nil {
Expand All @@ -157,7 +157,7 @@ func TestNewCloudEventsClient_send(t *testing.T) {
}(restoreHTTP, restoreEnv, setEnv)

sendOptions := []http.Option{}
newClientHTTPObserved = func(topt []http.Option, copt []v2client.Option) (v2client.Client, error) {
newClientHTTPObserved = func(topt []http.Option, copt []v2client.Option) (Client, error) {
sendOptions = append(sendOptions, topt...)
return nil, nil
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/adapter/v2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,14 @@ func MainWithInformers(ctx context.Context, component string, env EnvConfigAcces
logger.Errorw("Error building statsreporter", zap.Error(err))
}

eventsClient, err := NewCloudEventsClientCRStatus(env, reporter, crStatusEventClient)
clientConfig := ClientConfig{
Env: env,
Reporter: reporter,
CrStatusEventClient: crStatusEventClient,
}
ctx = withClientConfig(ctx, clientConfig)

eventsClient, err := NewClient(clientConfig)
if err != nil {
logger.Fatalw("Error building cloud event client", zap.Error(err))
}
Expand Down
Loading

0 comments on commit 1eb2f7a

Please sign in to comment.