Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added notifications for validators opted in #598

Merged
merged 7 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion p2p/pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,20 @@ func NewNode(opts *Options) (*Node, error) {
validatorRouterCaller,
opts.Logger.With("component", "validatorapi"),
callOptsGetter,
notificationsSvc,
)
if err != nil {
opts.Logger.Error("failed to create validator api", "error", err)
return nil, err
}
validatorapiv1.RegisterValidatorServer(grpcServer, validatorAPI)

startables = append(
startables,
StartableObjWithDesc{
Desc: "validators",
Startable: validatorAPI,
},
)
blocksPerWindow := bpwBigInt.Uint64()

switch opts.PeerType {
Expand Down
6 changes: 4 additions & 2 deletions p2p/pkg/notifications/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
type Topic string

const (
TopicPeerConnected Topic = "peer_connected"
TopicPeerDisconnected Topic = "peer_disconnected"
TopicPeerConnected Topic = "peer_connected"
TopicPeerDisconnected Topic = "peer_disconnected"
TopicValidatorOptedIn Topic = "validator_opted_in"
TopicEpochValidatorsOptedIn Topic = "epoch_validators_opted_in"
)

func IsTopicValid(topic Topic) bool {
Expand Down
52 changes: 52 additions & 0 deletions p2p/pkg/rpc/validator/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// timings_test.go
package validatorapi

import (
"context"
"time"

validatorapiv1 "github.com/primev/mev-commit/p2p/gen/go/validatorapi/v1"
)

// SetTestTimings sets the global timing variables for testing and returns a cleanup function
// that restores the previous values.
func SetTestTimings(slotDuration time.Duration, epochSlots int, notifyOffset, fetchOffset time.Duration) func() {
// Save the original values.
origSlotDuration := SlotDuration
origEpochSlots := EpochSlots
origEpochDuration := EpochDuration
origNotifyOffset := NotifyOffset
origFetchOffset := FetchOffset

// Override the globals.
SlotDuration = slotDuration
EpochSlots = epochSlots
EpochDuration = SlotDuration * time.Duration(EpochSlots)
NotifyOffset = notifyOffset
FetchOffset = fetchOffset

// Return a function to restore the original values.
return func() {
SlotDuration = origSlotDuration
EpochSlots = origEpochSlots
EpochDuration = origEpochDuration
NotifyOffset = origNotifyOffset
FetchOffset = origFetchOffset
}
}

func (s *Service) GenesisTime() time.Time {
return s.genesisTime
}

func (s *Service) ScheduleNotificationForSlot(epoch uint64, slot uint64, info *validatorapiv1.SlotInfo) {
s.scheduleNotificationForSlot(epoch, slot, info)
}

func (s *Service) SetGenesisTime(genesisTime time.Time) {
s.genesisTime = genesisTime
}

func (s *Service) SetProcessEpoch(ctx context.Context, epoch uint64) {
s.processEpoch(ctx, epoch)
}
176 changes: 174 additions & 2 deletions p2p/pkg/rpc/validator/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ import (
"net/http"
"strconv"
"strings"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common/hexutil"
validatoroptinrouter "github.com/primev/mev-commit/contracts-abi/clients/ValidatorOptInRouter"
validatorapiv1 "github.com/primev/mev-commit/p2p/gen/go/validatorapi/v1"
"github.com/primev/mev-commit/p2p/pkg/notifications"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand All @@ -29,20 +32,32 @@ type Service struct {
logger *slog.Logger
metrics *metrics
optsGetter func() (*bind.CallOpts, error)
notifier notifications.Notifier
genesisTime time.Time
}

var (
SlotDuration = 12 * time.Second
EpochSlots = 32
EpochDuration = SlotDuration * time.Duration(EpochSlots)
NotifyOffset = 1 * time.Second
FetchOffset = 2 * time.Second
)

func NewService(
apiURL string,
validatorRouter ValidatorRouterContract,
logger *slog.Logger,
optsGetter func() (*bind.CallOpts, error),
notifier notifications.Notifier,
) *Service {
return &Service{
apiURL: apiURL,
validatorRouter: validatorRouter,
logger: logger,
metrics: newMetrics(),
optsGetter: optsGetter,
notifier: notifier,
}
}

Expand Down Expand Up @@ -98,8 +113,8 @@ func (s *Service) fetchCurrentEpoch(ctx context.Context, epoch uint64) (uint64,
if epoch != 0 {
return epoch, nil
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.apiURL+"/eth/v1/beacon/states/head/finality_checkpoints", nil)
url := fmt.Sprintf("%s/eth/v1/beacon/states/head/finality_checkpoints", s.apiURL)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
s.logger.Error("creating request", "error", err)
return 0, status.Errorf(codes.Internal, "creating request: %v", err)
Expand Down Expand Up @@ -216,3 +231,160 @@ func (s *Service) processValidators(dutiesResp *ProposerDutiesResponse) (map[uin

return validators, nil
}

func (s *Service) scheduleNotificationForSlot(epoch uint64, slot uint64, info *validatorapiv1.SlotInfo) {
slotStartTime := s.genesisTime.Add(time.Duration(slot) * SlotDuration)
notificationTime := slotStartTime.Add(-NotifyOffset)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think we need this delay. We already have the delay to query the duties. I think we can send this as soon as we get the info. In case of the notifications, users will be always subscribed ideally. So sooner they get it, sooner they can decide to bid.

If they miss this, they should be able to query the get validators endpoint right?


delay := time.Until(notificationTime)
if delay <= 0 {
s.logger.Warn("notification time already passed for slot", "epoch", epoch, "slot", slot)
return
}

time.AfterFunc(delay, func() {
notif := notifications.NewNotification(
notifications.TopicValidatorOptedIn,
map[string]any{
"epoch": epoch,
"slot": slot,
"bls_key": info.BLSKey,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for the clients it would be better to give the execution block number in the notification as they will bid on that. Can we somehow compute this? Or maybe some API which can provide this?

},
)
s.notifier.Notify(notif)
s.logger.Info("sent notification for opted in validator", "epoch", epoch, "slot", slot, "bls_key", info.BLSKey)
})
}

func (s *Service) processEpoch(ctx context.Context, epoch uint64) {
s.logger.Info("processing epoch", "epoch", epoch)

dutiesResp, err := s.fetchProposerDuties(ctx, epoch)
if err != nil {
s.logger.Error("failed to fetch proposer duties", "epoch", epoch, "error", err)
return
}

validators, err := s.processValidators(dutiesResp)
if err != nil {
s.logger.Error("failed to process validators", "epoch", epoch, "error", err)
return
}

optedInSlots := make([]map[string]interface{}, 0)
for slot, info := range validators {
if info.IsOptedIn {
s.scheduleNotificationForSlot(epoch, slot, info)
optedInSlots = append(optedInSlots, map[string]interface{}{
"slot": slot,
"bls_key": info.BLSKey,
})
}
}

// Send epoch-level notification if there are any opted-in validators
if len(optedInSlots) > 0 {
notif := notifications.NewNotification(
notifications.TopicEpochValidatorsOptedIn,
map[string]any{
"epoch": epoch,
"slots": optedInSlots,
},
)
s.notifier.Notify(notif)
s.logger.Info("sent notification for epoch with opted in validators",
"epoch", epoch,
"slot_count", len(optedInSlots))
}
}

// Start starts a background job that fetches and processes an epoch every 384 seconds.
// (384 seconds is the duration of an epoch)
func (s *Service) Start(ctx context.Context) <-chan struct{} {
doneChan := make(chan struct{})

genesisTime, err := s.fetchGenesisTime(ctx)
if err != nil {
s.logger.Error("failed to fetch genesis time", "error", err)
close(doneChan)
return doneChan
}
s.genesisTime = genesisTime
s.logger.Info("initialized genesis time", "genesis_time", s.genesisTime)

eg, egCtx := errgroup.WithContext(ctx)

eg.Go(func() error {
// Loop until the context is canceled.
for {
now := time.Now()
elapsed := now.Sub(s.genesisTime)
currentEpoch := uint64(elapsed / EpochDuration)
nextEpoch := currentEpoch + 1
nextEpochStart := s.genesisTime.Add(time.Duration(nextEpoch) * EpochDuration)
fetchTime := nextEpochStart.Add(-FetchOffset)
delay := time.Until(fetchTime)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

if delay < 0 {
delay = 0
}

s.logger.Info("scheduling epoch fetch", "upcoming_epoch", nextEpoch, "fetch_in", delay, "fetch_time", fetchTime)
select {
case <-egCtx.Done():
s.logger.Info("epoch cron job stopped")
return nil
case <-time.After(delay):
// Time expired: proceed to process next epoch.
}

s.logger.Info("fetching upcoming epoch", "epoch", nextEpoch)
s.processEpoch(egCtx, nextEpoch)
}
})

go func() {
defer close(doneChan)
if err := eg.Wait(); err != nil {
s.logger.Error("error in epoch cron job", "error", err)
}
}()

return doneChan
}

func (s *Service) fetchGenesisTime(ctx context.Context) (time.Time, error) {
url := fmt.Sprintf("%s/eth/v1/beacon/genesis", s.apiURL)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
s.logger.Error("creating genesis request", "error", err)
return time.Time{}, err
}
req.Header.Set("accept", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
s.logger.Error("making genesis request", "error", err)
return time.Time{}, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
s.logger.Error("unexpected status code for genesis", "status", resp.StatusCode)
return time.Time{}, fmt.Errorf("unexpected status code: %v", resp.StatusCode)
}

var genesisResp struct {
Data struct {
GenesisTime string `json:"genesis_time"`
} `json:"data"`
}
if err := json.NewDecoder(resp.Body).Decode(&genesisResp); err != nil {
s.logger.Error("decoding genesis response", "error", err)
return time.Time{}, err
}
genesisTimeInt, err := strconv.ParseInt(genesisResp.Data.GenesisTime, 10, 64)
if err != nil {
s.logger.Error("parsing genesis time", "error", err)
return time.Time{}, err
}
return time.Unix(genesisTimeInt, 0), nil
}
Loading
Loading