-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: added notifications for validators opted in #598
Changes from all commits
7da9ee2
cd12f0c
d05e107
b2516b5
23b52d2
df620b7
e5547fd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
// timings_test.go | ||
package validatorapi | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
validatorapiv1 "github.com/primev/mev-commit/p2p/gen/go/validatorapi/v1" | ||
) | ||
|
||
// SetTestTimings sets the global timing variables for testing and returns a cleanup function | ||
// that restores the previous values. | ||
func SetTestTimings(slotDuration time.Duration, epochSlots int, notifyOffset, fetchOffset time.Duration) func() { | ||
// Save the original values. | ||
origSlotDuration := SlotDuration | ||
origEpochSlots := EpochSlots | ||
origEpochDuration := EpochDuration | ||
origNotifyOffset := NotifyOffset | ||
origFetchOffset := FetchOffset | ||
|
||
// Override the globals. | ||
SlotDuration = slotDuration | ||
EpochSlots = epochSlots | ||
EpochDuration = SlotDuration * time.Duration(EpochSlots) | ||
NotifyOffset = notifyOffset | ||
FetchOffset = fetchOffset | ||
|
||
// Return a function to restore the original values. | ||
return func() { | ||
SlotDuration = origSlotDuration | ||
EpochSlots = origEpochSlots | ||
EpochDuration = origEpochDuration | ||
NotifyOffset = origNotifyOffset | ||
FetchOffset = origFetchOffset | ||
} | ||
} | ||
|
||
func (s *Service) GenesisTime() time.Time { | ||
return s.genesisTime | ||
} | ||
|
||
func (s *Service) ScheduleNotificationForSlot(epoch uint64, slot uint64, info *validatorapiv1.SlotInfo) { | ||
s.scheduleNotificationForSlot(epoch, slot, info) | ||
} | ||
|
||
func (s *Service) SetGenesisTime(genesisTime time.Time) { | ||
s.genesisTime = genesisTime | ||
} | ||
|
||
func (s *Service) SetProcessEpoch(ctx context.Context, epoch uint64) { | ||
s.processEpoch(ctx, epoch) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,11 +9,14 @@ import ( | |
"net/http" | ||
"strconv" | ||
"strings" | ||
"time" | ||
|
||
"github.com/ethereum/go-ethereum/accounts/abi/bind" | ||
"github.com/ethereum/go-ethereum/common/hexutil" | ||
validatoroptinrouter "github.com/primev/mev-commit/contracts-abi/clients/ValidatorOptInRouter" | ||
validatorapiv1 "github.com/primev/mev-commit/p2p/gen/go/validatorapi/v1" | ||
"github.com/primev/mev-commit/p2p/pkg/notifications" | ||
"golang.org/x/sync/errgroup" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
) | ||
|
@@ -29,20 +32,32 @@ type Service struct { | |
logger *slog.Logger | ||
metrics *metrics | ||
optsGetter func() (*bind.CallOpts, error) | ||
notifier notifications.Notifier | ||
genesisTime time.Time | ||
} | ||
|
||
var ( | ||
SlotDuration = 12 * time.Second | ||
EpochSlots = 32 | ||
EpochDuration = SlotDuration * time.Duration(EpochSlots) | ||
NotifyOffset = 1 * time.Second | ||
FetchOffset = 2 * time.Second | ||
) | ||
|
||
func NewService( | ||
apiURL string, | ||
validatorRouter ValidatorRouterContract, | ||
logger *slog.Logger, | ||
optsGetter func() (*bind.CallOpts, error), | ||
notifier notifications.Notifier, | ||
) *Service { | ||
return &Service{ | ||
apiURL: apiURL, | ||
validatorRouter: validatorRouter, | ||
logger: logger, | ||
metrics: newMetrics(), | ||
optsGetter: optsGetter, | ||
notifier: notifier, | ||
} | ||
} | ||
|
||
|
@@ -98,8 +113,8 @@ func (s *Service) fetchCurrentEpoch(ctx context.Context, epoch uint64) (uint64, | |
if epoch != 0 { | ||
return epoch, nil | ||
} | ||
|
||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.apiURL+"/eth/v1/beacon/states/head/finality_checkpoints", nil) | ||
url := fmt.Sprintf("%s/eth/v1/beacon/states/head/finality_checkpoints", s.apiURL) | ||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) | ||
if err != nil { | ||
s.logger.Error("creating request", "error", err) | ||
return 0, status.Errorf(codes.Internal, "creating request: %v", err) | ||
|
@@ -216,3 +231,160 @@ func (s *Service) processValidators(dutiesResp *ProposerDutiesResponse) (map[uin | |
|
||
return validators, nil | ||
} | ||
|
||
func (s *Service) scheduleNotificationForSlot(epoch uint64, slot uint64, info *validatorapiv1.SlotInfo) { | ||
slotStartTime := s.genesisTime.Add(time.Duration(slot) * SlotDuration) | ||
notificationTime := slotStartTime.Add(-NotifyOffset) | ||
|
||
delay := time.Until(notificationTime) | ||
if delay <= 0 { | ||
s.logger.Warn("notification time already passed for slot", "epoch", epoch, "slot", slot) | ||
return | ||
} | ||
|
||
time.AfterFunc(delay, func() { | ||
notif := notifications.NewNotification( | ||
notifications.TopicValidatorOptedIn, | ||
map[string]any{ | ||
"epoch": epoch, | ||
"slot": slot, | ||
"bls_key": info.BLSKey, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think for the clients it would be better to give the execution block number in the notification as they will bid on that. Can we somehow compute this? Or maybe some API which can provide this? |
||
}, | ||
) | ||
s.notifier.Notify(notif) | ||
s.logger.Info("sent notification for opted in validator", "epoch", epoch, "slot", slot, "bls_key", info.BLSKey) | ||
}) | ||
} | ||
|
||
func (s *Service) processEpoch(ctx context.Context, epoch uint64) { | ||
s.logger.Info("processing epoch", "epoch", epoch) | ||
|
||
dutiesResp, err := s.fetchProposerDuties(ctx, epoch) | ||
if err != nil { | ||
s.logger.Error("failed to fetch proposer duties", "epoch", epoch, "error", err) | ||
return | ||
} | ||
|
||
validators, err := s.processValidators(dutiesResp) | ||
if err != nil { | ||
s.logger.Error("failed to process validators", "epoch", epoch, "error", err) | ||
return | ||
} | ||
|
||
optedInSlots := make([]map[string]interface{}, 0) | ||
for slot, info := range validators { | ||
if info.IsOptedIn { | ||
s.scheduleNotificationForSlot(epoch, slot, info) | ||
optedInSlots = append(optedInSlots, map[string]interface{}{ | ||
"slot": slot, | ||
"bls_key": info.BLSKey, | ||
}) | ||
} | ||
} | ||
|
||
// Send epoch-level notification if there are any opted-in validators | ||
if len(optedInSlots) > 0 { | ||
notif := notifications.NewNotification( | ||
notifications.TopicEpochValidatorsOptedIn, | ||
map[string]any{ | ||
"epoch": epoch, | ||
"slots": optedInSlots, | ||
}, | ||
) | ||
s.notifier.Notify(notif) | ||
s.logger.Info("sent notification for epoch with opted in validators", | ||
"epoch", epoch, | ||
"slot_count", len(optedInSlots)) | ||
} | ||
} | ||
|
||
// Start starts a background job that fetches and processes an epoch every 384 seconds. | ||
// (384 seconds is the duration of an epoch) | ||
func (s *Service) Start(ctx context.Context) <-chan struct{} { | ||
doneChan := make(chan struct{}) | ||
|
||
genesisTime, err := s.fetchGenesisTime(ctx) | ||
if err != nil { | ||
s.logger.Error("failed to fetch genesis time", "error", err) | ||
close(doneChan) | ||
return doneChan | ||
} | ||
s.genesisTime = genesisTime | ||
s.logger.Info("initialized genesis time", "genesis_time", s.genesisTime) | ||
|
||
eg, egCtx := errgroup.WithContext(ctx) | ||
|
||
eg.Go(func() error { | ||
// Loop until the context is canceled. | ||
for { | ||
now := time.Now() | ||
elapsed := now.Sub(s.genesisTime) | ||
currentEpoch := uint64(elapsed / EpochDuration) | ||
nextEpoch := currentEpoch + 1 | ||
nextEpochStart := s.genesisTime.Add(time.Duration(nextEpoch) * EpochDuration) | ||
fetchTime := nextEpochStart.Add(-FetchOffset) | ||
delay := time.Until(fetchTime) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice! |
||
if delay < 0 { | ||
delay = 0 | ||
} | ||
|
||
s.logger.Info("scheduling epoch fetch", "upcoming_epoch", nextEpoch, "fetch_in", delay, "fetch_time", fetchTime) | ||
select { | ||
case <-egCtx.Done(): | ||
s.logger.Info("epoch cron job stopped") | ||
return nil | ||
case <-time.After(delay): | ||
// Time expired: proceed to process next epoch. | ||
} | ||
|
||
s.logger.Info("fetching upcoming epoch", "epoch", nextEpoch) | ||
s.processEpoch(egCtx, nextEpoch) | ||
} | ||
}) | ||
|
||
go func() { | ||
defer close(doneChan) | ||
if err := eg.Wait(); err != nil { | ||
s.logger.Error("error in epoch cron job", "error", err) | ||
} | ||
}() | ||
|
||
return doneChan | ||
} | ||
|
||
func (s *Service) fetchGenesisTime(ctx context.Context) (time.Time, error) { | ||
url := fmt.Sprintf("%s/eth/v1/beacon/genesis", s.apiURL) | ||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) | ||
if err != nil { | ||
s.logger.Error("creating genesis request", "error", err) | ||
return time.Time{}, err | ||
} | ||
req.Header.Set("accept", "application/json") | ||
resp, err := http.DefaultClient.Do(req) | ||
if err != nil { | ||
s.logger.Error("making genesis request", "error", err) | ||
return time.Time{}, err | ||
} | ||
defer resp.Body.Close() | ||
|
||
if resp.StatusCode != http.StatusOK { | ||
s.logger.Error("unexpected status code for genesis", "status", resp.StatusCode) | ||
return time.Time{}, fmt.Errorf("unexpected status code: %v", resp.StatusCode) | ||
} | ||
|
||
var genesisResp struct { | ||
Data struct { | ||
GenesisTime string `json:"genesis_time"` | ||
} `json:"data"` | ||
} | ||
if err := json.NewDecoder(resp.Body).Decode(&genesisResp); err != nil { | ||
s.logger.Error("decoding genesis response", "error", err) | ||
return time.Time{}, err | ||
} | ||
genesisTimeInt, err := strconv.ParseInt(genesisResp.Data.GenesisTime, 10, 64) | ||
if err != nil { | ||
s.logger.Error("parsing genesis time", "error", err) | ||
return time.Time{}, err | ||
} | ||
return time.Unix(genesisTimeInt, 0), nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dont think we need this delay. We already have the delay to query the duties. I think we can send this as soon as we get the info. In case of the notifications, users will be always subscribed ideally. So sooner they get it, sooner they can decide to bid.
If they miss this, they should be able to query the get validators endpoint right?