Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use channels in redisCoordinator to accept push-updates to redis instead of launching goroutines #2966

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 23 additions & 21 deletions execution/gethexec/express_lane_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pending:

var redisCoordinator *timeboost.RedisCoordinator
if seqConfig().Dangerous.Timeboost.RedisUrl != "" {
redisCoordinator, err = timeboost.NewRedisCoordinator(seqConfig().Dangerous.Timeboost.RedisUrl, roundTimingInfo.Round)
redisCoordinator, err = timeboost.NewRedisCoordinator(seqConfig().Dangerous.Timeboost.RedisUrl, roundTimingInfo)
if err != nil {
return nil, fmt.Errorf("error initializing expressLaneService redis: %w", err)
}
Expand Down Expand Up @@ -311,9 +311,11 @@ func (es *expressLaneService) currentRoundHasController() bool {

// sequenceExpressLaneSubmission with the roundInfo lock held, validates sequence number and sender address fields of the message
// adds the message to the transaction queue and waits for the response
// If sequenceOnly is set, then we do not wait for the tx result
func (es *expressLaneService) sequenceExpressLaneSubmission(
ctx context.Context,
msg *timeboost.ExpressLaneSubmission,
sequenceOnly bool,
) error {
unlockByDefer := true
es.roundInfoMutex.Lock()
Expand Down Expand Up @@ -384,12 +386,10 @@ func (es *expressLaneService) sequenceExpressLaneSubmission(
roundInfo.msgAndResultBySequenceNumber[msg.SequenceNumber] = &msgAndResult{msg, resultChan}

if es.redisCoordinator != nil {
es.LaunchThread(func(context.Context) {
// Persist accepted expressLane txs to redis
if err := es.redisCoordinator.AddAcceptedTx(msg); err != nil {
log.Error("Error adding accepted ExpressLaneSubmission to redis. Loss of msg possible if sequencer switch happens", "seqNum", msg.SequenceNumber, "txHash", msg.Transaction.Hash(), "err", err)
}
})
// Persist accepted expressLane txs to redis
if err := es.redisCoordinator.AddAcceptedTx(msg); err != nil {
log.Error("Error adding accepted ExpressLaneSubmission to redis. Loss of msg possible if sequencer switch happens", "seqNum", msg.SequenceNumber, "txHash", msg.Transaction.Hash(), "err", err)
}
}

now := time.Now()
Expand All @@ -407,7 +407,9 @@ func (es *expressLaneService) sequenceExpressLaneSubmission(
queueCtx, _ = ctxWithTimeout(es.GetContext(), queueTimeout)
if nextMsgAndResult.msg.SequenceNumber == msg.SequenceNumber {
queueCtx, cancel = ctxWithTimeout(ctx, queueTimeout)
defer cancel()
if !sequenceOnly {
defer cancel()
}
}
es.transactionPublisher.PublishTimeboostedTransaction(queueCtx, nextMsgAndResult.msg.Transaction, nextMsgAndResult.msg.Options, nextMsgAndResult.resultChan)
// Increase the global round sequence number.
Expand All @@ -419,6 +421,10 @@ func (es *expressLaneService) sequenceExpressLaneSubmission(
unlockByDefer = false
es.roundInfoMutex.Unlock() // Release lock so that other timeboost txs can be processed

if sequenceOnly {
return nil
}

abortCtx, cancel := ctxWithTimeout(ctx, queueTimeout*2) // We use the same timeout value that sequencer imposes
defer cancel()
select {
Expand All @@ -431,14 +437,12 @@ func (es *expressLaneService) sequenceExpressLaneSubmission(
}

if es.redisCoordinator != nil {
es.LaunchThread(func(context.Context) {
// We update the sequence count in redis only after receiving a result for sequencing this message, instead of updating while holding roundInfoMutex,
// because this prevents any loss of transactions when the prev chosen sequencer updates the count but some how fails to forward txs to the current chosen.
// If the prev chosen ends up forwarding the tx, it is ok as the duplicate txs will be discarded
if redisErr := es.redisCoordinator.UpdateSequenceCount(msg.Round, seqCount); redisErr != nil {
log.Error("Error updating round's sequence count in redis", "err", redisErr) // this shouldn't be a problem if future msgs succeed in updating the count
}
})
// We update the sequence count in redis only after receiving a result for sequencing this message, instead of updating while holding roundInfoMutex,
// because this prevents any loss of transactions when the prev chosen sequencer updates the count but some how fails to forward txs to the current chosen.
// If the prev chosen ends up forwarding the tx, it is ok as the duplicate txs will be discarded
if redisErr := es.redisCoordinator.UpdateSequenceCount(msg.Round, seqCount); redisErr != nil {
log.Error("Error updating round's sequence count in redis", "err", redisErr) // this shouldn't be a problem if future msgs succeed in updating the count
}
}

if err != nil {
Expand Down Expand Up @@ -514,10 +518,8 @@ func (es *expressLaneService) syncFromRedis() {
pendingMsgs := es.redisCoordinator.GetAcceptedTxs(currentRound, sequenceCount, sequenceCount+es.seqConfig().Dangerous.Timeboost.MaxFutureSequenceDistance)
log.Info("Attempting to sequence pending expressLane transactions from redis", "count", len(pendingMsgs))
for _, msg := range pendingMsgs {
es.LaunchThread(func(ctx context.Context) {
if err := es.sequenceExpressLaneSubmission(ctx, msg); err != nil {
log.Error("Untracked expressLaneSubmission returned an error", "round", msg.Round, "seqNum", msg.SequenceNumber, "txHash", msg.Transaction.Hash(), "err", err)
}
})
if err := es.sequenceExpressLaneSubmission(es.GetContext(), msg, true); err != nil {
log.Error("Untracked expressLaneSubmission returned an error while only-sequencing", "round", msg.Round, "seqNum", msg.SequenceNumber, "txHash", msg.Transaction.Hash(), "err", err)
}
}
}
43 changes: 23 additions & 20 deletions execution/gethexec/express_lane_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,21 +300,22 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_nonceTooLow(t *testin
els.transactionPublisher = stubPublisher

msg := buildValidSubmissionWithSeqAndTx(t, 0, 0, emptyTx)
err := els.sequenceExpressLaneSubmission(ctx, msg)
err := els.sequenceExpressLaneSubmission(ctx, msg, false)
require.ErrorIs(t, err, timeboost.ErrSequenceNumberTooLow)
}

func Test_expressLaneService_sequenceExpressLaneSubmission_duplicateNonce(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
redisUrl := redisutil.CreateTestRedis(ctx, t)
timingInfo := defaultTestRoundTimingInfo(time.Now())
els := &expressLaneService{
roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8),
roundTimingInfo: defaultTestRoundTimingInfo(time.Now()),
roundTimingInfo: timingInfo,
seqConfig: func() *SequencerConfig { return &DefaultSequencerConfig },
}
var err error
els.redisCoordinator, err = timeboost.NewRedisCoordinator(redisUrl, els.roundTimingInfo.Round)
els.redisCoordinator, err = timeboost.NewRedisCoordinator(redisUrl, &timingInfo)
require.NoError(t, err)
els.redisCoordinator.Start(ctx)
els.roundInfo.Add(0, &expressLaneRoundInfo{1, make(map[uint64]*msgAndResult)})
Expand All @@ -330,12 +331,12 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_duplicateNonce(t *tes
var err1, err2 error
go func(w *sync.WaitGroup) {
w.Done()
err1 = els.sequenceExpressLaneSubmission(ctx, msg1)
err1 = els.sequenceExpressLaneSubmission(ctx, msg1, false)
wg.Done()
}(&wg)
go func(w *sync.WaitGroup) {
w.Done()
err2 = els.sequenceExpressLaneSubmission(ctx, msg2)
err2 = els.sequenceExpressLaneSubmission(ctx, msg2, false)
wg.Done()
}(&wg)
wg.Wait()
Expand All @@ -354,13 +355,14 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
redisUrl := redisutil.CreateTestRedis(ctx, t)
timingInfo := defaultTestRoundTimingInfo(time.Now())
els := &expressLaneService{
roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8),
roundTimingInfo: defaultTestRoundTimingInfo(time.Now()),
roundTimingInfo: timingInfo,
seqConfig: func() *SequencerConfig { return &DefaultSequencerConfig },
}
var err error
els.redisCoordinator, err = timeboost.NewRedisCoordinator(redisUrl, els.roundTimingInfo.Round)
els.redisCoordinator, err = timeboost.NewRedisCoordinator(redisUrl, &timingInfo)
require.NoError(t, err)
els.redisCoordinator.Start(ctx)
els.roundInfo.Add(0, &expressLaneRoundInfo{1, make(map[uint64]*msgAndResult)})
Expand All @@ -383,7 +385,7 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing
for _, msg := range messages {
go func(w *sync.WaitGroup) {
w.Done()
err := els.sequenceExpressLaneSubmission(ctx, msg)
err := els.sequenceExpressLaneSubmission(ctx, msg, false)
if msg.SequenceNumber != 10 { // Because this go-routine will be interrupted after the test itself ends and 10 will still be waiting for result
require.NoError(t, err)
w.Done()
Expand All @@ -401,7 +403,7 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_outOfOrder(t *testing
els.roundInfoMutex.Unlock()

wg.Add(2) // 4 & 5 should be able to get in after 3 so we add a delta of 2
err = els.sequenceExpressLaneSubmission(ctx, buildValidSubmissionWithSeqAndTx(t, 0, 3, emptyTx))
err = els.sequenceExpressLaneSubmission(ctx, buildValidSubmissionWithSeqAndTx(t, 0, 3, emptyTx), false)
require.NoError(t, err)
wg.Wait()
require.Equal(t, 5, len(stubPublisher.publishedTxOrder))
Expand All @@ -411,13 +413,14 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_erroredTx(t *testing.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
redisUrl := redisutil.CreateTestRedis(ctx, t)
timingInfo := defaultTestRoundTimingInfo(time.Now())
els := &expressLaneService{
roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8),
roundTimingInfo: defaultTestRoundTimingInfo(time.Now()),
roundTimingInfo: timingInfo,
seqConfig: func() *SequencerConfig { return &SequencerConfig{} },
}
var err error
els.redisCoordinator, err = timeboost.NewRedisCoordinator(redisUrl, els.roundTimingInfo.Round)
els.redisCoordinator, err = timeboost.NewRedisCoordinator(redisUrl, &timingInfo)
require.NoError(t, err)
els.redisCoordinator.Start(ctx)
els.roundInfo.Add(0, &expressLaneRoundInfo{1, make(map[uint64]*msgAndResult)})
Expand All @@ -434,10 +437,10 @@ func Test_expressLaneService_sequenceExpressLaneSubmission_erroredTx(t *testing.
}
for _, msg := range messages {
if msg.Transaction.Hash() != emptyTx.Hash() {
err := els.sequenceExpressLaneSubmission(ctx, msg)
err := els.sequenceExpressLaneSubmission(ctx, msg, false)
require.ErrorContains(t, err, "oops, bad tx")
} else {
err := els.sequenceExpressLaneSubmission(ctx, msg)
err := els.sequenceExpressLaneSubmission(ctx, msg, false)
require.NoError(t, err)
}
}
Expand All @@ -451,13 +454,14 @@ func Test_expressLaneService_syncFromRedis(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
redisUrl := redisutil.CreateTestRedis(ctx, t)
timingInfo := defaultTestRoundTimingInfo(time.Now())
els1 := &expressLaneService{
roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8),
roundTimingInfo: defaultTestRoundTimingInfo(time.Now()),
roundTimingInfo: timingInfo,
seqConfig: func() *SequencerConfig { return &DefaultSequencerConfig },
}
var err error
els1.redisCoordinator, err = timeboost.NewRedisCoordinator(redisUrl, els1.roundTimingInfo.Round)
els1.redisCoordinator, err = timeboost.NewRedisCoordinator(redisUrl, &timingInfo)
require.NoError(t, err)
els1.redisCoordinator.Start(ctx)

Expand All @@ -480,7 +484,7 @@ func Test_expressLaneService_syncFromRedis(t *testing.T) {
for _, msg := range messages {
go func(w *sync.WaitGroup) {
w.Done()
_ = els1.sequenceExpressLaneSubmission(ctx, msg)
_ = els1.sequenceExpressLaneSubmission(ctx, msg, false)
if msg.SequenceNumber == 1 {
w.Done()
}
Expand All @@ -495,10 +499,10 @@ func Test_expressLaneService_syncFromRedis(t *testing.T) {

els2 := &expressLaneService{
roundInfo: containers.NewLruCache[uint64, *expressLaneRoundInfo](8),
roundTimingInfo: defaultTestRoundTimingInfo(time.Now()),
roundTimingInfo: timingInfo,
seqConfig: func() *SequencerConfig { return &DefaultSequencerConfig },
}
els2.redisCoordinator, err = timeboost.NewRedisCoordinator(redisUrl, els2.roundTimingInfo.Round)
els2.redisCoordinator, err = timeboost.NewRedisCoordinator(redisUrl, &timingInfo)
require.NoError(t, err)
els2.redisCoordinator.Start(ctx)

Expand All @@ -509,7 +513,6 @@ func Test_expressLaneService_syncFromRedis(t *testing.T) {

// As els2 becomes an active sequencer, syncFromRedis would be called when Activate() function of sequencer is invoked
els2.syncFromRedis()
time.Sleep(time.Second) // wait for parallel sequencing of redis txs to complete

els2.roundInfoMutex.Lock()
roundInfo, exists := els2.roundInfo.Get(0)
Expand All @@ -524,7 +527,7 @@ func Test_expressLaneService_syncFromRedis(t *testing.T) {
}
els2.roundInfoMutex.Unlock()

err = els2.sequenceExpressLaneSubmission(ctx, buildValidSubmissionWithSeqAndTx(t, 0, 2, emptyTx)) // Send an unblocking tx
err = els2.sequenceExpressLaneSubmission(ctx, buildValidSubmissionWithSeqAndTx(t, 0, 2, emptyTx), false) // Send an unblocking tx
require.NoError(t, err)

time.Sleep(time.Second) // wait for future seq num txs to be processed
Expand Down
9 changes: 7 additions & 2 deletions execution/gethexec/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func (s *Sequencer) PublishExpressLaneTransaction(ctx context.Context, msg *time
return forwarder.PublishExpressLaneTransaction(ctx, msg)
}

return s.expressLaneService.sequenceExpressLaneSubmission(ctx, msg)
return s.expressLaneService.sequenceExpressLaneSubmission(ctx, msg, false)
}

func (s *Sequencer) PublishTimeboostedTransaction(queueCtx context.Context, tx *types.Transaction, options *arbitrum_types.ConditionalOptions, resultChan chan error) {
Expand Down Expand Up @@ -1461,7 +1461,12 @@ func (s *Sequencer) StopAndWait() {
wg.Add(1)
go func() {
defer wg.Done()
err := forwarder.PublishTransaction(item.ctx, item.tx, item.options)
var err error
if source == "timeboostAuctionResolutionTxQueue" {
err = forwarder.PublishAuctionResolutionTransaction(item.ctx, item.tx)
} else {
err = forwarder.PublishTransaction(item.ctx, item.tx, item.options)
}
if err != nil {
log.Warn("failed to forward transaction while shutting down", "source", source, "err", err)
}
Expand Down
Loading
Loading