From 52a210fd9548a60fe100008d16a43296bd217183 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Mon, 17 Aug 2020 06:10:07 -0400 Subject: [PATCH] delay when to shutdown the mtping adapter (#3831) * delay when to shutdown the mtping adapter * make shutdown delay configurable * only delay the signal context --- cmd/mtping/main.go | 10 +++- go.mod | 1 + pkg/adapter/mtping/adapter.go | 23 +++++++- pkg/adapter/mtping/context.go | 40 +++++++++++++ pkg/adapter/mtping/context_test.go | 57 +++++++++++++++++++ pkg/adapter/mtping/runner.go | 7 ++- pkg/reconciler/pingsource/pingsource.go | 3 + pkg/reconciler/pingsource/pingsource_test.go | 3 + .../resources/mt_receive_adapter.go | 7 +++ .../resources/mt_receive_adapter_test.go | 4 ++ vendor/modules.txt | 1 + 11 files changed, 150 insertions(+), 6 deletions(-) create mode 100644 pkg/adapter/mtping/context.go create mode 100644 pkg/adapter/mtping/context_test.go diff --git a/cmd/mtping/main.go b/cmd/mtping/main.go index c9a964fb174..cd4c676e3e9 100644 --- a/cmd/mtping/main.go +++ b/cmd/mtping/main.go @@ -28,7 +28,15 @@ const ( ) func main() { - ctx := signals.NewContext() + sctx := signals.NewContext() + + // When cancelling the adapter to close to the minute, there is + // a risk of losing events due to either the delay of starting a new pod + // or for the passive pod to become active (when HA is enabled and replicas > 1). + // So when receiving a SIGTEM signal, delay the cancellation of the adapter, + // which under the cover delays the release of the lease. + ctx := mtping.NewDelayingContext(sctx, mtping.GetNoShutDownAfterValue()) + ctx = adapter.WithConfigMapWatcherEnabled(ctx) ctx = adapter.WithInjectorEnabled(ctx) ctx = adapter.WithHAEnabled(ctx) diff --git a/go.mod b/go.mod index 88fca4c3603..4d125c2fd9d 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/pelletier/go-toml v1.8.0 github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 github.com/pkg/errors v0.9.1 + github.com/prometheus/common v0.9.1 github.com/rickb777/date v1.13.0 github.com/robfig/cron/v3 v3.0.1 github.com/rogpeppe/fastuuid v1.2.0 diff --git a/pkg/adapter/mtping/adapter.go b/pkg/adapter/mtping/adapter.go index 0dbc8e0fbd3..af24217d7a2 100644 --- a/pkg/adapter/mtping/adapter.go +++ b/pkg/adapter/mtping/adapter.go @@ -19,10 +19,12 @@ package mtping import ( "context" "flag" - - "github.com/robfig/cron/v3" + "os" + "strconv" cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/prometheus/common/log" + "github.com/robfig/cron/v3" "go.uber.org/zap" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/logging" @@ -30,6 +32,10 @@ import ( "knative.dev/eventing/pkg/adapter/v2" ) +const ( + EnvNoShutdownAfter = "K_NO_SHUTDOWN_AFTER" +) + var ( // withSeconds enables schedules with seconds. withSeconds bool @@ -76,3 +82,16 @@ func (a *mtpingAdapter) Start(ctx context.Context) error { a.logger.Infof("runner stopped") return nil } + +func GetNoShutDownAfterValue() int { + str := os.Getenv(EnvNoShutdownAfter) + if str != "" { + second, err := strconv.Atoi(str) + if err != nil || second < 0 || second > 59 { + log.Warnf("%s environment value is invalid. It must be a integer between 0 and 59. (got %s)", EnvNoShutdownAfter, str) + } else { + return second + } + } + return 55 // seems a reasonable default +} diff --git a/pkg/adapter/mtping/context.go b/pkg/adapter/mtping/context.go new file mode 100644 index 00000000000..5c77f699c89 --- /dev/null +++ b/pkg/adapter/mtping/context.go @@ -0,0 +1,40 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mtping + +import ( + "context" + "time" +) + +// NewDelayingContext returns a new context delaying the +// cancellation of the given context. +func NewDelayingContext(ctx context.Context, afterSecond int) context.Context { + delayCtx, cancel := context.WithCancel(context.Background()) + go func() { + <-ctx.Done() + + s := time.Now().Second() + if s > afterSecond { + // Make sure to pass the minute by adding 1 second. + time.Sleep(time.Second * time.Duration(60-s+1)) + } + + cancel() + }() + return delayCtx +} diff --git a/pkg/adapter/mtping/context_test.go b/pkg/adapter/mtping/context_test.go new file mode 100644 index 00000000000..96dd8a40b69 --- /dev/null +++ b/pkg/adapter/mtping/context_test.go @@ -0,0 +1,57 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mtping + +import ( + "context" + "fmt" + "testing" + "time" +) + +func TestWithDelayedCancellation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + delayedCtx := NewDelayingContext(ctx, 55) + go func() { + <-delayedCtx.Done() + }() + + fmt.Println(time.Now()) + waitForSecond(56) + cancel() + + <-delayedCtx.Done() + + fmt.Println(time.Now()) + csecond := time.Now().Second() + if csecond > 55 { + t.Error("expected current second to be less than 55") + } +} + +func waitForSecond(second int) { + csecond := time.Now().Second() + if csecond == second { + return + } + if csecond > second { + time.Sleep(time.Duration(second+60-csecond) * time.Second) + } else { + time.Sleep(time.Duration(second-csecond) * time.Second) + } +} diff --git a/pkg/adapter/mtping/runner.go b/pkg/adapter/mtping/runner.go index 36641266ac5..cdbad8b3240 100644 --- a/pkg/adapter/mtping/runner.go +++ b/pkg/adapter/mtping/runner.go @@ -109,13 +109,14 @@ func (a *cronJobsRunner) RemoveSchedule(id cron.EntryID) { func (a *cronJobsRunner) Start(stopCh <-chan struct{}) { a.cron.Start() - <-stopCh // main channel that gets closed once term signal is received + <-stopCh } func (a *cronJobsRunner) Stop() { ctx := a.cron.Stop() // no more ticks - if ctx != nil { // ctx gets done when all jobs complete - <-ctx.Done() // wait for all to be done. + if ctx != nil { + // wait for all jobs to be done. + <-ctx.Done() } } diff --git a/pkg/reconciler/pingsource/pingsource.go b/pkg/reconciler/pingsource/pingsource.go index 5fc1aa5566c..8e60f0eccb6 100644 --- a/pkg/reconciler/pingsource/pingsource.go +++ b/pkg/reconciler/pingsource/pingsource.go @@ -21,6 +21,8 @@ import ( "encoding/json" "fmt" + "knative.dev/eventing/pkg/adapter/mtping" + "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -308,6 +310,7 @@ func (r *Reconciler) reconcileMTReceiveAdapter(ctx context.Context, source *v1al LoggingConfig: loggingConfig, MetricsConfig: metricsConfig, LeConfig: r.leConfig, + NoShutdownAfter: mtping.GetNoShutDownAfterValue(), } expected := resources.MakeMTReceiveAdapter(args) diff --git a/pkg/reconciler/pingsource/pingsource_test.go b/pkg/reconciler/pingsource/pingsource_test.go index e93e4beb829..9a775588b9e 100644 --- a/pkg/reconciler/pingsource/pingsource_test.go +++ b/pkg/reconciler/pingsource/pingsource_test.go @@ -21,6 +21,8 @@ import ( "fmt" "testing" + "knative.dev/eventing/pkg/adapter/mtping" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -622,6 +624,7 @@ func MakeMTAdapter() *appsv1.Deployment { ServiceAccountName: mtadapterName, MTAdapterName: mtadapterName, Image: mtimage, + NoShutdownAfter: mtping.GetNoShutDownAfterValue(), } return resources.MakeMTReceiveAdapter(args) } diff --git a/pkg/reconciler/pingsource/resources/mt_receive_adapter.go b/pkg/reconciler/pingsource/resources/mt_receive_adapter.go index 6d5edd93b1b..663eb308f83 100644 --- a/pkg/reconciler/pingsource/resources/mt_receive_adapter.go +++ b/pkg/reconciler/pingsource/resources/mt_receive_adapter.go @@ -17,10 +17,13 @@ limitations under the License. package resources import ( + "strconv" + v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/eventing/pkg/adapter/mtping" "knative.dev/eventing/pkg/adapter/v2" "knative.dev/pkg/system" ) @@ -39,6 +42,7 @@ type MTArgs struct { MetricsConfig string LoggingConfig string LeConfig string + NoShutdownAfter int } // MakeMTReceiveAdapter generates the mtping deployment for pingsources @@ -115,5 +119,8 @@ func makeEnv(args MTArgs) []corev1.EnvVar { }, { Name: adapter.EnvConfigLeaderElectionConfig, Value: args.LeConfig, + }, { + Name: mtping.EnvNoShutdownAfter, + Value: strconv.Itoa(args.NoShutdownAfter), }} } diff --git a/pkg/reconciler/pingsource/resources/mt_receive_adapter_test.go b/pkg/reconciler/pingsource/resources/mt_receive_adapter_test.go index 3cf68538e70..444ccc0049b 100644 --- a/pkg/reconciler/pingsource/resources/mt_receive_adapter_test.go +++ b/pkg/reconciler/pingsource/resources/mt_receive_adapter_test.go @@ -37,6 +37,7 @@ func TestMakeMTPingAdapter(t *testing.T) { Image: "test-image", MetricsConfig: "metrics", LoggingConfig: "logging", + NoShutdownAfter: 40, } want := &v1.Deployment{ @@ -82,6 +83,9 @@ func TestMakeMTPingAdapter(t *testing.T) { }, { Name: "K_LEADER_ELECTION_CONFIG", Value: "", + }, { + Name: "K_NO_SHUTDOWN_AFTER", + Value: "40", }}, // Set low resource requests and limits. // This should be configurable. diff --git a/vendor/modules.txt b/vendor/modules.txt index b7cb96d8f08..dcfacff1f58 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -256,6 +256,7 @@ github.com/prometheus/client_golang/prometheus/promhttp # github.com/prometheus/client_model v0.2.0 github.com/prometheus/client_model/go # github.com/prometheus/common v0.9.1 +## explicit github.com/prometheus/common/expfmt github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg github.com/prometheus/common/log