Skip to content

Commit

Permalink
fix(eventindexer): add disperser log, remove unused stats from previo…
Browse files Browse the repository at this point in the history
…us testnets (#16938)
  • Loading branch information
cyberhorsey authored May 1, 2024
1 parent 0cbcc0a commit aec6bca
Show file tree
Hide file tree
Showing 19 changed files with 9 additions and 802 deletions.
6 changes: 0 additions & 6 deletions packages/eventindexer/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ func InitFromConfig(ctx context.Context, api *API, cfg *Config) error {
return err
}

statRepository, err := repo.NewStatRepository(db)
if err != nil {
return err
}

nftBalanceRepository, err := repo.NewNFTBalanceRepository(db)
if err != nil {
return err
Expand All @@ -79,7 +74,6 @@ func InitFromConfig(ctx context.Context, api *API, cfg *Config) error {

srv, err := http.NewServer(http.NewServerOpts{
EventRepo: eventRepository,
StatRepo: statRepository,
NFTBalanceRepo: nftBalanceRepository,
ChartRepo: chartRepository,
Echo: echo.New(),
Expand Down
2 changes: 2 additions & 0 deletions packages/eventindexer/disperser/disperser.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func (d *Disperser) Start() error {
return err
}

slog.Info("addresses", "addresses", addresses)

for _, address := range addresses {
slog.Info("dispersing to", "address", address)

Expand Down
104 changes: 0 additions & 104 deletions packages/eventindexer/generator/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,110 +403,6 @@ func (g *Generator) queryByTask(task string, date time.Time) error {

// return early for array processing data
return nil
case tasks.TotalProofRewards:
var feeTokenAddresses []string = make([]string, 0)
// get unique fee token addresses
query := "SELECT DISTINCT(fee_token_address) FROM stats WHERE stat_type = ?"

err = g.db.GormDB().
Raw(query, eventindexer.StatTypeProofReward).
Scan(&feeTokenAddresses).Error
if err != nil {
return err
}

slog.Info("feeTokenAddresses", "addresses", feeTokenAddresses)

for _, feeTokenAddress := range feeTokenAddresses {
f := feeTokenAddress

var dailyProofRewards decimal.NullDecimal

// nolint: lll
query := "SELECT COALESCE(SUM(proof_reward), 0) FROM events WHERE event = ? AND DATE(transacted_at) = ? AND fee_token_address = ?"
err = g.db.GormDB().
Raw(query, eventindexer.EventNameBlockAssigned, dateString, f).
Scan(&dailyProofRewards).Error

if err != nil {
return err
}

tsdResult, err := g.previousDayTsdResultByTask(task, date, &f, nil)
if err != nil {
return err
}

result := tsdResult.Decimal.Add(dailyProofRewards.Decimal)

slog.Info("Query successful",
"task", task,
"date", dateString,
"result", result.String(),
"feeTokenAddress", f,
)

insertStmt := `
INSERT INTO time_series_data(task, value, date, fee_token_address)
VALUES (?, ?, ?, ?)`

err = g.db.GormDB().Exec(insertStmt, task, result, dateString, f).Error
if err != nil {
slog.Info("Insert failed", "task", task, "date", dateString, "error", err.Error())
return err
}
}

// return early for array processing data
return nil
case tasks.ProofRewardsPerDay:
var feeTokenAddresses []string = make([]string, 0)
// get unique fee token addresses
query := "SELECT DISTINCT(fee_token_address) FROM stats WHERE stat_type = ?"

err = g.db.GormDB().
Raw(query, eventindexer.EventNameBlockAssigned).
Scan(&feeTokenAddresses).Error
if err != nil {
return err
}

for _, feeTokenAddress := range feeTokenAddresses {
f := feeTokenAddress

var result decimal.Decimal

// nolint: lll
query := `SELECT COALESCE(SUM(proof_reward), 0) FROM events WHERE event = ? AND DATE(transacted_at) = ? AND fee_token_address = ?`
err = g.db.GormDB().
Raw(query, eventindexer.EventNameBlockAssigned, dateString, f).
Scan(&result).Error

if err != nil {
return err
}

slog.Info("Query successful",
"task", task,
"date", dateString,
"result", result.String(),
"feeTokenAddress", f,
)

insertStmt := `
INSERT INTO time_series_data(task, value, date, fee_token_address)
VALUES (?, ?, ?, ?)`

err = g.db.GormDB().Exec(insertStmt, task, result, dateString, f).Error
if err != nil {
slog.Info("Insert failed", "task", task, "date", dateString, "error", err.Error())
return err
}
}

// return early for array processing data
return nil

case tasks.BridgeMessagesSentPerDay:
err = g.eventCount(task, date, eventindexer.EventNameMessageSent, &result)
case tasks.TotalBridgeMessagesSent:
Expand Down
2 changes: 1 addition & 1 deletion packages/eventindexer/indexer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
L1TaikoAddress: common.HexToAddress(c.String(flags.L1TaikoAddress.Name)),
BridgeAddress: common.HexToAddress(c.String(flags.BridgeAddress.Name)),
AssignmentHookAddress: common.HexToAddress(c.String(flags.AssignmentHookAddress.Name)),
SgxVerifierAddress: common.HexToAddress(flags.SgxVerifierAddress.Name),
SgxVerifierAddress: common.HexToAddress(c.String(flags.SgxVerifierAddress.Name)),
SwapAddresses: swaps,
BlockBatchSize: c.Uint64(flags.BlockBatchSize.Name),
SubscriptionBackoff: c.Uint64(flags.SubscriptionBackoff.Name),
Expand Down
7 changes: 0 additions & 7 deletions packages/eventindexer/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ var (
type Indexer struct {
accountRepo eventindexer.AccountRepository
eventRepo eventindexer.EventRepository
statRepo eventindexer.StatRepository
nftBalanceRepo eventindexer.NFTBalanceRepository
txRepo eventindexer.TransactionRepository

Expand Down Expand Up @@ -132,11 +131,6 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) error {
return err
}

statRepository, err := repo.NewStatRepository(db)
if err != nil {
return err
}

nftBalanceRepository, err := repo.NewNFTBalanceRepository(db)
if err != nil {
return err
Expand Down Expand Up @@ -209,7 +203,6 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) error {
i.blockSaveMutex = &sync.Mutex{}
i.accountRepo = accountRepository
i.eventRepo = eventRepository
i.statRepo = statRepository
i.nftBalanceRepo = nftBalanceRepository
i.txRepo = txRepository

Expand Down
70 changes: 2 additions & 68 deletions packages/eventindexer/indexer/save_block_assigned_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,6 @@ func (i *Indexer) saveBlockAssignedEvent(
return errors.Wrap(err, "i.ethClient.BlockByNumber")
}

proverReward, err := i.updateAverageProverReward(ctx, event)
if err != nil {
return errors.Wrap(err, "i.updateAverageProverReward")
}

feeToken := event.Assignment.FeeToken.Hex()

_, err = i.eventRepo.Save(ctx, eventindexer.SaveEventOpts{
Expand All @@ -72,8 +67,8 @@ func (i *Indexer) saveBlockAssignedEvent(
Address: "",
AssignedProver: &assignedProver,
TransactedAt: time.Unix(int64(block.Time()), 0).UTC(),
Amount: proverReward,
ProofReward: proverReward,
Amount: big.NewInt(0),
ProofReward: big.NewInt(0),
FeeTokenAddress: &feeToken,
EmittedBlockID: event.Raw.BlockNumber,
})
Expand All @@ -85,64 +80,3 @@ func (i *Indexer) saveBlockAssignedEvent(

return nil
}

func (i *Indexer) updateAverageProverReward(
ctx context.Context,
event *assignmenthook.AssignmentHookBlockAssigned,
) (*big.Int, error) {
feeToken := event.Assignment.FeeToken.Hex()

stat, err := i.statRepo.Find(ctx, eventindexer.StatTypeProofReward, &feeToken)
if err != nil {
return nil, errors.Wrap(err, "i.statRepo.Find")
}

avg, ok := new(big.Int).SetString(stat.AverageProofReward, 10)
if !ok {
return nil, errors.New("unable to convert average proof time to string")
}

var proverFee *big.Int

tiers := event.Assignment.TierFees
minTier := event.Meta.MinTier

for _, tier := range tiers {
if tier.Tier == minTier {
proverFee = tier.Fee
break
}
}

newAverageProofReward := calcNewAverage(
avg,
new(big.Int).SetUint64(stat.NumProofs),
proverFee,
)

slog.Info("newAverageProofReward update",
"prover",
event.AssignedProver.Hex(),
"proverFee",
proverFee.String(),
"tiers",
event.Assignment.TierFees,
"minTier",
event.Meta.MinTier,
"avg",
avg.String(),
"newAvg",
newAverageProofReward.String(),
)

_, err = i.statRepo.Save(ctx, eventindexer.SaveStatOpts{
ProofReward: newAverageProofReward,
StatType: eventindexer.StatTypeProofReward,
FeeTokenAddress: &feeToken,
})
if err != nil {
return nil, errors.Wrap(err, "i.statRepo.Save")
}

return big.NewInt(0), nil
}
84 changes: 0 additions & 84 deletions packages/eventindexer/indexer/save_transition_proved_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,11 @@ import (

"log/slog"

"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"github.com/taikoxyz/taiko-mono/packages/eventindexer"
"github.com/taikoxyz/taiko-mono/packages/eventindexer/contracts/taikol1"
)

var (
systemProver = common.HexToAddress("0x0000000000000000000000000000000000000001")
oracleProver = common.HexToAddress("0x0000000000000000000000000000000000000000")
)

func (i *Indexer) saveTransitionProvedEvents(
ctx context.Context,
chainID *big.Int,
Expand Down Expand Up @@ -82,83 +76,5 @@ func (i *Indexer) saveTransitionProvedEvent(

eventindexer.TransitionProvedEventsProcessed.Inc()

if event.Prover.Hex() != systemProver.Hex() && event.Prover.Hex() != oracleProver.Hex() {
if err := i.updateAverageProofTime(ctx, event); err != nil {
return errors.Wrap(err, "i.updateAverageProofTime")
}
}

return nil
}

func (i *Indexer) updateAverageProofTime(ctx context.Context, event *taikol1.TaikoL1TransitionProved) error {
block, err := i.taikol1.GetBlock(nil, event.BlockId.Uint64())
// will be unable to GetBlock for older blocks, just return nil, we dont
// care about averageProofTime that much to be honest for older blocks
if err != nil {
slog.Error("getBlock error", "err", err.Error())

return nil
}

eventBlock, err := i.ethClient.BlockByHash(ctx, event.Raw.BlockHash)
if err != nil {
return errors.Wrap(err, "i.ethClient.BlockByHash")
}

stat, err := i.statRepo.Find(ctx, eventindexer.StatTypeProofTime, nil)
if err != nil {
return errors.Wrap(err, "i.statRepo.Find")
}

proposedAt := block.ProposedAt

provenAt := eventBlock.Time()

proofTime := provenAt - proposedAt

avg, ok := new(big.Int).SetString(stat.AverageProofTime, 10)
if !ok {
return errors.New("unable to convert average proof time to string")
}

newAverageProofTime := calcNewAverage(
avg,
new(big.Int).SetUint64(stat.NumProofs),
new(big.Int).SetUint64(proofTime),
)

slog.Info("avgProofWindow update",
"id",
event.BlockId.Int64(),
"prover",
event.Prover.Hex(),
"proposedAt",
proposedAt,
"provenAt",
provenAt,
"proofTime",
proofTime,
"avg",
avg.String(),
"newAvg",
newAverageProofTime.String(),
)

_, err = i.statRepo.Save(ctx, eventindexer.SaveStatOpts{
ProofTime: newAverageProofTime,
StatType: eventindexer.StatTypeProofTime,
})
if err != nil {
return errors.Wrap(err, "i.statRepo.Save")
}

return nil
}

func calcNewAverage(a, t, n *big.Int) *big.Int {
m := new(big.Int).Mul(a, t)
added := new(big.Int).Add(m, n)

return new(big.Int).Div(added, t.Add(t, big.NewInt(1)))
}
20 changes: 0 additions & 20 deletions packages/eventindexer/migrations/1666650701_create_stats_table.sql

This file was deleted.

Loading

0 comments on commit aec6bca

Please sign in to comment.