Skip to content

Commit

Permalink
susbcribe to block proposed
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberhorsey committed May 15, 2023
1 parent 8c7ddc4 commit e40f1b2
Showing 1 changed file with 69 additions and 0 deletions.
69 changes: 69 additions & 0 deletions packages/eventindexer/indexer/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error {
errChan := make(chan error)

go svc.subscribeBlockProven(ctx, chainID, errChan)
go svc.subscribeBlockProposed(ctx, chainID, errChan)

// nolint: gosimple
for {
Expand Down Expand Up @@ -61,6 +62,74 @@ func (svc *Service) subscribeBlockProven(ctx context.Context, chainID *big.Int,
log.Infof("blockProvenEvent from subscription for prover %v", event.Prover.Hex())

if err := svc.saveBlockProvenEvent(ctx, chainID, event); err != nil {
eventindexer.BlockProvenEventsProcessedError.Inc()

log.Errorf("svc.subscribe, svc.saveBlockProvenEvent: %v", err)
}

block, err := svc.blockRepo.GetLatestBlockProcessed(chainID)
if err != nil {
log.Errorf("svc.subscribe, blockRepo.GetLatestBlockProcessed: %v", err)
continue
}

if block.Height < event.Raw.BlockNumber {
err = svc.blockRepo.Save(eventindexer.SaveBlockOpts{
Height: event.Raw.BlockNumber,
Hash: event.Raw.BlockHash,
ChainID: chainID,
})
if err != nil {
log.Errorf("svc.subscribe, svc.blockRepo.Save: %v", err)
}

eventindexer.BlocksProcessed.Inc()
}
}
}
}

func (svc *Service) subscribeBlockProposed(ctx context.Context, chainID *big.Int, errChan chan error) {
sink := make(chan *taikol1.TaikoL1BlockProposed)

sub := event.ResubscribeErr(svc.subscriptionBackoff, func(ctx context.Context, err error) (event.Subscription, error) {
if err != nil {
log.Errorf("svc.taikoL1.WatchBlockProposed: %v", err)
}
log.Info("resubscribing to BlockProposed events")

return svc.taikol1.WatchBlockProposed(&bind.WatchOpts{
Context: ctx,
}, sink, nil)
})

defer sub.Unsubscribe()

for {
select {
case <-ctx.Done():
log.Info("context finished")
return
case err := <-sub.Err():
errChan <- errors.Wrap(err, "sub.Err()")
case event := <-sink:
log.Infof("blockProposedEvent from subscription")

tx, _, err := svc.ethClient.TransactionByHash(ctx, event.Raw.TxHash)
if err != nil {
log.Errorf("svc.ethClient.TransactionByHash: %v", err)
}

sender, err := svc.ethClient.TransactionSender(ctx, tx, event.Raw.BlockHash, event.Raw.TxIndex)
if err != nil {
log.Errorf("svc.ethClient.TransactionSender: %v", err)
}

log.Infof("blockProposed by: %v", sender.Hex())

if err := svc.saveBlockProposedEvent(ctx, chainID, event, sender); err != nil {
eventindexer.BlockProposedEventsProcessedError.Inc()

log.Errorf("svc.subscribe, svc.saveBlockProvenEvent: %v", err)
}

Expand Down

0 comments on commit e40f1b2

Please sign in to comment.