This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Add client side backoff retries for pubsub messages #1218

Merged: 10 commits, Jun 9, 2020
Changes from 6 commits
18 changes: 9 additions & 9 deletions cmd/broker/fanout/main.go
@@ -23,7 +23,7 @@ import (
     "cloud.google.com/go/pubsub"
 
     "github.com/google/knative-gcp/pkg/broker/config/volume"
-    "github.com/google/knative-gcp/pkg/broker/handler/pool"
+    "github.com/google/knative-gcp/pkg/broker/handler"
     metadataClient "github.com/google/knative-gcp/pkg/gclient/metadata"
     "github.com/google/knative-gcp/pkg/metrics"
     "github.com/google/knative-gcp/pkg/utils"
@@ -80,7 +80,7 @@ func main() {
     syncSignal := poolSyncSignal(ctx, targetsUpdateCh)
     syncPool, err := InitializeSyncPool(
         ctx,
-        pool.ProjectID(projectID),
+        handler.ProjectID(projectID),
         metrics.PodName(env.PodName),
         metrics.ContainerName(component),
         []volume.Option{
@@ -92,7 +92,7 @@
     if err != nil {
         logger.Fatal("Failed to create fanout sync pool", zap.Error(err))
     }
-    if _, err := pool.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, pool.DefaultHealthCheckPort); err != nil {
+    if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultHealthCheckPort); err != nil {
         logger.Fatalw("Failed to start fanout sync pool", zap.Error(err))
     }
 
@@ -123,21 +123,21 @@ func poolSyncSignal(ctx context.Context, targetsUpdateCh chan str
     return ch
 }
 
-func buildPoolOptions(env envConfig) []pool.Option {
+func buildPoolOptions(env envConfig) []handler.Option {
     rs := pubsub.DefaultReceiveSettings
-    var opts []pool.Option
+    var opts []handler.Option
     if env.HandlerConcurrency > 0 {
         // Let the pubsub subscription and handler have the same concurrency?
-        opts = append(opts, pool.WithHandlerConcurrency(env.HandlerConcurrency))
+        opts = append(opts, handler.WithHandlerConcurrency(env.HandlerConcurrency))
         rs.NumGoroutines = env.HandlerConcurrency
     }
     if env.MaxConcurrencyPerEvent > 0 {
-        opts = append(opts, pool.WithMaxConcurrentPerEvent(env.MaxConcurrencyPerEvent))
+        opts = append(opts, handler.WithMaxConcurrentPerEvent(env.MaxConcurrencyPerEvent))
     }
     if env.TimeoutPerEvent > 0 {
-        opts = append(opts, pool.WithTimeoutPerEvent(env.TimeoutPerEvent))
+        opts = append(opts, handler.WithTimeoutPerEvent(env.TimeoutPerEvent))
     }
-    opts = append(opts, pool.WithPubsubReceiveSettings(rs))
+    opts = append(opts, handler.WithPubsubReceiveSettings(rs))
     // The default CeClient is good?
     return opts
 }
10 changes: 5 additions & 5 deletions cmd/broker/fanout/wire.go
@@ -22,7 +22,7 @@ import (
     "context"
 
     "github.com/google/knative-gcp/pkg/broker/config/volume"
-    "github.com/google/knative-gcp/pkg/broker/handler/pool"
+    "github.com/google/knative-gcp/pkg/broker/handler"
     "github.com/google/knative-gcp/pkg/metrics"
     "github.com/google/wire"
 )
@@ -31,13 +31,13 @@ import (
 // retry pool's pubsub client and uses targetsVolumeOpts to initialize the targets volume watcher.
 func InitializeSyncPool(
     ctx context.Context,
-    projectID pool.ProjectID,
+    projectID handler.ProjectID,
     podName metrics.PodName,
     containerName metrics.ContainerName,
     targetsVolumeOpts []volume.Option,
-    opts ...pool.Option,
-) (*pool.FanoutPool, error) {
+    opts ...handler.Option,
+) (*handler.FanoutPool, error) {
     // Implementation generated by wire. Providers for required FanoutPool dependencies should be
     // added here.
-    panic(wire.Build(pool.ProviderSet, volume.NewTargetsFromFile, metrics.NewDeliveryReporter))
+    panic(wire.Build(handler.ProviderSet, volume.NewTargetsFromFile, metrics.NewDeliveryReporter))
 }
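A note for readers unfamiliar with wire: InitializeSyncPool is only a directive, and its panic(wire.Build(...)) body never executes. The wire code generator reads the Build call and writes the real constructor into wire_gen.go (which GitHub collapses below as a generated file). A minimal self-contained sketch of the pattern, with hypothetical names:

```go
//go:build wireinject

package example

import (
	"context"

	"github.com/google/wire"
)

// Hypothetical types, just to illustrate the pattern.
type Dep struct{}
type Thing struct{ dep *Dep }

func NewDep(ctx context.Context) *Dep { return &Dep{} }
func NewThing(dep *Dep) *Thing        { return &Thing{dep: dep} }

// Never executed: wire reads the Build call and generates the real body
// (dep := NewDep(ctx); return NewThing(dep)) into wire_gen.go.
func InitializeThing(ctx context.Context) *Thing {
	panic(wire.Build(NewDep, NewThing))
}
```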
14 changes: 7 additions & 7 deletions cmd/broker/fanout/wire_gen.go

Some generated files are not rendered by default.

16 changes: 8 additions & 8 deletions cmd/broker/retry/main.go
@@ -25,7 +25,7 @@ import (
     "go.uber.org/zap"
 
     "github.com/google/knative-gcp/pkg/broker/config/volume"
-    "github.com/google/knative-gcp/pkg/broker/handler/pool"
+    "github.com/google/knative-gcp/pkg/broker/handler"
     metadataClient "github.com/google/knative-gcp/pkg/gclient/metadata"
     "github.com/google/knative-gcp/pkg/metrics"
     "github.com/google/knative-gcp/pkg/utils"
@@ -85,7 +85,7 @@ func main() {
     syncSignal := poolSyncSignal(ctx, targetsUpdateCh)
     syncPool, err := InitializeSyncPool(
         ctx,
-        pool.ProjectID(projectID),
+        handler.ProjectID(projectID),
         metrics.PodName(env.PodName),
         metrics.ContainerName(component),
         []volume.Option{
@@ -97,7 +97,7 @@
     if err != nil {
         logger.Fatal("Failed to get retry sync pool", zap.Error(err))
     }
-    if _, err := pool.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, pool.DefaultHealthCheckPort); err != nil {
+    if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultHealthCheckPort); err != nil {
         logger.Fatal("Failed to start retry sync pool", zap.Error(err))
     }
 
@@ -126,7 +126,7 @@ func poolSyncSignal(ctx context.Context, targetsUpdateCh chan str
     return ch
 }
 
-func buildPoolOptions(env envConfig) []pool.Option {
+func buildPoolOptions(env envConfig) []handler.Option {
     rs := pubsub.DefaultReceiveSettings
     // If Synchronous is true, then no more than MaxOutstandingMessages will be in memory at one time.
     // MaxOutstandingBytes still refers to the total bytes processed, rather than in memory.
@@ -135,15 +135,15 @@ func buildPoolOptions(env envConfig) []pool.Option {
     rs.Synchronous = true
     rs.MaxOutstandingMessages = env.OutstandingMessagesPerSub
     rs.MaxOutstandingBytes = env.OutstandingBytesPerSub
-    var opts []pool.Option
+    var opts []handler.Option
     if env.HandlerConcurrency > 0 {
-        opts = append(opts, pool.WithHandlerConcurrency(env.HandlerConcurrency))
+        opts = append(opts, handler.WithHandlerConcurrency(env.HandlerConcurrency))
        rs.NumGoroutines = env.HandlerConcurrency
     }
     if env.TimeoutPerEvent > 0 {
-        opts = append(opts, pool.WithTimeoutPerEvent(env.TimeoutPerEvent))
+        opts = append(opts, handler.WithTimeoutPerEvent(env.TimeoutPerEvent))
     }
-    opts = append(opts, pool.WithPubsubReceiveSettings(rs))
+    opts = append(opts, handler.WithPubsubReceiveSettings(rs))
     // The default CeClient is good?
     return opts
 }
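The two comments above carry the key flow-control subtlety for the retry pod: with Synchronous pulling, MaxOutstandingMessages is a hard cap on messages held in memory, while MaxOutstandingBytes counts bytes being processed rather than bytes buffered. A minimal sketch of those settings with illustrative values (the real ones come from the pod's env config):

```go
package example

import "cloud.google.com/go/pubsub"

// retryReceiveSettings mirrors the flow control configured above.
// The concrete numbers are assumptions for illustration only.
func retryReceiveSettings() pubsub.ReceiveSettings {
	rs := pubsub.DefaultReceiveSettings
	rs.Synchronous = true           // hard cap: at most MaxOutstandingMessages in memory at once
	rs.MaxOutstandingMessages = 100 // unacked messages allowed per subscription
	rs.MaxOutstandingBytes = 1e9    // total bytes being processed, not just bytes in memory
	return rs
}
```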
8 changes: 4 additions & 4 deletions cmd/broker/retry/wire.go
@@ -22,7 +22,7 @@ import (
     "context"
 
     "github.com/google/knative-gcp/pkg/broker/config/volume"
-    "github.com/google/knative-gcp/pkg/broker/handler/pool"
+    "github.com/google/knative-gcp/pkg/broker/handler"
     "github.com/google/knative-gcp/pkg/metrics"
     "github.com/google/wire"
 )
@@ -31,12 +31,12 @@ import (
 // retry pool's pubsub client and uses targetsVolumeOpts to initialize the targets volume watcher.
 func InitializeSyncPool(
     ctx context.Context,
-    projectID pool.ProjectID,
+    projectID handler.ProjectID,
     podName metrics.PodName,
     containerName metrics.ContainerName,
     targetsVolumeOpts []volume.Option,
-    opts ...pool.Option) (*pool.RetryPool, error) {
+    opts ...handler.Option) (*handler.RetryPool, error) {
     // Implementation generated by wire. Providers for required RetryPool dependencies should be
     // added here.
-    panic(wire.Build(pool.ProviderSet, volume.NewTargetsFromFile, metrics.NewDeliveryReporter))
+    panic(wire.Build(handler.ProviderSet, volume.NewTargetsFromFile, metrics.NewDeliveryReporter))
 }
10 changes: 5 additions & 5 deletions cmd/broker/retry/wire_gen.go

Some generated files are not rendered by default.

41 changes: 21 additions & 20 deletions pkg/broker/handler/pool/fanout.go → pkg/broker/handler/fanout.go
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package pool
+package handler
 
 import (
     "context"
@@ -29,7 +29,6 @@ import (
     "knative.dev/eventing/pkg/logging"
 
     "github.com/google/knative-gcp/pkg/broker/config"
-    "github.com/google/knative-gcp/pkg/broker/handler"
     handlerctx "github.com/google/knative-gcp/pkg/broker/handler/context"
     "github.com/google/knative-gcp/pkg/broker/handler/processors"
     "github.com/google/knative-gcp/pkg/broker/handler/processors/deliver"
@@ -59,7 +58,7 @@ type FanoutPool struct {
 }
 
 type fanoutHandlerCache struct {
-    handler.Handler
+    Handler
     b *config.Broker
 }
 
@@ -153,24 +152,26 @@ func (p *FanoutPool) SyncOnce(ctx context.Context) error {
     sub := p.pubsubClient.Subscription(b.DecoupleQueue.Subscription)
     sub.ReceiveSettings = p.options.PubsubReceiveSettings
 
+    h := NewHandler(
+        sub,
+        processors.ChainProcessors(
+            &fanout.Processor{MaxConcurrency: p.options.MaxConcurrencyPerEvent, Targets: p.targets},
+            &filter.Processor{Targets: p.targets},
+            &deliver.Processor{
+                DeliverClient:      p.deliverClient,
+                Targets:            p.targets,
+                RetryOnFailure:     true,
+                DeliverRetryClient: p.deliverRetryClient,
+                DeliverTimeout:     p.options.DeliveryTimeout,
+                StatsReporter:      p.statsReporter,
+            },
+        ),
+        p.options.TimeoutPerEvent,
+        p.options.RetryPolicy,
+    )
     hc := &fanoutHandlerCache{
-        Handler: handler.Handler{
-            Timeout:      p.options.TimeoutPerEvent,
-            Subscription: sub,
-            Processor: processors.ChainProcessors(
-                &fanout.Processor{MaxConcurrency: p.options.MaxConcurrencyPerEvent, Targets: p.targets},
-                &filter.Processor{Targets: p.targets},
-                &deliver.Processor{
-                    DeliverClient:      p.deliverClient,
-                    Targets:            p.targets,
-                    RetryOnFailure:     true,
-                    DeliverRetryClient: p.deliverRetryClient,
-                    DeliverTimeout:     p.options.DeliveryTimeout,
-                    StatsReporter:      p.statsReporter,
-                },
-            ),
-        },
-        b: b,
+        Handler: *h,
+        b:       b,
     }
 
     // Start the handler with broker key in context.
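Worth noting on this hunk: the fanout pool previously assembled handler.Handler as a struct literal; it now goes through NewHandler. That appears to be the point of the constructor: the new unexported retryLimiter and delayNack fields (see handler.go below) are initialized in one place, so call sites cannot end up with a handler whose rate limiter was never set.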
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package pool
+package handler
 
 import (
     "context"
@@ -28,7 +28,7 @@
 
     "github.com/google/knative-gcp/pkg/broker/config"
     "github.com/google/knative-gcp/pkg/broker/eventutil"
-    pooltesting "github.com/google/knative-gcp/pkg/broker/handler/pool/testing"
+    pooltesting "github.com/google/knative-gcp/pkg/broker/handler/testing"
     reportertest "github.com/google/knative-gcp/pkg/metrics/testing"
 
     _ "knative.dev/pkg/metrics/testing"
27 changes: 26 additions & 1 deletion pkg/broker/handler/handler.go
@@ -27,6 +27,7 @@ import (
     "github.com/google/knative-gcp/pkg/broker/handler/processors"
     "github.com/google/knative-gcp/pkg/metrics"
     "go.uber.org/zap"
+    "k8s.io/client-go/util/workqueue"
     "knative.dev/eventing/pkg/logging"
 )
 
@@ -43,11 +44,31 @@ type Handler struct {
     // Timeout is the timeout for processing each individual event.
     Timeout time.Duration
 
+    // retryLimiter limits how fast to retry failed events.
+    retryLimiter workqueue.RateLimiter
+    // delayNack defaults to time.Sleep; could be overridden in test.
+    delayNack func(time.Duration)
     // cancel is a function to stop pulling messages.
     cancel context.CancelFunc
     alive  atomic.Value
 }
 
+// NewHandler creates a new Handler.
+func NewHandler(
+    sub *pubsub.Subscription,
+    processor processors.Interface,
+    timeout time.Duration,
+    retryPolicy RetryPolicy,
+) *Handler {
+    return &Handler{
+        Subscription: sub,
+        Processor:    processor,
+        Timeout:      timeout,
+        retryLimiter: workqueue.NewItemExponentialFailureRateLimiter(retryPolicy.MinBackoff, retryPolicy.MaxBackoff),
+        delayNack:    time.Sleep,
+    }
+}
+
 // Start starts the handler.
 // done func will be called if the pubsub inbound is closed.
 func (h *Handler) Start(ctx context.Context, done func(error)) {
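The backoff state lives in client-go's workqueue rate limiter, keyed per item. A self-contained sketch of how NewItemExponentialFailureRateLimiter behaves (the bounds here are illustrative; the PR takes them from RetryPolicy.MinBackoff and MaxBackoff): each When call for the same key doubles the returned delay, and Forget resets that key.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Illustrative bounds, not the PR's defaults.
	limiter := workqueue.NewItemExponentialFailureRateLimiter(10*time.Millisecond, 10*time.Second)

	// Repeated failures of the same message ID back off exponentially:
	// 10ms, 20ms, 40ms, 80ms, 160ms, ... capped at 10s.
	for i := 0; i < 5; i++ {
		fmt.Println(limiter.When("msg-id"))
	}

	limiter.Forget("msg-id")            // a success resets the per-key failure count
	fmt.Println(limiter.When("msg-id")) // back to the 10ms base delay
}
```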
@@ -86,9 +107,13 @@ func (h *Handler) receive(ctx context.Context, msg *pubsub.Message) {
         defer cancel()
     }
     if err := h.Processor.Process(ctx, event); err != nil {
-        logging.FromContext(ctx).Error("failed to process event", zap.Any("event", event), zap.Error(err))
+        backoffPeriod := h.retryLimiter.When(msg.ID)
+        logging.FromContext(ctx).Error("failed to process event; backoff nack", zap.Any("event", event), zap.Float64("backoffPeriod", backoffPeriod.Seconds()), zap.Error(err))
+        h.delayNack(backoffPeriod)
         msg.Nack()
         return
     }
+
+    h.retryLimiter.Forget(msg.ID)
     msg.Ack()
 }
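Taken together, receive now implements nack-with-backoff: on a processing failure the handler sleeps for a per-message backoff before nacking, so redelivery is throttled on the client instead of happening immediately. The sleep keeps the message outstanding against the subscription's flow-control limits for the backoff period, which is presumably why the backoff is capped by RetryPolicy.MaxBackoff; the delayNack indirection exists so tests can stub out the sleep. A self-contained sketch of the same pattern against the Pub/Sub client API (not the PR's code; project and subscription names are made up):

```go
package main

import (
	"context"
	"log"
	"time"

	"cloud.google.com/go/pubsub"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "my-project") // hypothetical project
	if err != nil {
		log.Fatal(err)
	}
	limiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 5*time.Minute)

	sub := client.Subscription("my-sub") // hypothetical subscription
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		if err := process(msg); err != nil {
			time.Sleep(limiter.When(msg.ID)) // exponential per-message delay before nack
			msg.Nack()                       // Pub/Sub will redeliver
			return
		}
		limiter.Forget(msg.ID) // success resets this message's backoff
		msg.Ack()
	})
	if err != nil {
		log.Fatal(err)
	}
}

// process stands in for the event processing chain; always succeeds here.
func process(msg *pubsub.Message) error { return nil }
```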