Skip to content

Commit

Permalink
Simple delete queue for mercury transmitter (#11182)
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Nov 16, 2023
1 parent 0860d26 commit f60efe0
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 6 deletions.
85 changes: 79 additions & 6 deletions core/services/relay/evm/mercury/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

var (
maxTransmitQueueSize = 10_000
maxDeleteQueueSize = 10_000
transmitTimeout = 5 * time.Second
)

Expand Down Expand Up @@ -59,6 +60,24 @@ var (
},
[]string{"feedID"},
)
transmitQueueDeleteErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "mercury_transmit_queue_delete_error_count",
Help: "Running count of DB errors when trying to delete an item from the queue DB",
},
[]string{"feedID"},
)
transmitQueueInsertErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "mercury_transmit_queue_insert_error_count",
Help: "Running count of DB errors when trying to insert an item into the queue DB",
},
[]string{"feedID"},
)
transmitQueuePushErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "mercury_transmit_queue_push_error_count",
Help: "Running count of DB errors when trying to push an item onto the queue",
},
[]string{"feedID"},
)
transmitServerErrorCount = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "mercury_transmit_server_error_count",
Help: "Number of errored transmissions that failed due to an error returned by the mercury server",
Expand Down Expand Up @@ -98,9 +117,14 @@ type mercuryTransmitter struct {
queue *TransmitQueue
wg sync.WaitGroup

transmitSuccessCount prometheus.Counter
transmitDuplicateCount prometheus.Counter
transmitConnectionErrorCount prometheus.Counter
deleteQueue chan *pb.TransmitRequest

transmitSuccessCount prometheus.Counter
transmitDuplicateCount prometheus.Counter
transmitConnectionErrorCount prometheus.Counter
transmitQueueDeleteErrorCount prometheus.Counter
transmitQueueInsertErrorCount prometheus.Counter
transmitQueuePushErrorCount prometheus.Counter
}

var PayloadTypes = getPayloadTypes()
Expand Down Expand Up @@ -138,9 +162,13 @@ func NewTransmitter(lggr logger.Logger, cfgTracker ConfigTracker, rpcClient wsrp
make(chan (struct{})),
nil,
sync.WaitGroup{},
make(chan *pb.TransmitRequest, maxDeleteQueueSize),
transmitSuccessCount.WithLabelValues(feedIDHex),
transmitDuplicateCount.WithLabelValues(feedIDHex),
transmitConnectionErrorCount.WithLabelValues(feedIDHex),
transmitQueueDeleteErrorCount.WithLabelValues(feedIDHex),
transmitQueueInsertErrorCount.WithLabelValues(feedIDHex),
transmitQueuePushErrorCount.WithLabelValues(feedIDHex),
}
}

Expand All @@ -163,6 +191,8 @@ func (mt *mercuryTransmitter) Start(ctx context.Context) (err error) {
return err
}
mt.wg.Add(1)
go mt.runDeleteQueueLoop()
mt.wg.Add(1)
go mt.runQueueLoop()
return nil
})
Expand Down Expand Up @@ -193,6 +223,46 @@ func (mt *mercuryTransmitter) HealthReport() map[string]error {
return report
}

func (mt *mercuryTransmitter) runDeleteQueueLoop() {
defer mt.wg.Done()
runloopCtx, cancel := mt.stopCh.Ctx(context.Background())
defer cancel()

// Exponential backoff for very rarely occurring errors (DB disconnect etc)
b := backoff.Backoff{
Min: 1 * time.Second,
Max: 120 * time.Second,
Factor: 2,
Jitter: true,
}

for {
select {
case req := <-mt.deleteQueue:
for {
if err := mt.persistenceManager.Delete(runloopCtx, req); err != nil {
mt.lggr.Errorw("Failed to delete transmit request record", "error", err, "req", req)
mt.transmitQueueDeleteErrorCount.Inc()
select {
case <-time.After(b.Duration()):
// Wait a backoff duration before trying to delete again
continue
case <-mt.stopCh:
// abort and return immediately on stop even if items remain in queue
return
}
}
break
}
// success
b.Reset()
case <-mt.stopCh:
// abort and return immediately on stop even if items remain in queue
return
}
}
}

func (mt *mercuryTransmitter) runQueueLoop() {
defer mt.wg.Done()
// Exponential backoff with very short retry interval (since latency is a priority)
Expand Down Expand Up @@ -254,9 +324,10 @@ func (mt *mercuryTransmitter) runQueueLoop() {
}
}

if err := mt.persistenceManager.Delete(runloopCtx, t.Req); err != nil {
mt.lggr.Errorw("Failed to delete transmit request record", "error", err, "reportCtx", t.ReportCtx)
return
select {
case mt.deleteQueue <- t.Req:
default:
mt.lggr.Criticalw("Delete queue is full", "reportCtx", t.ReportCtx)
}
}
}
Expand Down Expand Up @@ -289,9 +360,11 @@ func (mt *mercuryTransmitter) Transmit(ctx context.Context, reportCtx ocrtypes.R
mt.lggr.Tracew("Transmit enqueue", "req", req, "report", report, "reportCtx", reportCtx, "signatures", signatures)

if err := mt.persistenceManager.Insert(ctx, req, reportCtx); err != nil {
mt.transmitQueueInsertErrorCount.Inc()
return err
}
if ok := mt.queue.Push(req, reportCtx); !ok {
mt.transmitQueuePushErrorCount.Inc()
return errors.New("transmit queue is closed")
}
return nil
Expand Down
25 changes: 25 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,31 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [dev]

### Added

- Added a new, optional WebServer authentication option that supports LDAP as a user identity provider. This enables user login access and user roles to be managed and provisioned via a centralized remote server that supports the LDAP protocol, which can be helpful when running multiple nodes. See the documentation for more information and config setup instructions. There is a new `[WebServer].AuthenticationMethod` config option, when set to `ldap` requires the new `[WebServer.LDAP]` config section to be defined, see the reference `docs/core.toml`.
- New prom metrics for mercury:
`mercury_transmit_queue_delete_error_count`
`mercury_transmit_queue_insert_error_count`
`mercury_transmit_queue_push_error_count`
Nops should consider alerting on these.


### Changed

- `L2Suggested` mode is now called `SuggestedPrice`

### Removed

- Removed `Optimism2` as a supported gas estimator mode

### Added

- Mercury v0.2 has improved consensus around current block that uses the most recent 5 blocks instead of only the latest one
- Two new prom metrics for mercury, nops should consider adding alerting on these:
- `mercury_insufficient_blocks_count`
- `mercury_zero_blocks_count`

...

## 2.6.0 - UNRELEASED
Expand Down

0 comments on commit f60efe0

Please sign in to comment.