Adding RateLimiter to GCP PubSub channel to backoff retries on dispatch failures #775

Merged · 11 commits · Feb 6, 2019
1 change: 1 addition & 0 deletions Gopkg.lock

18 changes: 16 additions & 2 deletions contrib/gcppubsub/pkg/dispatcher/dispatcher/controller.go
@@ -19,12 +19,15 @@ package dispatcher
import (
"context"
"sync"
"time"

"k8s.io/client-go/util/workqueue"

"sigs.k8s.io/controller-runtime/pkg/event"

pubsubutil "github.com/knative/eventing/contrib/gcppubsub/pkg/util"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/provisioners"
pubsubutil "github.com/knative/eventing/contrib/gcppubsub/pkg/util"
"go.uber.org/zap"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -36,11 +39,20 @@ const (
// controllerAgentName is the string used by this controller to identify itself when creating
// events.
controllerAgentName = "gcp-pubsub-channel-dispatcher"

// Exponential backoff constants used to limit the pace at which
// we nack messages in order to throttle the channel retries.
// The backoff is computed as follows: min(expBackoffMaxDelay, expBackoffBaseDelay * 2^#failures).
// expBackoffMaxDelay should be less than subscription.ReceiveSettings.MaxExtension,
// which is the maximum period for which the Subscription extends the ack deadline
// for each message. It is currently set to 10 minutes.
expBackoffBaseDelay = 1 * time.Second
expBackoffMaxDelay = 5 * time.Minute
)

// New returns a Controller that represents the dispatcher portion (messages from GCP PubSub are
// sent into the cluster) of the GCP PubSub dispatcher. We use a reconcile loop to watch all
// Channels and notice changes to them.
// Channels and notice changes to them. It uses an exponential backoff to throttle the retries.
func New(mgr manager.Manager, logger *zap.Logger, stopCh <-chan struct{}) (controller.Controller, error) {
// reconcileChan is used when the dispatcher itself needs to force reconciliation of a Channel.
reconcileChan := make(chan event.GenericEvent)
@@ -58,6 +70,8 @@ func New(mgr manager.Manager, logger *zap.Logger, stopCh <-chan struct{}) (controller.Controller, error) {

subscriptionsLock: sync.Mutex{},
subscriptions: map[channelName]map[subscriptionName]context.CancelFunc{},

rateLimiter: workqueue.NewItemExponentialFailureRateLimiter(expBackoffBaseDelay, expBackoffMaxDelay),
}

c, err := controller.New(controllerAgentName, mgr, controller.Options{
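The constants above feed workqueue.NewItemExponentialFailureRateLimiter, so each message ID gets a per-item backoff of min(5m, 1s * 2^#failures). Below is a minimal standalone sketch of that progression, assuming only the standard client-go workqueue behavior; the message ID string is hypothetical.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Same constants as the dispatcher: min(expBackoffMaxDelay, expBackoffBaseDelay * 2^#failures).
	rl := workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 5*time.Minute)
	const msgID = "example-message-id" // hypothetical Pub/Sub message ID
	for i := 1; i <= 12; i++ {
		// Each call records another failure for this ID and returns the next delay.
		fmt.Printf("failure %2d -> wait %v\n", i, rl.When(msgID))
	}
	// A successful dispatch calls Forget, which resets the backoff for that ID.
	rl.Forget(msgID)
	fmt.Println("requeues after Forget:", rl.NumRequeues(msgID))
}

Running this prints 1s, 2s, 4s, and so on up to 256s, then stays at 5m0s once the cap is reached, which is why expBackoffMaxDelay has to stay below the subscription's 10-minute MaxExtension.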
29 changes: 22 additions & 7 deletions contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go
@@ -20,13 +20,16 @@ import (
"context"
"errors"
"sync"
"time"

"k8s.io/client-go/util/workqueue"

eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/provisioners"
util "github.com/knative/eventing/pkg/provisioners"
ccpcontroller "github.com/knative/eventing/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner"
pubsubutil "github.com/knative/eventing/contrib/gcppubsub/pkg/util"
"github.com/knative/eventing/contrib/gcppubsub/pkg/util/logging"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/provisioners"
util "github.com/knative/eventing/pkg/provisioners"
"go.uber.org/zap"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
@@ -63,6 +66,9 @@ type reconciler struct {
// function must be called when we no longer want that subscription to be active. Logically it
// is a map from Channel name to Subscription name to CancelFunc.
subscriptions map[channelName]map[subscriptionName]context.CancelFunc

// rateLimiter is used to limit the pace at which we nack a message when it could not be dispatched.
rateLimiter workqueue.RateLimiter
}

// Verify the struct implements reconcile.Reconciler
@@ -290,7 +296,7 @@ func (r *reconciler) receiveMessagesBlocking(ctxWithCancel context.Context, enqu
logging.FromContext(ctxWithCancel).Info("subscription.Receive start")
receiveErr := subscription.Receive(
ctxWithCancel,
receiveFunc(logging.FromContext(ctxWithCancel).Sugar(), sub, defaults, r.dispatcher))
receiveFunc(logging.FromContext(ctxWithCancel).Sugar(), sub, defaults, r.dispatcher, r.rateLimiter, time.Sleep))
// We want to minimize holding the lock. r.reconcileChan may block, so definitely do not do
// it under lock. But, to prevent a race condition, we must delete from r.subscriptions
// before using r.reconcileChan.
@@ -312,18 +318,27 @@ }
}
}

func receiveFunc(logger *zap.SugaredLogger, sub pubsubutil.GcpPubSubSubscriptionStatus, defaults provisioners.DispatchDefaults, dispatcher provisioners.Dispatcher) func(context.Context, pubsubutil.PubSubMessage) {
func receiveFunc(logger *zap.SugaredLogger, sub pubsubutil.GcpPubSubSubscriptionStatus, defaults provisioners.DispatchDefaults, dispatcher provisioners.Dispatcher, rateLimiter workqueue.RateLimiter, waitFunc func(duration time.Duration)) func(context.Context, pubsubutil.PubSubMessage) {
return func(ctx context.Context, msg pubsubutil.PubSubMessage) {
message := &provisioners.Message{
Headers: msg.Attributes(),
Payload: msg.Data(),
}
err := dispatcher.DispatchMessage(message, sub.SubscriberURI, sub.ReplyURI, defaults)
if err != nil {
logger.Error("Message dispatch failed", zap.Error(err), zap.String("pubSubMessageId", msg.ID()))
// Compute the wait time to nack this message.
// As soon as we nack a message, the GcpPubSub channel will attempt the retry.
// We use this as a mechanism to backoff retries.
sleepDuration := rateLimiter.When(msg.ID())
// Blocking; might need to run this on a separate goroutine to improve throughput.
logger.Desugar().Error("Message dispatch failed, waiting to nack", zap.Error(err), zap.String("pubSubMessageId", msg.ID()), zap.Float64("backoffSec", sleepDuration.Seconds()))
waitFunc(sleepDuration)
msg.Nack()
} else {
logger.Debug("Message dispatch succeeded", zap.String("pubSubMessageId", msg.ID()))
// If there were any failures for this message, remove it from the rateLimiter-backed map.
rateLimiter.Forget(msg.ID())
// Acknowledge the dispatch.
logger.Desugar().Debug("Message dispatch succeeded", zap.String("pubSubMessageId", msg.ID()))
msg.Ack()
}
}
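For orientation, here is the same nack-with-backoff pattern sketched directly against the upstream cloud.google.com/go/pubsub client rather than the project's pubsubutil wrappers; the receiveWithBackoff name and the dispatch function are stand-ins, not the PR's actual dispatcher.

package example

import (
	"context"
	"log"
	"time"

	"cloud.google.com/go/pubsub"
	"k8s.io/client-go/util/workqueue"
)

// receiveWithBackoff nacks a failed message only after its per-message backoff
// has elapsed, so Pub/Sub paces its redeliveries instead of retrying immediately.
func receiveWithBackoff(ctx context.Context, sub *pubsub.Subscription, dispatch func([]byte) error) error {
	rl := workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 5*time.Minute)
	return sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		if err := dispatch(m.Data); err != nil {
			delay := rl.When(m.ID)
			log.Printf("dispatch of %s failed, nacking after %v: %v", m.ID, delay, err)
			time.Sleep(delay) // blocking inside the callback, as in the PR
			m.Nack()
			return
		}
		// Success: drop the failure history so the next error starts from the base delay.
		rl.Forget(m.ID)
		m.Ack()
	})
}

As in the PR, the sleep happens inside the Receive callback; the client keeps extending the ack deadline (up to ReceiveSettings.MaxExtension) while the callback runs, which is why expBackoffMaxDelay is kept below that limit.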
116 changes: 96 additions & 20 deletions contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile_test.go
@@ -20,10 +20,14 @@ import (
"context"
"errors"
"fmt"
"math"
"reflect"
"sync"
"testing"
"time"

"k8s.io/client-go/util/workqueue"

"github.com/knative/eventing/contrib/gcppubsub/pkg/util"

"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -389,15 +393,28 @@ func TestReconcile(t *testing.T) {

func TestReceiveFunc(t *testing.T) {
testCases := map[string]struct {
ack bool
dispatcherErr error
ack bool
dispatcherErr error
dispatchAttempts int
}{
"dispatch error": {
ack: false,
dispatcherErr: errors.New(testErrorMessage),
"dispatch error with few dispatch attempts": {
ack: false,
dispatcherErr: errors.New(testErrorMessage),
dispatchAttempts: 3,
},
"dispatch error with capped backoff": {
ack: false,
dispatcherErr: errors.New(testErrorMessage),
dispatchAttempts: 20,
},
"dispatch success": {
ack: true,
"dispatch success on first attempt": {
ack: true,
dispatchAttempts: 1,
},
"dispatch success after many failed attempts": {
ack: true,
dispatcherErr: errors.New(testErrorMessage),
dispatchAttempts: 10,
},
}
for n, tc := range testCases {
@@ -410,21 +427,41 @@ func TestReceiveFunc(t *testing.T) {
defaults := provisioners.DispatchDefaults{
Namespace: cNamespace,
}
rf := receiveFunc(zap.NewNop().Sugar(), sub, defaults, &fakeDispatcher{err: tc.dispatcherErr})
msg := fakepubsub.Message{}
rf(context.TODO(), &msg)
dispatcher := &fakeDispatcher{ack: tc.ack, err: tc.dispatcherErr, errCounter: tc.dispatchAttempts}
rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(expBackoffBaseDelay, expBackoffMaxDelay)
waiter := fakeWaiter{make([]time.Duration, 0)}
rf := receiveFunc(zap.NewNop().Sugar(), sub, defaults, dispatcher, rateLimiter, waiter.sleep)

for i := 1; i <= tc.dispatchAttempts; i++ {
msg := fakepubsub.Message{}
rf(context.TODO(), &msg)

if msg.MessageData.Ack && msg.MessageData.Nack {
t.Error("Message both Acked and Nacked")
if msg.MessageData.Ack && msg.MessageData.Nack {
t.Errorf("Message both Acked and Nacked on attempt %d", i)
}
if tc.ack && i == tc.dispatchAttempts {
if !msg.MessageData.Ack {
t.Errorf("Message should have been Acked on attempt %d. It wasn't.", i)
}
if rateLimiter.NumRequeues(msg.ID()) != 0 {
t.Errorf("Message should have removed from retry queue on attempt %d. It wasn't.", i)
}
} else {
if !msg.MessageData.Nack {
t.Errorf("Message should have been Nacked on attempt %d. It wasn't.", i)
}
}
}
var failures int
if tc.ack {
if !msg.MessageData.Ack {
t.Error("Message should have been Acked. It wasn't.")
}
failures = tc.dispatchAttempts - 1
} else {
if !msg.MessageData.Nack {
t.Error("Message should have been Nacked. It wasn't.")
}
failures = tc.dispatchAttempts
}
// Validate the backoff times.
expectedWaitTimes := computeBackoffs(failures, expBackoffBaseDelay, expBackoffMaxDelay)
if !reflect.DeepEqual(expectedWaitTimes, waiter.WaitTimes) {
t.Errorf("Expected backoff times %d, got %d", getDurationsInSeconds(expectedWaitTimes), getDurationsInSeconds(waiter.WaitTimes))
}
})
}
@@ -618,9 +655,48 @@ func (cc *cancelChecker) verify(t *testing.T, _ *controllertesting.TestCase) {
}

type fakeDispatcher struct {
err error
ack bool
err error
errCounter int
}

func (d *fakeDispatcher) DispatchMessage(_ *provisioners.Message, _, _ string, _ provisioners.DispatchDefaults) error {
return d.err
if !d.ack {
return d.err
}
// Decrease the errCounter to simulate a final ack after many nacks.
d.errCounter--
if d.errCounter > 0 {
return d.err
}
return nil
}

type fakeWaiter struct {
WaitTimes []time.Duration
}

func (fw *fakeWaiter) sleep(duration time.Duration) {
fw.WaitTimes = append(fw.WaitTimes, duration)
}

func computeBackoffs(failures int, baseDelay, maxDelay time.Duration) []time.Duration {
durations := make([]time.Duration, 0, failures)
for i := 0; i < failures; i++ {
backoff := float64(baseDelay) * math.Pow(2, float64(i))
calculated := time.Duration(backoff)
if calculated > maxDelay {
calculated = maxDelay
}
durations = append(durations, calculated)
}
return durations
}

func getDurationsInSeconds(durations []time.Duration) []int {
durationsInSeconds := make([]int, 0, len(durations))
for _, duration := range durations {
durationsInSeconds = append(durationsInSeconds, int(duration.Seconds()))
}
return durationsInSeconds
}
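As a sanity check on the helper above, the capped sequence it produces for the dispatcher's constants looks like this; a sketch test, not part of the PR, relying only on computeBackoffs as defined above and the testing, time, and reflect imports the test file already has.

func TestComputeBackoffsCapsAtMaxDelay(t *testing.T) {
	got := computeBackoffs(11, 1*time.Second, 5*time.Minute)
	// 1s, 2s, 4s, ..., 256s for the first nine failures, then capped at 5m.
	want := []time.Duration{
		1 * time.Second, 2 * time.Second, 4 * time.Second, 8 * time.Second,
		16 * time.Second, 32 * time.Second, 64 * time.Second, 128 * time.Second,
		256 * time.Second, 5 * time.Minute, 5 * time.Minute,
	}
	if !reflect.DeepEqual(want, got) {
		t.Errorf("Expected backoffs %v, got %v", want, got)
	}
}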