From a37797a6115fda37e933b0742881649a411a29ef Mon Sep 17 00:00:00 2001 From: jeff <113397187+cyberhorsey@users.noreply.github.com> Date: Wed, 20 Sep 2023 19:11:47 -0700 Subject: [PATCH] feat(relayer): queue processor Prefetch (#14765) --- packages/relayer/cmd/flags/processor.go | 8 ++++++ packages/relayer/processor/config.go | 11 +++++--- packages/relayer/processor/config_test.go | 2 ++ packages/relayer/processor/processor.go | 32 ++++++++++++----------- packages/relayer/queue/queue.go | 9 ++++--- packages/relayer/queue/rabbitmq/queue.go | 4 +++ 6 files changed, 43 insertions(+), 23 deletions(-) diff --git a/packages/relayer/cmd/flags/processor.go b/packages/relayer/cmd/flags/processor.go index dd4dcdaa811..d0c9d3654f7 100644 --- a/packages/relayer/cmd/flags/processor.go +++ b/packages/relayer/cmd/flags/processor.go @@ -91,6 +91,13 @@ var ( Category: processorCategory, Value: 3, } + QueuePrefetchCount = &cli.Uint64Flag{ + Name: "queue.prefetch", + Usage: "How many messages to prefetch", + Category: processorCategory, + Value: 1, + EnvVars: []string{"QUEUE_PREFETCH_COUNT"}, + } ) var ProcessorFlags = MergeFlags(CommonFlags, []cli.Flag{ @@ -107,4 +114,5 @@ var ProcessorFlags = MergeFlags(CommonFlags, []cli.Flag{ ProfitableOnly, BackOffRetryInterval, BackOffMaxRetrys, + QueuePrefetchCount, }) diff --git a/packages/relayer/processor/config.go b/packages/relayer/processor/config.go index f325e2f4e8b..0b6924eacfb 100644 --- a/packages/relayer/processor/config.go +++ b/packages/relayer/processor/config.go @@ -51,6 +51,7 @@ type Config struct { QueuePassword string QueueHost string QueuePort uint64 + QueuePrefetch uint64 // rpc configs SrcRPCUrl string DestRPCUrl string @@ -87,6 +88,7 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) { QueuePassword: c.String(flags.QueuePassword.Name), QueuePort: c.Uint64(flags.QueuePort.Name), QueueHost: c.String(flags.QueueHost.Name), + QueuePrefetch: c.Uint64(flags.QueuePrefetchCount.Name), SrcRPCUrl: c.String(flags.SrcRPCUrl.Name), DestRPCUrl: c.String(flags.DestRPCUrl.Name), HeaderSyncInterval: c.Uint64(flags.HeaderSyncInterval.Name), @@ -119,10 +121,11 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) { }, OpenQueueFunc: func() (queue.Queue, error) { opts := queue.NewQueueOpts{ - Username: c.String(flags.QueueUsername.Name), - Password: c.String(flags.QueuePassword.Name), - Host: c.String(flags.QueueHost.Name), - Port: c.String(flags.QueuePort.Name), + Username: c.String(flags.QueueUsername.Name), + Password: c.String(flags.QueuePassword.Name), + Host: c.String(flags.QueueHost.Name), + Port: c.String(flags.QueuePort.Name), + PrefetchCount: c.Uint64(flags.QueuePrefetchCount.Name), } q, err := rabbitmq.NewQueue(opts) diff --git a/packages/relayer/processor/config_test.go b/packages/relayer/processor/config_test.go index d23ef0c108c..97bd118554d 100644 --- a/packages/relayer/processor/config_test.go +++ b/packages/relayer/processor/config_test.go @@ -68,6 +68,7 @@ func TestNewConfigFromCliContext(t *testing.T) { assert.Equal(t, uint64(30), c.DatabaseMaxConnLifetime) assert.Equal(t, uint64(10), c.ETHClientTimeout) assert.Equal(t, true, c.ProfitableOnly) + assert.Equal(t, uint64(100), c.QueuePrefetch) c.OpenDBFunc = func() (DB, error) { return &mock.DB{}, nil @@ -110,6 +111,7 @@ func TestNewConfigFromCliContext(t *testing.T) { "-" + flags.DatabaseMaxOpenConns.Name, databaseMaxOpenConns, "-" + flags.DatabaseConnMaxLifetime.Name, databaseMaxConnLifetime, "-" + flags.ETHClientTimeout.Name, ethClientTimeout, + "-" + flags.QueuePrefetchCount.Name, "100", "-" + flags.ProfitableOnly.Name, profitableOnly, })) } diff --git a/packages/relayer/processor/processor.go b/packages/relayer/processor/processor.go index f9e2c2d2646..0128e92bbca 100644 --- a/packages/relayer/processor/processor.go +++ b/packages/relayer/processor/processor.go @@ -289,25 +289,27 @@ func (p *Processor) eventLoop(ctx context.Context) { case <-ctx.Done(): return case msg := <-p.msgCh: - err := p.processMessage(ctx, msg) - - if err != nil { - slog.Error("err processing message", "err", err.Error()) - - if errors.Is(err, errUnprocessable) { - if err := p.queue.Ack(ctx, msg); err != nil { - slog.Error("Err acking message", "err", err.Error()) + go func(msg queue.Message) { + err := p.processMessage(ctx, msg) + + if err != nil { + slog.Error("err processing message", "err", err.Error()) + + if errors.Is(err, errUnprocessable) { + if err := p.queue.Ack(ctx, msg); err != nil { + slog.Error("Err acking message", "err", err.Error()) + } + } else { + if err := p.queue.Nack(ctx, msg); err != nil { + slog.Error("Err nacking message", "err", err.Error()) + } } } else { - if err := p.queue.Nack(ctx, msg); err != nil { - slog.Error("Err nacking message", "err", err.Error()) + if err := p.queue.Ack(ctx, msg); err != nil { + slog.Error("Err acking message", "err", err.Error()) } } - } else { - if err := p.queue.Ack(ctx, msg); err != nil { - slog.Error("Err acking message", "err", err.Error()) - } - } + }(msg) } } } diff --git a/packages/relayer/queue/queue.go b/packages/relayer/queue/queue.go index 6dba516f6ea..f3f708e4dae 100644 --- a/packages/relayer/queue/queue.go +++ b/packages/relayer/queue/queue.go @@ -33,8 +33,9 @@ type Message struct { } type NewQueueOpts struct { - Username string - Password string - Host string - Port string + Username string + Password string + Host string + Port string + PrefetchCount uint64 } diff --git a/packages/relayer/queue/rabbitmq/queue.go b/packages/relayer/queue/rabbitmq/queue.go index 36a01ce92da..967b04a3e55 100644 --- a/packages/relayer/queue/rabbitmq/queue.go +++ b/packages/relayer/queue/rabbitmq/queue.go @@ -69,6 +69,10 @@ func (r *RabbitMQ) connect() error { return err } + if err := ch.Qos(int(r.opts.PrefetchCount), 0, false); err != nil { + return err + } + r.conn = conn r.ch = ch