Skip to content

Commit

Permalink
Refactor PingSource adapter client creation (#6880)
Browse files Browse the repository at this point in the history
This is just a refactoring to make it easier to implement Eventing TLS

Part of #6879

---------

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi authored Apr 28, 2023
1 parent e2f1c77 commit 1f917d0
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 50 deletions.
4 changes: 2 additions & 2 deletions pkg/adapter/mtping/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ func NewEnvConfig() adapter.EnvConfigAccessor {
return &adapter.EnvConfig{}
}

func NewAdapter(ctx context.Context, _ adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {
func NewAdapter(ctx context.Context, env adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {
logger := logging.FromContext(ctx)
runner := NewCronJobsRunner(ceClient, kubeclient.Get(ctx), logging.FromContext(ctx))
runner := NewCronJobsRunner(adapter.GetClientConfig(ctx), kubeclient.Get(ctx), logging.FromContext(ctx))

return &mtpingAdapter{
logger: logger,
Expand Down
60 changes: 49 additions & 11 deletions pkg/adapter/mtping/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"

"knative.dev/eventing/pkg/adapter/v2"
kncloudevents "knative.dev/eventing/pkg/adapter/v2"
"knative.dev/eventing/pkg/adapter/v2/util/crstatusevent"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
Expand All @@ -50,26 +51,25 @@ type cronJobsRunner struct {
// The cron job runner
cron cron.Cron

// client sends cloudevents.
Client cloudevents.Client

// Where to send logs
Logger *zap.SugaredLogger

// kubeClient for sending k8s events
kubeClient kubernetes.Interface

clientConfig kncloudevents.ClientConfig
}

const (
resourceGroup = "pingsources.sources.knative.dev"
)

func NewCronJobsRunner(ceClient cloudevents.Client, kubeClient kubernetes.Interface, logger *zap.SugaredLogger, opts ...cron.Option) *cronJobsRunner {
func NewCronJobsRunner(cfg adapter.ClientConfig, kubeClient kubernetes.Interface, logger *zap.SugaredLogger, opts ...cron.Option) *cronJobsRunner {
return &cronJobsRunner{
cron: *cron.New(opts...),
Client: ceClient,
Logger: logger,
kubeClient: kubeClient,
cron: *cron.New(opts...),
Logger: logger,
kubeClient: kubeClient,
clientConfig: cfg,
}
}

Expand Down Expand Up @@ -107,7 +107,18 @@ func (a *cronJobsRunner) AddSchedule(source *sourcesv1.PingSource) cron.EntryID
}

ctx = kncloudevents.ContextWithMetricTag(ctx, metricTag)
id, _ := a.cron.AddFunc(schedule, a.cronTick(ctx, event))

client, err := a.newPingSourceClient(source)
if err != nil {
a.Logger.Desugar().Error("Failed to create client",
zap.String("name", source.GetName()),
zap.String("namespace", source.GetNamespace()),
zap.Error(err),
)
return -1
}

id, _ := a.cron.AddFunc(schedule, a.cronTick(ctx, client, event))
return id
}

Expand All @@ -128,7 +139,7 @@ func (a *cronJobsRunner) Stop() {
}
}

func (a *cronJobsRunner) cronTick(ctx context.Context, event cloudevents.Event) func() {
func (a *cronJobsRunner) cronTick(ctx context.Context, client adapter.Client, event cloudevents.Event) func() {
return func() {
event := event.Clone()
event.SetID(uuid.New().String()) // provide an ID here so we can track it with logging
Expand All @@ -141,11 +152,13 @@ func (a *cronJobsRunner) cronTick(ctx context.Context, event cloudevents.Event)

a.Logger.Debugf("sending cloudevent id: %s, source: %s, target: %s", event.ID(), source, target)

if result := a.Client.Send(ctx, event); !cloudevents.IsACK(result) {
if result := client.Send(ctx, event); !cloudevents.IsACK(result) {
// Exhausted number of retries. Event is lost.
a.Logger.Error("failed to send cloudevent result: ", zap.Any("result", result),
zap.String("source", source), zap.String("target", target), zap.String("id", event.ID()))
}

client.CloseIdleConnections()
}
}

Expand Down Expand Up @@ -174,3 +187,28 @@ func makeEvent(source *sourcesv1.PingSource) (cloudevents.Event, error) {

return event, nil
}

func (a *cronJobsRunner) newPingSourceClient(source *sourcesv1.PingSource) (adapter.Client, error) {
var env adapter.EnvConfig
if a.clientConfig.Env != nil {
env = adapter.EnvConfig{
Namespace: a.clientConfig.Env.GetNamespace(),
Name: a.clientConfig.Env.GetName(),
EnvSinkTimeout: fmt.Sprintf("%d", a.clientConfig.Env.GetSinktimeout()),
}
}

env.Sink = source.Status.SinkURI.String()
env.CACerts = nil // TODO CA Certs from source status

cfg := adapter.ClientConfig{
Env: &env,
CeOverrides: source.Spec.CloudEventOverrides,
Reporter: a.clientConfig.Reporter,
CrStatusEventClient: a.clientConfig.CrStatusEventClient,
Options: a.clientConfig.Options,
Client: a.clientConfig.Client,
}

return adapter.NewClient(cfg)
}
7 changes: 4 additions & 3 deletions pkg/adapter/mtping/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"knative.dev/pkg/logging"
rectesting "knative.dev/pkg/reconciler/testing"

"knative.dev/eventing/pkg/adapter/v2"
adaptertesting "knative.dev/eventing/pkg/adapter/v2/test"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
)
Expand Down Expand Up @@ -201,7 +202,7 @@ func TestAddRunRemoveSchedules(t *testing.T) {
logger := logging.FromContext(ctx)
ce := adaptertesting.NewTestClient()

runner := NewCronJobsRunner(ce, kubeclient.Get(ctx), logger)
runner := NewCronJobsRunner(adapter.ClientConfig{Client: ce}, kubeclient.Get(ctx), logger)
entryId := runner.AddSchedule(tc.src)

entry := runner.cron.Entry(entryId)
Expand All @@ -228,7 +229,7 @@ func TestStartStopCron(t *testing.T) {
logger := logging.FromContext(ctx)
ce := adaptertesting.NewTestClient()

runner := NewCronJobsRunner(ce, kubeclient.Get(ctx), logger)
runner := NewCronJobsRunner(adapter.ClientConfig{Client: ce}, kubeclient.Get(ctx), logger)

ctx, cancel := context.WithCancel(context.Background())
wctx, wcancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -257,7 +258,7 @@ func TestStartStopCronDelayWait(t *testing.T) {
logger := logging.FromContext(ctx)
ce := adaptertesting.NewTestClientWithDelay(time.Second * 5)

runner := NewCronJobsRunner(ce, kubeclient.Get(ctx), logger)
runner := NewCronJobsRunner(adapter.ClientConfig{Client: ce}, kubeclient.Get(ctx), logger)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
122 changes: 94 additions & 28 deletions pkg/adapter/v2/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,18 @@ import (
obsclient "knative.dev/eventing/pkg/observability/client"
)

type closeIdler interface {
CloseIdleConnections()
}

type Client interface {
cloudevents.Client
closeIdler
}

var newClientHTTPObserved = NewClientHTTPObserved

func NewClientHTTPObserved(topt []http.Option, copt []ceclient.Option) (ceclient.Client, error) {
func NewClientHTTPObserved(topt []http.Option, copt []ceclient.Option) (Client, error) {
t, err := obshttp.NewObservedHTTP(topt...)
if err != nil {
return nil, err
Expand All @@ -55,85 +64,134 @@ func NewClientHTTPObserved(topt []http.Option, copt []ceclient.Option) (ceclient
return nil, err
}

return c, nil
return &client{
ceClient: c,
}, nil
}

// NewCloudEventsClient returns a client that will apply the ceOverrides to
// outbound events and report outbound event counts.
func NewCloudEventsClient(target string, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter) (cloudevents.Client, error) {
func NewCloudEventsClient(target string, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter) (Client, error) {
opts := make([]http.Option, 0)
if len(target) > 0 {
opts = append(opts, cloudevents.WithTarget(target))
}
return newCloudEventsClientCRStatus(nil, ceOverrides, reporter, nil, opts...)
return NewClient(ClientConfig{
CeOverrides: ceOverrides,
Reporter: reporter,
Options: opts,
})
}

// NewCloudEventsClientWithOptions returns a client created with provided options
func NewCloudEventsClientWithOptions(ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter, opts ...http.Option) (cloudevents.Client, error) {
return newCloudEventsClientCRStatus(nil, ceOverrides, reporter, nil, opts...)
func NewCloudEventsClientWithOptions(ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter, opts ...http.Option) (Client, error) {
return NewClient(ClientConfig{
CeOverrides: ceOverrides,
Reporter: reporter,
Options: opts,
})
}

// NewCloudEventsClientCRStatus returns a client CR status
func NewCloudEventsClientCRStatus(env EnvConfigAccessor, reporter source.StatsReporter, crStatusEventClient *crstatusevent.CRStatusEventClient) (cloudevents.Client, error) {
return newCloudEventsClientCRStatus(env, nil, reporter, crStatusEventClient)
func NewCloudEventsClientCRStatus(env EnvConfigAccessor, reporter source.StatsReporter, crStatusEventClient *crstatusevent.CRStatusEventClient) (Client, error) {
return NewClient(ClientConfig{
Env: env,
Reporter: reporter,
CrStatusEventClient: crStatusEventClient,
})
}
func newCloudEventsClientCRStatus(env EnvConfigAccessor, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter,
crStatusEventClient *crstatusevent.CRStatusEventClient, opts ...http.Option) (cloudevents.Client, error) {

pOpts := make([]http.Option, 0)
pOpts = append(pOpts, cloudevents.WithRoundTripper(&ochttp.Transport{
type ClientConfig struct {
Env EnvConfigAccessor
CeOverrides *duckv1.CloudEventOverrides
Reporter source.StatsReporter
CrStatusEventClient *crstatusevent.CRStatusEventClient
Options []http.Option

Client Client
}

type clientConfigKey struct{}

func withClientConfig(ctx context.Context, r ClientConfig) context.Context {
return context.WithValue(ctx, clientConfigKey{}, r)
}

func GetClientConfig(ctx context.Context) ClientConfig {
val := ctx.Value(clientConfigKey{})
if val == nil {
return ClientConfig{}
}
return val.(ClientConfig)
}

func NewClient(cfg ClientConfig) (Client, error) {
if cfg.Client != nil {
return cfg.Client, nil
}

transport := &ochttp.Transport{
Propagation: tracecontextb3.TraceContextEgress,
}))
}

pOpts := make([]http.Option, 0)
var closeIdler closeIdler = nethttp.DefaultTransport.(*nethttp.Transport)

if env != nil {
if target := env.GetSink(); len(target) > 0 {
ceOverrides := cfg.CeOverrides
if cfg.Env != nil {
if target := cfg.Env.GetSink(); len(target) > 0 {
pOpts = append(pOpts, cloudevents.WithTarget(target))
}
if sinkWait := env.GetSinktimeout(); sinkWait > 0 {
if sinkWait := cfg.Env.GetSinktimeout(); sinkWait > 0 {
pOpts = append(pOpts, setTimeOut(time.Duration(sinkWait)*time.Second))
}
if caCerts := env.GetCACerts(); (caCerts != nil && *caCerts != "") && eventingtls.IsHttpsSink(env.GetSink()) {
if caCerts := cfg.Env.GetCACerts(); (caCerts != nil && *caCerts != "") && eventingtls.IsHttpsSink(cfg.Env.GetSink()) {
var err error

clientConfig := eventingtls.NewDefaultClientConfig()
clientConfig.CACerts = caCerts

transport := nethttp.DefaultTransport.(*nethttp.Transport).Clone()
transport.TLSClientConfig, err = eventingtls.GetTLSClientConfig(clientConfig)
httpTransport := nethttp.DefaultTransport.(*nethttp.Transport).Clone()
httpTransport.TLSClientConfig, err = eventingtls.GetTLSClientConfig(clientConfig)
if err != nil {
return nil, err
}

pOpts = append(pOpts, http.WithRoundTripper(&ochttp.Transport{
Base: transport,
closeIdler = httpTransport

transport = &ochttp.Transport{
Base: httpTransport,
Propagation: tracecontextb3.TraceContextEgress,
}))
}
}
if ceOverrides == nil {
var err error
ceOverrides, err = env.GetCloudEventOverrides()
ceOverrides, err = cfg.Env.GetCloudEventOverrides()
if err != nil {
return nil, err
}
}
}

pOpts = append(pOpts, http.WithRoundTripper(transport))

// Make sure that explicitly set options have priority
opts = append(pOpts, opts...)
opts := append(pOpts, cfg.Options...)

ceClient, err := newClientHTTPObserved(opts, nil)

if crStatusEventClient == nil {
crStatusEventClient = crstatusevent.GetDefaultClient()
if cfg.CrStatusEventClient == nil {
cfg.CrStatusEventClient = crstatusevent.GetDefaultClient()
}
if err != nil {
return nil, err
}
return &client{
ceClient: ceClient,
closeIdler: closeIdler,
ceOverrides: ceOverrides,
reporter: reporter,
crStatusEventClient: crStatusEventClient,
reporter: cfg.Reporter,
crStatusEventClient: cfg.CrStatusEventClient,
}, nil
}

Expand All @@ -155,6 +213,11 @@ type client struct {
ceOverrides *duckv1.CloudEventOverrides
reporter source.StatsReporter
crStatusEventClient *crstatusevent.CRStatusEventClient
closeIdler closeIdler
}

func (c *client) CloseIdleConnections() {
c.closeIdler.CloseIdleConnections()
}

var _ cloudevents.Client = (*client)(nil)
Expand Down Expand Up @@ -189,6 +252,9 @@ func (c *client) applyOverrides(event *cloudevents.Event) {
}

func (c *client) reportMetrics(ctx context.Context, event cloudevents.Event, result protocol.Result) {
if c.reporter == nil {
return
}
tags := MetricTagFromContext(ctx)
reportArgs := &source.ReportArgs{
Namespace: tags.Namespace,
Expand Down
6 changes: 4 additions & 2 deletions pkg/adapter/v2/cloudevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestNewCloudEventsClient_send(t *testing.T) {
}
}

defer func(restoreHTTP func(topt []http.Option, copt []v2client.Option) (v2client.Client, error), restoreEnv string, setEnv bool) {
defer func(restoreHTTP func(topt []http.Option, copt []v2client.Option) (Client, error), restoreEnv string, setEnv bool) {
newClientHTTPObserved = restoreHTTP
if setEnv {
if err := os.Setenv("K_SINK_TIMEOUT", restoreEnv); err != nil {
Expand All @@ -157,7 +157,7 @@ func TestNewCloudEventsClient_send(t *testing.T) {
}(restoreHTTP, restoreEnv, setEnv)

sendOptions := []http.Option{}
newClientHTTPObserved = func(topt []http.Option, copt []v2client.Option) (v2client.Client, error) {
newClientHTTPObserved = func(topt []http.Option, copt []v2client.Option) (Client, error) {
sendOptions = append(sendOptions, topt...)
return nil, nil
}
Expand Down Expand Up @@ -358,6 +358,8 @@ func TestTLS(t *testing.T) {
} else if cloudevents.IsNACK(result) || cloudevents.IsUndelivered(result) {
t.Fatalf("wantErr %v, got %v IsACK %v", tc.wantErr, result, cloudevents.IsACK(result))
}

c.CloseIdleConnections()
})
}
}
Expand Down
Loading

0 comments on commit 1f917d0

Please sign in to comment.