Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

delay when to shutdown the mtping adapter #3831

Merged
merged 3 commits into from
Aug 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion cmd/mtping/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,15 @@ const (
)

func main() {
ctx := signals.NewContext()
sctx := signals.NewContext()

// When cancelling the adapter to close to the minute, there is
// a risk of losing events due to either the delay of starting a new pod
// or for the passive pod to become active (when HA is enabled and replicas > 1).
// So when receiving a SIGTEM signal, delay the cancellation of the adapter,
// which under the cover delays the release of the lease.
ctx := mtping.NewDelayingContext(sctx, mtping.GetNoShutDownAfterValue())

ctx = adapter.WithConfigMapWatcherEnabled(ctx)
ctx = adapter.WithInjectorEnabled(ctx)
ctx = adapter.WithHAEnabled(ctx)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/pelletier/go-toml v1.8.0
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pkg/errors v0.9.1
github.com/prometheus/common v0.9.1
github.com/rickb777/date v1.13.0
github.com/robfig/cron/v3 v3.0.1
github.com/rogpeppe/fastuuid v1.2.0
Expand Down
23 changes: 21 additions & 2 deletions pkg/adapter/mtping/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,23 @@ package mtping
import (
"context"
"flag"

"github.com/robfig/cron/v3"
"os"
"strconv"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/prometheus/common/log"
"github.com/robfig/cron/v3"
"go.uber.org/zap"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/adapter/v2"
)

const (
EnvNoShutdownAfter = "K_NO_SHUTDOWN_AFTER"
)

var (
// withSeconds enables schedules with seconds.
withSeconds bool
Expand Down Expand Up @@ -76,3 +82,16 @@ func (a *mtpingAdapter) Start(ctx context.Context) error {
a.logger.Infof("runner stopped")
return nil
}

func GetNoShutDownAfterValue() int {
str := os.Getenv(EnvNoShutdownAfter)
if str != "" {
second, err := strconv.Atoi(str)
if err != nil || second < 0 || second > 59 {
log.Warnf("%s environment value is invalid. It must be a integer between 0 and 59. (got %s)", EnvNoShutdownAfter, str)
} else {
return second
}
}
return 55 // seems a reasonable default
}
40 changes: 40 additions & 0 deletions pkg/adapter/mtping/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mtping

import (
"context"
"time"
)

// NewDelayingContext returns a new context delaying the
// cancellation of the given context.
func NewDelayingContext(ctx context.Context, afterSecond int) context.Context {
delayCtx, cancel := context.WithCancel(context.Background())
go func() {
<-ctx.Done()

s := time.Now().Second()
if s > afterSecond {
// Make sure to pass the minute by adding 1 second.
time.Sleep(time.Second * time.Duration(60-s+1))
}

cancel()
}()
return delayCtx
}
57 changes: 57 additions & 0 deletions pkg/adapter/mtping/context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mtping

import (
"context"
"fmt"
"testing"
"time"
)

func TestWithDelayedCancellation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

delayedCtx := NewDelayingContext(ctx, 55)
go func() {
<-delayedCtx.Done()
}()

fmt.Println(time.Now())
waitForSecond(56)
cancel()

<-delayedCtx.Done()

fmt.Println(time.Now())
csecond := time.Now().Second()
if csecond > 55 {
t.Error("expected current second to be less than 55")
}
}

func waitForSecond(second int) {
csecond := time.Now().Second()
if csecond == second {
return
}
if csecond > second {
time.Sleep(time.Duration(second+60-csecond) * time.Second)
} else {
time.Sleep(time.Duration(second-csecond) * time.Second)
}
}
7 changes: 4 additions & 3 deletions pkg/adapter/mtping/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,14 @@ func (a *cronJobsRunner) RemoveSchedule(id cron.EntryID) {

func (a *cronJobsRunner) Start(stopCh <-chan struct{}) {
a.cron.Start()
<-stopCh // main channel that gets closed once term signal is received
<-stopCh
}

func (a *cronJobsRunner) Stop() {
ctx := a.cron.Stop() // no more ticks
if ctx != nil { // ctx gets done when all jobs complete
<-ctx.Done() // wait for all to be done.
if ctx != nil {
// wait for all jobs to be done.
<-ctx.Done()
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/reconciler/pingsource/pingsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"encoding/json"
"fmt"

"knative.dev/eventing/pkg/adapter/mtping"

"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -308,6 +310,7 @@ func (r *Reconciler) reconcileMTReceiveAdapter(ctx context.Context, source *v1al
LoggingConfig: loggingConfig,
MetricsConfig: metricsConfig,
LeConfig: r.leConfig,
NoShutdownAfter: mtping.GetNoShutDownAfterValue(),
}
expected := resources.MakeMTReceiveAdapter(args)

Expand Down
3 changes: 3 additions & 0 deletions pkg/reconciler/pingsource/pingsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"testing"

"knative.dev/eventing/pkg/adapter/mtping"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
Expand Down Expand Up @@ -622,6 +624,7 @@ func MakeMTAdapter() *appsv1.Deployment {
ServiceAccountName: mtadapterName,
MTAdapterName: mtadapterName,
Image: mtimage,
NoShutdownAfter: mtping.GetNoShutDownAfterValue(),
}
return resources.MakeMTReceiveAdapter(args)
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/reconciler/pingsource/resources/mt_receive_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ limitations under the License.
package resources

import (
"strconv"

v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/eventing/pkg/adapter/mtping"
"knative.dev/eventing/pkg/adapter/v2"
"knative.dev/pkg/system"
)
Expand All @@ -39,6 +42,7 @@ type MTArgs struct {
MetricsConfig string
LoggingConfig string
LeConfig string
NoShutdownAfter int
}

// MakeMTReceiveAdapter generates the mtping deployment for pingsources
Expand Down Expand Up @@ -115,5 +119,8 @@ func makeEnv(args MTArgs) []corev1.EnvVar {
}, {
Name: adapter.EnvConfigLeaderElectionConfig,
Value: args.LeConfig,
}, {
Name: mtping.EnvNoShutdownAfter,
Value: strconv.Itoa(args.NoShutdownAfter),
}}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestMakeMTPingAdapter(t *testing.T) {
Image: "test-image",
MetricsConfig: "metrics",
LoggingConfig: "logging",
NoShutdownAfter: 40,
}

want := &v1.Deployment{
Expand Down Expand Up @@ -82,6 +83,9 @@ func TestMakeMTPingAdapter(t *testing.T) {
}, {
Name: "K_LEADER_ELECTION_CONFIG",
Value: "",
}, {
Name: "K_NO_SHUTDOWN_AFTER",
Value: "40",
}},
// Set low resource requests and limits.
// This should be configurable.
Expand Down
1 change: 1 addition & 0 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ github.com/prometheus/client_golang/prometheus/promhttp
# github.com/prometheus/client_model v0.2.0
github.com/prometheus/client_model/go
# github.com/prometheus/common v0.9.1
## explicit
github.com/prometheus/common/expfmt
github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg
github.com/prometheus/common/log
Expand Down