-
Notifications
You must be signed in to change notification settings - Fork 497
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use channels in redisCoordinator to accept push-updates to redis instead of launching goroutines #2966
base: master
Are you sure you want to change the base?
Conversation
timeboost/redis_coordinator.go
Outdated
for { | ||
var roundSeqUpdate roundSeqUpdateItem | ||
select { | ||
case roundSeqUpdate = <-rc.roundSeqUpdateChan: | ||
if roundSeqUpdate.round < rc.roundTimingInfo.RoundNumber() { | ||
// This prevents stale roundSeqUpdates from being written to redis and unclogs roundSeqUpdateChan | ||
continue | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we're inserting in a fifo fashion to roundSeqUpdateChan, could we pull everything off the queue first before we do anything else? I put a code example for how to do this in the comment about the other puller loop. The final update should be the only one we care about. That way we don't have to have the redis write in the loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have concerns over batching redis push updates, i.e, primarily we have redis to ensure smooth transition during a sequencer crash/graceful handover in which case its important that we have as many new messages as possible to be quickly written to redis! And batching would mean either we end up writing all updates or none.
So in case of a failure in pushing a batch update, and this happens to be during a sequencer switch (most likely the case, because if its not then redis is anyway not up-to-date), we lose the purpose of redis altogether!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since, round sequence updates aren't so critical, i think its a good idea to batch them, i.e to keep pulling from roundSeqUpdateChan like upto 5 updates or so
rc.LaunchThread(rc.trackSequenceCountUpdates) | ||
rc.LaunchThread(rc.trackAcceptedTxAddition) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these should be made to respond to context cancelations
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep, they do currently respond to context cancellation through
case <-ctx.Done():
return
} | ||
return nil | ||
} | ||
|
||
func (rc *RedisCoordinator) trackAcceptedTxAddition(ctx context.Context) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar comment as for the other channel pulling loop function, except to also add redis batching.
Pull as much as you can from the channel:
for {
// Blocking receive to wait for at least one message
msg := <-c.msgChan
batch = append(batch, msg)
// Non-blocking receives to drain the channel
for {
select {
case msg, ok := <-c.msgChan:
if !ok {
// Channel closed, handle accordingly
return
}
batch = append(batch, msg)
default:
// No more messages, break the loop
goto processBatch
}
}
It should probably also break out after some max batch size, can you pick a good max batch size?
Submit it as a batch:
var multi redis.Pipeliner
for _, submission := range submissions {
jsonSubmission, err := submission.ToJson()
if err != nil {
return err
}
submissionJson, err := json.Marshal(jsonSubmission)
if err != nil {
return err
}
key := fmt.Sprintf("timeboost:express_lane_submission:%s:%d", submission.AuctionContractAddress.Hex(), submission.Round)
multi.RPush(ctx, key, submissionJson)
}
return multi.Exec(ctx).Err()
timeboost/redis_coordinator.go
Outdated
@@ -21,30 +19,42 @@ import ( | |||
const EXPRESS_LANE_ROUND_SEQUENCE_KEY_PREFIX string = "expressLane.roundSequence." // Only written by sequencer holding CHOSEN (seqCoordinator) key | |||
const EXPRESS_LANE_ACCEPTED_TX_KEY_PREFIX string = "expressLane.acceptedTx." // Only written by sequencer holding CHOSEN (seqCoordinator) key | |||
|
|||
const UpdateEventsChannelSize int = 50 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should make this bigger, let's add a zero. TPS on arb1 spikes to over 100 so we should add more headroom. It will use more memory since bounded channels are implemented as a fixed size circular buffer in go, but I think it's negligible. I also have added other comments about how to process these queues faster.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense, i'll make it 500
This PR makes that the push calls to timeboost redisCoordinator are handled using channels, instead of launching thread-safe go-routines. Thus greatly reducing the number of parallel go-routines launched and guaranteeing all push-updates will be addressed to in the mean time.
In extreme cases when the update (buffered) channels get clogged, we use the roundTimingInfo of rediscoordinator to clear up the stale messages.
Resolves NIT-3110