Skip to content

Commit

Permalink
SinkTimeout#4056 (#4227)
Browse files Browse the repository at this point in the history
Signed-off-by: rickr <cr22rc@users.noreply.github.com>
  • Loading branch information
cr22rc authored Oct 6, 2020
1 parent 9a6952d commit 5b33bb9
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 15 deletions.
12 changes: 12 additions & 0 deletions config/core/deployments/controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ spec:
fieldRef:
fieldPath: metadata.name


## Adapter settings
# - name: K_LOGGING_CONFIG
# value: ''
# - name: K_LEADER_ELECTION_CONFIG
# value: ''
# - name: K_NO_SHUTDOWN_AFTER
# value: ''
## Time in seconds the adapter will wait for the sink to respond. Default is no timeout
# - name: K_SINK_TIMEOUT
# value: ''

securityContext:
allowPrivilegeEscalation: false

Expand Down
7 changes: 4 additions & 3 deletions config/core/deployments/pingsource-mt-adapter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,18 @@ spec:
apiVersion: v1
fieldPath: metadata.namespace

# The values below are being filled by the ping source controller
# DO NOT MODIFY: The values below are being filled by the ping source controller
# See 500-controller.yaml
- name: K_METRICS_CONFIG
value: ''
- name: K_LOGGING_CONFIG
value: ''
- name: K_LOGGING_CONFIG
value: ''
- name: K_LEADER_ELECTION_CONFIG
value: ''
- name: K_NO_SHUTDOWN_AFTER
value: ''
- name: K_SINK_TIMEOUT
value: ''
- name: POD_NAME
valueFrom:
fieldRef:
Expand Down
2 changes: 2 additions & 0 deletions pkg/adapter/mtping/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (a *cronJobsRunner) cronTick(ctx context.Context, event cloudevents.Event)
return func() {
event := event.Clone()
event.SetID(uuid.New().String()) // provide an ID here so we can track it with logging
defer a.Logger.Debug("finished sending cloudevent id: ", event.ID())
target := cecontext.TargetFrom(ctx).String()
source := event.Context.GetSource()
// nolint:gosec // Cryptographic randomness not necessary here.
Expand All @@ -135,6 +136,7 @@ func (a *cronJobsRunner) cronTick(ctx context.Context, event cloudevents.Event)
a.Logger.Error("failed to send cloudevent result: ", zap.Any("result", result),
zap.String("source", source), zap.String("target", target), zap.String("id", event.ID()))
}

}
}

Expand Down
41 changes: 39 additions & 2 deletions pkg/adapter/v2/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"context"
"errors"
"fmt"
nethttp "net/http"
"net/url"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
Expand All @@ -35,9 +37,18 @@ import (
// NewCloudEventsClient returns a client that will apply the ceOverrides to
// outbound events and report outbound event counts.
func NewCloudEventsClient(target string, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter) (cloudevents.Client, error) {
return NewCloudEventsClientCRStatus(target, ceOverrides, reporter, nil)
return newCloudEventsClientCRStatus(nil, target, ceOverrides, reporter, nil)
}
func NewCloudEventsClientCRStatus(target string, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter, crStatusEventClient *crstatusevent.CRStatusEventClient) (cloudevents.Client, error) {
func NewCloudEventsClientCRStatus(env EnvConfigAccessor, reporter source.StatsReporter, crStatusEventClient *crstatusevent.CRStatusEventClient) (cloudevents.Client, error) {
return newCloudEventsClientCRStatus(env, "", nil, reporter, crStatusEventClient)
}
func newCloudEventsClientCRStatus(env EnvConfigAccessor, target string, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter,
crStatusEventClient *crstatusevent.CRStatusEventClient) (cloudevents.Client, error) {

if target == "" && env != nil {
target = env.GetSink()
}

pOpts := make([]http.Option, 0)
if len(target) > 0 {
pOpts = append(pOpts, cloudevents.WithTarget(target))
Expand All @@ -46,6 +57,19 @@ func NewCloudEventsClientCRStatus(target string, ceOverrides *duckv1.CloudEventO
Propagation: tracecontextb3.TraceContextEgress,
}))

if env != nil {
if sinkWait := env.GetSinktimeout(); sinkWait > 0 {
pOpts = append(pOpts, setTimeOut(time.Duration(sinkWait)*time.Second))
}
var err error
if ceOverrides == nil {
ceOverrides, err = env.GetCloudEventOverrides()
if err != nil {
return nil, err
}
}
}

p, err := cloudevents.NewHTTP(pOpts...)
if err != nil {
return nil, err
Expand All @@ -67,6 +91,19 @@ func NewCloudEventsClientCRStatus(target string, ceOverrides *duckv1.CloudEventO
}, nil
}

func setTimeOut(duration time.Duration) http.Option {
return func(p *http.Protocol) error {
if p == nil {
return fmt.Errorf("http target option can not set nil protocol")
}
if p.Client == nil {
p.Client = &nethttp.Client{}
}
p.Client.Timeout = duration
return nil
}
}

type client struct {
ceClient cloudevents.Client
ceOverrides *duckv1.CloudEventOverrides
Expand Down
61 changes: 59 additions & 2 deletions pkg/adapter/v2/cloudevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ package adapter

import (
"context"
"os"
"strconv"
"testing"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/protocol/http"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/source"

Expand All @@ -44,8 +48,10 @@ func TestNewCloudEventsClient_send(t *testing.T) {
testCases := map[string]struct {
ceOverrides *duckv1.CloudEventOverrides
event *cloudevents.Event
timeout int
}{
"none": {},
"timeout": {timeout: 13},
"none": {},
"send": {
event: func() *cloudevents.Event {
event := cloudevents.NewEvent()
Expand All @@ -70,10 +76,61 @@ func TestNewCloudEventsClient_send(t *testing.T) {
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
ceClient, err := NewCloudEventsClient(fakeURL, tc.ceOverrides, &mockReporter{})
restoreHTTP := cloudevents.NewHTTP
restoreEnv, setEnv := os.LookupEnv("K_SINK_TIMEOUT")
if tc.timeout != 0 {
if err := os.Setenv("K_SINK_TIMEOUT", strconv.Itoa(tc.timeout)); err != nil {
t.Error(err)
}
}

defer func(restoreHTTP func(opts ...http.Option) (*http.Protocol, error), restoreEnv string, setEnv bool) {
cloudevents.NewHTTP = restoreHTTP
if setEnv {
if err := os.Setenv("K_SINK_TIMEOUT", restoreEnv); err != nil {
t.Error(err)
}
} else {
if err := os.Unsetenv("K_SINK_TIMEOUT"); err != nil {
t.Error(err)
}
}

}(restoreHTTP, restoreEnv, setEnv)

sendOptions := []http.Option{}
cloudevents.NewHTTP = func(opts ...http.Option) (*http.Protocol, error) {
sendOptions = append(sendOptions, opts...)
return nil, nil
}
envConfigAccessor := ConstructEnvOrDie(func() EnvConfigAccessor {
return &EnvConfig{}

})

ceClient, err := NewCloudEventsClientCRStatus(envConfigAccessor, &mockReporter{}, nil)
if err != nil {
t.Fail()
}
timeoutSet := false
if tc.timeout != 0 {
for _, opt := range sendOptions {
p := http.Protocol{}
if err := opt(&p); err != nil {
t.Error(err)
}
if p.Client != nil {
if p.Client.Timeout == time.Duration(tc.timeout)*time.Second {
timeoutSet = true
}
}

}
if !timeoutSet {
t.Error("Expected timout to be set")
}

}
got, ok := ceClient.(*client)
if !ok {
t.Errorf("expected NewCloudEventsClient to return a *client, but did not")
Expand Down
29 changes: 29 additions & 0 deletions pkg/adapter/v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package adapter

import (
"encoding/json"
"os"
"strconv"
"time"

"go.uber.org/zap"
Expand All @@ -43,6 +45,7 @@ const (
EnvConfigLoggingConfig = "K_LOGGING_CONFIG"
EnvConfigTracingConfig = "K_TRACING_CONFIG"
EnvConfigLeaderElectionConfig = "K_LEADER_ELECTION_CONFIG"
EnvSinkTimeout = "K_SINK_TIMEOUT"
)

// EnvConfig is the minimal set of configuration parameters
Expand Down Expand Up @@ -85,6 +88,9 @@ type EnvConfig struct {

// LeaderElectionConfigJson is the leader election component configuration.
LeaderElectionConfigJson string `envconfig:"K_LEADER_ELECTION_CONFIG"`

// Time in seconds to wait for sink to respond
EnvSinkTimeout int `envconfig:"K_SINK_TIMEOUT"`
}

// EnvConfigAccessor defines accessors for the minimal
Expand Down Expand Up @@ -114,6 +120,9 @@ type EnvConfigAccessor interface {

// GetLeaderElectionConfig returns leader election configuration.
GetLeaderElectionConfig() (*kle.ComponentConfig, error)

// Get the name of the adapter.
GetSinktimeout() int
}

var _ EnvConfigAccessor = (*EnvConfig)(nil)
Expand Down Expand Up @@ -158,6 +167,10 @@ func (e *EnvConfig) GetName() string {
return e.Name
}

func (e *EnvConfig) GetSinktimeout() int {
return e.EnvSinkTimeout
}

func (e *EnvConfig) SetupTracing(logger *zap.SugaredLogger) error {
config, err := tracingconfig.JsonToTracingConfig(e.TracingConfigJson)
if err != nil {
Expand Down Expand Up @@ -209,3 +222,19 @@ func LeaderElectionComponentConfigToJson(cfg *kle.ComponentConfig) (string, erro
jsonCfg, err := json.Marshal(cfg)
return string(jsonCfg), err
}

func GetSinkTimeout(logger *zap.SugaredLogger) int {
str := os.Getenv(EnvSinkTimeout)
if str != "" {
var err error
duration, err := strconv.Atoi(str)
if err != nil || duration < 0 {
if logger != nil {
logger.Errorf("%s environment value is invalid. It must be a integer greater than zero. (got %s)", EnvSinkTimeout, str)
}
return -1
}
return duration
}
return -1
}
11 changes: 10 additions & 1 deletion pkg/adapter/v2/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func TestEnvConfig(t *testing.T) {
os.Setenv("K_LOGGING_CONFIG", "logging")
os.Setenv("K_TRACING_CONFIG", "tracing")
os.Setenv("K_LEADER_ELECTION_CONFIG", "leaderelection")
os.Setenv("MODE", "mymode") // note: custom to this test impl
os.Setenv("MODE", "mymode") // note: custom to this test impl
os.Setenv("K_SINK_TIMEOUT", "999") // note: custom to this test impl

defer func() {
os.Unsetenv("K_SINK")
Expand All @@ -45,6 +46,7 @@ func TestEnvConfig(t *testing.T) {
os.Unsetenv("K_TRACING_CONFIG")
os.Unsetenv("K_LEADER_ELECTION_CONFIG")
os.Unsetenv("MODE")
os.Unsetenv("K_SINK_TIMEOUT")
}()

var env myEnvConfig
Expand All @@ -65,4 +67,11 @@ func TestEnvConfig(t *testing.T) {
t.Errorf("Expected LeaderElectionConfigJson leaderelection, got: %s", env.LeaderElectionConfigJson)
}

if sinkTimeout := GetSinkTimeout(nil); sinkTimeout != 999 {
t.Error("Expected GetSinkTimeout to be 999, got:", sinkTimeout)
}
if env.EnvSinkTimeout != 999 {
t.Error("Expected env.EnvSinkTimeout to be 999, got:", env.EnvSinkTimeout)
}

}
8 changes: 1 addition & 7 deletions pkg/adapter/v2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"flag"
"fmt"

"log"
"net/http"
"strconv"
Expand Down Expand Up @@ -160,12 +159,7 @@ func MainWithInformers(ctx context.Context, component string, env EnvConfigAcces
logger.Error("Error setting up trace publishing", zap.Error(err))
}

ceOverrides, err := env.GetCloudEventOverrides()
if err != nil {
logger.Error("Error loading cloudevents overrides", zap.Error(err))
}

eventsClient, err := NewCloudEventsClientCRStatus(env.GetSink(), ceOverrides, reporter, crStatusEventClient)
eventsClient, err := NewCloudEventsClientCRStatus(env, reporter, crStatusEventClient)
if err != nil {
logger.Fatal("Error building cloud event client", zap.Error(err))
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/reconciler/pingsource/pingsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/pointer"
"knative.dev/eventing/pkg/adapter/v2"

appsv1listers "k8s.io/client-go/listers/apps/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
Expand Down Expand Up @@ -156,6 +157,7 @@ func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *v1beta
MetricsConfig: metricsConfig,
LeConfig: r.leConfig,
NoShutdownAfter: mtping.GetNoShutDownAfterValue(),
SinkTimeout: adapter.GetSinkTimeout(logging.FromContext(ctx)),
}
expected := resources.MakeReceiveAdapterEnvVar(args)

Expand Down
2 changes: 2 additions & 0 deletions pkg/reconciler/pingsource/pingsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/eventing/pkg/adapter/v2"
"knative.dev/pkg/system"

"knative.dev/eventing/pkg/adapter/mtping"
Expand Down Expand Up @@ -197,6 +198,7 @@ func TestAllCases(t *testing.T) {
func MakeMTAdapter() *appsv1.Deployment {
args := resources.Args{
NoShutdownAfter: mtping.GetNoShutDownAfterValue(),
SinkTimeout: adapter.GetSinkTimeout(nil),
}
return &appsv1.Deployment{
TypeMeta: metav1.TypeMeta{
Expand Down
4 changes: 4 additions & 0 deletions pkg/reconciler/pingsource/resources/receive_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Args struct {
LoggingConfig string
LeConfig string
NoShutdownAfter int
SinkTimeout int
}

// MakeReceiveAdapterEnvVar generates the environment variables for the pingsources
Expand All @@ -53,6 +54,9 @@ func MakeReceiveAdapterEnvVar(args Args) []corev1.EnvVar {
}, {
Name: mtping.EnvNoShutdownAfter,
Value: strconv.Itoa(args.NoShutdownAfter),
}, {
Name: adapter.EnvSinkTimeout,
Value: strconv.Itoa(args.SinkTimeout),
}}

}
Loading

0 comments on commit 5b33bb9

Please sign in to comment.