This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Add client side backoff retries for pubsub messages #1218

Merged · 10 commits · Jun 9, 2020
20 changes: 10 additions & 10 deletions cmd/broker/fanout/main.go
@@ -23,7 +23,7 @@ import (
"cloud.google.com/go/pubsub"

"github.com/google/knative-gcp/pkg/broker/config/volume"
"github.com/google/knative-gcp/pkg/broker/handler/pool"
"github.com/google/knative-gcp/pkg/broker/handler"
metadataClient "github.com/google/knative-gcp/pkg/gclient/metadata"
"github.com/google/knative-gcp/pkg/metrics"
"github.com/google/knative-gcp/pkg/utils"
@@ -81,19 +81,19 @@ func main() {
syncSignal := poolSyncSignal(ctx, targetsUpdateCh)
syncPool, err := InitializeSyncPool(
ctx,
pool.ProjectID(projectID),
handler.ProjectID(projectID),
metrics.PodName(env.PodName),
metrics.ContainerName(component),
[]volume.Option{
volume.WithPath(env.TargetsConfigPath),
volume.WithNotifyChan(targetsUpdateCh),
},
buildPoolOptions(env)...,
buildHandlerOptions(env)...,
)
if err != nil {
logger.Fatal("Failed to create fanout sync pool", zap.Error(err))
}
if _, err := pool.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, pool.DefaultHealthCheckPort); err != nil {
if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultHealthCheckPort); err != nil {
logger.Fatalw("Failed to start fanout sync pool", zap.Error(err))
}

@@ -124,21 +124,21 @@ func poolSyncSignal(ctx context.Context, targetsUpdateCh chan struct{}) chan struct{} {
return ch
}

func buildPoolOptions(env envConfig) []pool.Option {
func buildHandlerOptions(env envConfig) []handler.Option {
rs := pubsub.DefaultReceiveSettings
var opts []pool.Option
var opts []handler.Option
if env.HandlerConcurrency > 0 {
// Let the pubsub subscription and handler have the same concurrency?
opts = append(opts, pool.WithHandlerConcurrency(env.HandlerConcurrency))
opts = append(opts, handler.WithHandlerConcurrency(env.HandlerConcurrency))
rs.NumGoroutines = env.HandlerConcurrency
}
if env.MaxConcurrencyPerEvent > 0 {
opts = append(opts, pool.WithMaxConcurrentPerEvent(env.MaxConcurrencyPerEvent))
opts = append(opts, handler.WithMaxConcurrentPerEvent(env.MaxConcurrencyPerEvent))
}
if env.TimeoutPerEvent > 0 {
opts = append(opts, pool.WithTimeoutPerEvent(env.TimeoutPerEvent))
opts = append(opts, handler.WithTimeoutPerEvent(env.TimeoutPerEvent))
}
opts = append(opts, pool.WithPubsubReceiveSettings(rs))
opts = append(opts, handler.WithPubsubReceiveSettings(rs))
// The default CeClient is good?
return opts
}
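
A note on the pattern above: buildHandlerOptions assembles the pool configuration through functional options (handler.Option values such as WithHandlerConcurrency, WithTimeoutPerEvent and WithPubsubReceiveSettings). The sketch below only illustrates that general Go pattern under assumed field names and defaults; the real option implementations live in pkg/broker/handler and are not shown in this diff.

// Generic functional-options sketch; struct fields and defaults here are
// hypothetical, not the handler package's actual code.
package optionsketch

import "time"

type options struct {
	handlerConcurrency int
	timeoutPerEvent    time.Duration
}

// Option mirrors the shape of handler.Option: a function that mutates options.
type Option func(*options)

func WithHandlerConcurrency(n int) Option {
	return func(o *options) { o.handlerConcurrency = n }
}

func WithTimeoutPerEvent(d time.Duration) Option {
	return func(o *options) { o.timeoutPerEvent = d }
}

// newOptions applies the supplied options over defaults, which is how a pool
// constructor would consume buildHandlerOptions(env)... from main above.
func newOptions(opts ...Option) *options {
	o := &options{handlerConcurrency: 1, timeoutPerEvent: 10 * time.Minute} // assumed defaults
	for _, opt := range opts {
		opt(o)
	}
	return o
}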
10 changes: 5 additions & 5 deletions cmd/broker/fanout/wire.go
@@ -22,7 +22,7 @@ import (
"context"

"github.com/google/knative-gcp/pkg/broker/config/volume"
"github.com/google/knative-gcp/pkg/broker/handler/pool"
"github.com/google/knative-gcp/pkg/broker/handler"
"github.com/google/knative-gcp/pkg/metrics"
"github.com/google/wire"
)
@@ -31,13 +31,13 @@ import (
// retry pool's pubsub client and uses targetsVolumeOpts to initialize the targets volume watcher.
func InitializeSyncPool(
ctx context.Context,
projectID pool.ProjectID,
projectID handler.ProjectID,
podName metrics.PodName,
containerName metrics.ContainerName,
targetsVolumeOpts []volume.Option,
opts ...pool.Option,
) (*pool.FanoutPool, error) {
opts ...handler.Option,
) (*handler.FanoutPool, error) {
// Implementation generated by wire. Providers for required FanoutPool dependencies should be
// added here.
panic(wire.Build(pool.ProviderSet, volume.NewTargetsFromFile, metrics.NewDeliveryReporter))
panic(wire.Build(handler.ProviderSet, volume.NewTargetsFromFile, metrics.NewDeliveryReporter))
}
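
For readers unfamiliar with github.com/google/wire: wire.go is an injector template, wire.Build declares the dependency graph, and the wire tool writes the actual constructor calls into wire_gen.go (which is why the generated files in this PR are not rendered). Below is a generic, self-contained illustration of that pattern with made-up providers; it is not this repository's provider set.

//go:build wireinject
// +build wireinject

package example

import "github.com/google/wire"

type Config struct{ Addr string }

type Server struct{ cfg Config }

// NewServer is a provider: wire calls it to build a *Server from a Config.
func NewServer(cfg Config) *Server { return &Server{cfg: cfg} }

// ProviderSet groups related providers, analogous to handler.ProviderSet.
var ProviderSet = wire.NewSet(NewServer)

// InitializeServer is the injector template; running `wire` replaces the
// panic with generated code in wire_gen.go that wires the providers together.
func InitializeServer(cfg Config) *Server {
	panic(wire.Build(ProviderSet))
}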
14 changes: 7 additions & 7 deletions cmd/broker/fanout/wire_gen.go

Some generated files are not rendered by default.

25 changes: 16 additions & 9 deletions cmd/broker/retry/main.go
@@ -25,7 +25,7 @@ import (
"go.uber.org/zap"

"github.com/google/knative-gcp/pkg/broker/config/volume"
"github.com/google/knative-gcp/pkg/broker/handler/pool"
"github.com/google/knative-gcp/pkg/broker/handler"
metadataClient "github.com/google/knative-gcp/pkg/gclient/metadata"
"github.com/google/knative-gcp/pkg/metrics"
"github.com/google/knative-gcp/pkg/utils"
@@ -58,6 +58,9 @@ type envConfig struct {

// Max to 10m.
TimeoutPerEvent time.Duration `envconfig:"TIMEOUT_PER_EVENT"`

MinRetryBackoff time.Duration `envconfig:"MIN_RETRY_BACKOFF" default:"1s"`
MaxRetryBackoff time.Duration `envconfig:"MAX_RETRY_BACKOFF" default:"1m"`
}

func main() {
@@ -86,19 +89,19 @@ func main() {
syncSignal := poolSyncSignal(ctx, targetsUpdateCh)
syncPool, err := InitializeSyncPool(
ctx,
pool.ProjectID(projectID),
handler.ProjectID(projectID),
metrics.PodName(env.PodName),
metrics.ContainerName(component),
[]volume.Option{
volume.WithPath(env.TargetsConfigPath),
volume.WithNotifyChan(targetsUpdateCh),
},
buildPoolOptions(env)...,
buildHandlerOptions(env)...,
)
if err != nil {
logger.Fatal("Failed to get retry sync pool", zap.Error(err))
}
if _, err := pool.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, pool.DefaultHealthCheckPort); err != nil {
if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultHealthCheckPort); err != nil {
logger.Fatal("Failed to start retry sync pool", zap.Error(err))
}

@@ -127,7 +130,7 @@ func poolSyncSignal(ctx context.Context, targetsUpdateCh chan struct{}) chan struct{} {
return ch
}

func buildPoolOptions(env envConfig) []pool.Option {
func buildHandlerOptions(env envConfig) []handler.Option {
rs := pubsub.DefaultReceiveSettings
// If Synchronous is true, then no more than MaxOutstandingMessages will be in memory at one time.
// MaxOutstandingBytes still refers to the total bytes processed, rather than in memory.
@@ -136,15 +139,19 @@ func buildPoolOptions(env envConfig) []pool.Option {
rs.Synchronous = true
rs.MaxOutstandingMessages = env.OutstandingMessagesPerSub
rs.MaxOutstandingBytes = env.OutstandingBytesPerSub
var opts []pool.Option
var opts []handler.Option
if env.HandlerConcurrency > 0 {
opts = append(opts, pool.WithHandlerConcurrency(env.HandlerConcurrency))
opts = append(opts, handler.WithHandlerConcurrency(env.HandlerConcurrency))
rs.NumGoroutines = env.HandlerConcurrency
}
if env.TimeoutPerEvent > 0 {
opts = append(opts, pool.WithTimeoutPerEvent(env.TimeoutPerEvent))
opts = append(opts, handler.WithTimeoutPerEvent(env.TimeoutPerEvent))
}
opts = append(opts, pool.WithPubsubReceiveSettings(rs))
opts = append(opts, handler.WithRetryPolicy(handler.RetryPolicy{
MinBackoff: env.MinRetryBackoff,
MaxBackoff: env.MaxRetryBackoff,
}))
opts = append(opts, handler.WithPubsubReceiveSettings(rs))
// The default CeClient is good?
return opts
}
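
Context for the new knobs: MIN_RETRY_BACKOFF and MAX_RETRY_BACKOFF (defaulting to 1s and 1m) are fed into handler.WithRetryPolicy above. The sketch below shows the capped exponential backoff that a min/max pair like this conventionally implies; it is an illustrative assumption about the policy's semantics, not the handler package's actual retry code, and backoffFor is a hypothetical helper.

package main

import (
	"fmt"
	"time"
)

// backoffFor computes an illustrative capped exponential backoff: the first
// retry waits min, each later retry doubles the wait, never exceeding max.
func backoffFor(attempt int, min, max time.Duration) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	d := min
	for i := 1; i < attempt; i++ {
		d *= 2
		if d >= max {
			return max
		}
	}
	return d
}

func main() {
	// With the defaults (1s, 1m) the delays are 1s, 2s, 4s, 8s, 16s, 32s, 1m, 1m, ...
	for attempt := 1; attempt <= 8; attempt++ {
		fmt.Println(attempt, backoffFor(attempt, time.Second, time.Minute))
	}
}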
8 changes: 4 additions & 4 deletions cmd/broker/retry/wire.go
@@ -22,7 +22,7 @@ import (
"context"

"github.com/google/knative-gcp/pkg/broker/config/volume"
"github.com/google/knative-gcp/pkg/broker/handler/pool"
"github.com/google/knative-gcp/pkg/broker/handler"
"github.com/google/knative-gcp/pkg/metrics"
"github.com/google/wire"
)
@@ -31,12 +31,12 @@ import (
// retry pool's pubsub client and uses targetsVolumeOpts to initialize the targets volume watcher.
func InitializeSyncPool(
ctx context.Context,
projectID pool.ProjectID,
projectID handler.ProjectID,
podName metrics.PodName,
containerName metrics.ContainerName,
targetsVolumeOpts []volume.Option,
opts ...pool.Option) (*pool.RetryPool, error) {
opts ...handler.Option) (*handler.RetryPool, error) {
// Implementation generated by wire. Providers for required RetryPool dependencies should be
// added here.
panic(wire.Build(pool.ProviderSet, volume.NewTargetsFromFile, metrics.NewDeliveryReporter))
panic(wire.Build(handler.ProviderSet, volume.NewTargetsFromFile, metrics.NewDeliveryReporter))
}
10 changes: 5 additions & 5 deletions cmd/broker/retry/wire_gen.go

Some generated files are not rendered by default.

41 changes: 21 additions & 20 deletions pkg/broker/handler/pool/fanout.go → pkg/broker/handler/fanout.go
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package pool
package handler

import (
"context"
@@ -29,7 +29,6 @@ import (
"knative.dev/eventing/pkg/logging"

"github.com/google/knative-gcp/pkg/broker/config"
"github.com/google/knative-gcp/pkg/broker/handler"
handlerctx "github.com/google/knative-gcp/pkg/broker/handler/context"
"github.com/google/knative-gcp/pkg/broker/handler/processors"
"github.com/google/knative-gcp/pkg/broker/handler/processors/deliver"
@@ -59,7 +58,7 @@ type FanoutPool struct {
}

type fanoutHandlerCache struct {
handler.Handler
Handler
b *config.Broker
}

@@ -153,24 +152,26 @@ func (p *FanoutPool) SyncOnce(ctx context.Context) error {
sub := p.pubsubClient.Subscription(b.DecoupleQueue.Subscription)
sub.ReceiveSettings = p.options.PubsubReceiveSettings

h := NewHandler(
sub,
processors.ChainProcessors(
&fanout.Processor{MaxConcurrency: p.options.MaxConcurrencyPerEvent, Targets: p.targets},
&filter.Processor{Targets: p.targets},
&deliver.Processor{
DeliverClient: p.deliverClient,
Targets: p.targets,
RetryOnFailure: true,
DeliverRetryClient: p.deliverRetryClient,
DeliverTimeout: p.options.DeliveryTimeout,
StatsReporter: p.statsReporter,
},
),
p.options.TimeoutPerEvent,
p.options.RetryPolicy,
)
hc := &fanoutHandlerCache{
Handler: handler.Handler{
Timeout: p.options.TimeoutPerEvent,
Subscription: sub,
Processor: processors.ChainProcessors(
&fanout.Processor{MaxConcurrency: p.options.MaxConcurrencyPerEvent, Targets: p.targets},
&filter.Processor{Targets: p.targets},
&deliver.Processor{
DeliverClient: p.deliverClient,
Targets: p.targets,
RetryOnFailure: true,
DeliverRetryClient: p.deliverRetryClient,
DeliverTimeout: p.options.DeliveryTimeout,
StatsReporter: p.statsReporter,
},
),
},
b: b,
Handler: *h,
b: b,
}

// Start the handler with broker key in context.
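
The refactor above replaces the hand-assembled handler.Handler literal with a NewHandler constructor that additionally receives p.options.RetryPolicy. The constructor itself is outside this excerpt; the sketch below is only a guess at its shape as implied by the call site, using hypothetical type and field names wherever the diff does not show them.

package sketch

import (
	"time"

	"cloud.google.com/go/pubsub"
)

// RetryPolicy mirrors the fields set in cmd/broker/retry/main.go above.
type RetryPolicy struct {
	MinBackoff time.Duration
	MaxBackoff time.Duration
}

// Processor stands in for whatever processors.ChainProcessors returns.
type Processor interface{}

// Handler: Subscription, Processor and Timeout appear in the old struct
// literal above; keeping the RetryPolicy on the struct is an assumption.
type Handler struct {
	Subscription *pubsub.Subscription
	Processor    Processor
	Timeout      time.Duration
	RetryPolicy  RetryPolicy
}

// NewHandler follows the argument order at the call site in SyncOnce:
// subscription, processor chain, per-event timeout, retry policy.
func NewHandler(sub *pubsub.Subscription, p Processor, timeout time.Duration, rp RetryPolicy) *Handler {
	return &Handler{Subscription: sub, Processor: p, Timeout: timeout, RetryPolicy: rp}
}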
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package pool
package handler

import (
"context"
@@ -28,7 +28,7 @@

"github.com/google/knative-gcp/pkg/broker/config"
"github.com/google/knative-gcp/pkg/broker/eventutil"
pooltesting "github.com/google/knative-gcp/pkg/broker/handler/pool/testing"
handlertesting "github.com/google/knative-gcp/pkg/broker/handler/testing"
reportertest "github.com/google/knative-gcp/pkg/metrics/testing"

_ "knative.dev/pkg/metrics/testing"
@@ -44,7 +44,7 @@ func TestFanoutWatchAndSync(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testProject := "test-project"
helper, err := pooltesting.NewHelper(ctx, testProject)
helper, err := handlertesting.NewHelper(ctx, testProject)
if err != nil {
t.Fatalf("failed to create pool testing helper: %v", err)
}
@@ -123,7 +123,7 @@ func TestFanoutSyncPoolE2E(t *testing.T) {
defer cancel()
testProject := "test-project"

helper, err := pooltesting.NewHelper(ctx, testProject)
helper, err := handlertesting.NewHelper(ctx, testProject)
if err != nil {
t.Fatalf("failed to create pool testing helper: %v", err)
}