Skip to content

Commit

Permalink
feat(relayer): queue processor Prefetch (#14765)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberhorsey authored Sep 21, 2023
1 parent fa2e842 commit a37797a
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 23 deletions.
8 changes: 8 additions & 0 deletions packages/relayer/cmd/flags/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ var (
Category: processorCategory,
Value: 3,
}
QueuePrefetchCount = &cli.Uint64Flag{
Name: "queue.prefetch",
Usage: "How many messages to prefetch",
Category: processorCategory,
Value: 1,
EnvVars: []string{"QUEUE_PREFETCH_COUNT"},
}
)

var ProcessorFlags = MergeFlags(CommonFlags, []cli.Flag{
Expand All @@ -107,4 +114,5 @@ var ProcessorFlags = MergeFlags(CommonFlags, []cli.Flag{
ProfitableOnly,
BackOffRetryInterval,
BackOffMaxRetrys,
QueuePrefetchCount,
})
11 changes: 7 additions & 4 deletions packages/relayer/processor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Config struct {
QueuePassword string
QueueHost string
QueuePort uint64
QueuePrefetch uint64
// rpc configs
SrcRPCUrl string
DestRPCUrl string
Expand Down Expand Up @@ -87,6 +88,7 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
QueuePassword: c.String(flags.QueuePassword.Name),
QueuePort: c.Uint64(flags.QueuePort.Name),
QueueHost: c.String(flags.QueueHost.Name),
QueuePrefetch: c.Uint64(flags.QueuePrefetchCount.Name),
SrcRPCUrl: c.String(flags.SrcRPCUrl.Name),
DestRPCUrl: c.String(flags.DestRPCUrl.Name),
HeaderSyncInterval: c.Uint64(flags.HeaderSyncInterval.Name),
Expand Down Expand Up @@ -119,10 +121,11 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
},
OpenQueueFunc: func() (queue.Queue, error) {
opts := queue.NewQueueOpts{
Username: c.String(flags.QueueUsername.Name),
Password: c.String(flags.QueuePassword.Name),
Host: c.String(flags.QueueHost.Name),
Port: c.String(flags.QueuePort.Name),
Username: c.String(flags.QueueUsername.Name),
Password: c.String(flags.QueuePassword.Name),
Host: c.String(flags.QueueHost.Name),
Port: c.String(flags.QueuePort.Name),
PrefetchCount: c.Uint64(flags.QueuePrefetchCount.Name),
}

q, err := rabbitmq.NewQueue(opts)
Expand Down
2 changes: 2 additions & 0 deletions packages/relayer/processor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func TestNewConfigFromCliContext(t *testing.T) {
assert.Equal(t, uint64(30), c.DatabaseMaxConnLifetime)
assert.Equal(t, uint64(10), c.ETHClientTimeout)
assert.Equal(t, true, c.ProfitableOnly)
assert.Equal(t, uint64(100), c.QueuePrefetch)

c.OpenDBFunc = func() (DB, error) {
return &mock.DB{}, nil
Expand Down Expand Up @@ -110,6 +111,7 @@ func TestNewConfigFromCliContext(t *testing.T) {
"-" + flags.DatabaseMaxOpenConns.Name, databaseMaxOpenConns,
"-" + flags.DatabaseConnMaxLifetime.Name, databaseMaxConnLifetime,
"-" + flags.ETHClientTimeout.Name, ethClientTimeout,
"-" + flags.QueuePrefetchCount.Name, "100",
"-" + flags.ProfitableOnly.Name, profitableOnly,
}))
}
Expand Down
32 changes: 17 additions & 15 deletions packages/relayer/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,25 +289,27 @@ func (p *Processor) eventLoop(ctx context.Context) {
case <-ctx.Done():
return
case msg := <-p.msgCh:
err := p.processMessage(ctx, msg)

if err != nil {
slog.Error("err processing message", "err", err.Error())

if errors.Is(err, errUnprocessable) {
if err := p.queue.Ack(ctx, msg); err != nil {
slog.Error("Err acking message", "err", err.Error())
go func(msg queue.Message) {
err := p.processMessage(ctx, msg)

if err != nil {
slog.Error("err processing message", "err", err.Error())

if errors.Is(err, errUnprocessable) {
if err := p.queue.Ack(ctx, msg); err != nil {
slog.Error("Err acking message", "err", err.Error())
}
} else {
if err := p.queue.Nack(ctx, msg); err != nil {
slog.Error("Err nacking message", "err", err.Error())
}
}
} else {
if err := p.queue.Nack(ctx, msg); err != nil {
slog.Error("Err nacking message", "err", err.Error())
if err := p.queue.Ack(ctx, msg); err != nil {
slog.Error("Err acking message", "err", err.Error())
}
}
} else {
if err := p.queue.Ack(ctx, msg); err != nil {
slog.Error("Err acking message", "err", err.Error())
}
}
}(msg)
}
}
}
9 changes: 5 additions & 4 deletions packages/relayer/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ type Message struct {
}

type NewQueueOpts struct {
Username string
Password string
Host string
Port string
Username string
Password string
Host string
Port string
PrefetchCount uint64
}
4 changes: 4 additions & 0 deletions packages/relayer/queue/rabbitmq/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func (r *RabbitMQ) connect() error {
return err
}

if err := ch.Qos(int(r.opts.PrefetchCount), 0, false); err != nil {
return err
}

r.conn = conn
r.ch = ch

Expand Down

0 comments on commit a37797a

Please sign in to comment.