Skip to content

Commit

Permalink
Replace ExecutedAndBlockMessages with individual methods (#1040)
Browse files Browse the repository at this point in the history
* refactor: remove block messages from TipSetMessages and refactor processors

* chore: TipSetMessageReceipts implemented and wired up

* chore: update miner post extractor

* polish: TipSetMessageReceipts iterator

* refactor: update msapproaval extractor

* refactor: update parsed messages task

* refactor: memoize TipSetMessageReceipts

* refactor: update gas economy

* refactor: update gas outs task

* refactor: remove ExecutedAndBlockMessages method

* address review feedback
  • Loading branch information
frrist committed Aug 30, 2022
1 parent da510c3 commit ab09cca
Show file tree
Hide file tree
Showing 21 changed files with 831 additions and 688 deletions.
152 changes: 81 additions & 71 deletions chain/datasource/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,23 @@ import (

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-hamt-ipld/v3"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
states0 "github.com/filecoin-project/specs-actors/actors/states"
states2 "github.com/filecoin-project/specs-actors/v2/actors/states"
states3 "github.com/filecoin-project/specs-actors/v3/actors/states"
states4 "github.com/filecoin-project/specs-actors/v4/actors/states"
states5 "github.com/filecoin-project/specs-actors/v5/actors/states"
lru "github.com/hashicorp/golang-lru"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/sync/singleflight"

states0 "github.com/filecoin-project/specs-actors/actors/states"
states2 "github.com/filecoin-project/specs-actors/v2/actors/states"
states3 "github.com/filecoin-project/specs-actors/v3/actors/states"
states4 "github.com/filecoin-project/specs-actors/v4/actors/states"
states5 "github.com/filecoin-project/specs-actors/v5/actors/states"

"github.com/filecoin-project/lily/chain/actors/adt"
"github.com/filecoin-project/lily/chain/actors/adt/diff"
"github.com/filecoin-project/lily/chain/actors/builtin/miner"
Expand All @@ -37,28 +38,28 @@ import (
)

var (
executedBlkMsgCacheSize int
executedTsCacheSize int
diffPreCommitCacheSize int
diffSectorCacheSize int

executedBlkMsgCacheSizeEnv = "LILY_EXECUTED_BLK_MSG_CACHE_SIZE"
executedTsCacheSizeEnv = "LILY_EXECUTED_TS_CACHE_SIZE"
diffPreCommitCacheSizeEnv = "LILY_DIFF_PRECOMMIT_CACHE_SIZE"
diffSectorCacheSizeEnv = "LILY_DIFF_SECTORS_CACHE_SIZE"
tipsetMessageReceiptCacheSize int
executedTsCacheSize int
diffPreCommitCacheSize int
diffSectorCacheSize int

tipsetMessageReceiptSizeEnv = "LILY_TIPSET_MSG_RECEIPT_CACHE_SIZE"
executedTsCacheSizeEnv = "LILY_EXECUTED_TS_CACHE_SIZE"
diffPreCommitCacheSizeEnv = "LILY_DIFF_PRECOMMIT_CACHE_SIZE"
diffSectorCacheSizeEnv = "LILY_DIFF_SECTORS_CACHE_SIZE"
)

func init() {
executedBlkMsgCacheSize = 4
tipsetMessageReceiptCacheSize = 4
executedTsCacheSize = 4
diffPreCommitCacheSize = 500
diffSectorCacheSize = 500
if s := os.Getenv(executedBlkMsgCacheSizeEnv); s != "" {
if s := os.Getenv(tipsetMessageReceiptSizeEnv); s != "" {
v, err := strconv.ParseInt(s, 10, 64)
if err == nil {
executedBlkMsgCacheSize = int(v)
tipsetMessageReceiptCacheSize = int(v)
} else {
log.Warnf("invalid value (%s) for %s defaulting to %d: %s", s, executedBlkMsgCacheSizeEnv, executedBlkMsgCacheSize, err)
log.Warnf("invalid value (%s) for %s defaulting to %d: %s", s, tipsetMessageReceiptSizeEnv, tipsetMessageReceiptCacheSize, err)
}
}
if s := os.Getenv(executedTsCacheSizeEnv); s != "" {
Expand Down Expand Up @@ -92,28 +93,12 @@ var _ tasks.DataSource = (*DataSource)(nil)

var log = logging.Logger("lily/datasource")

type DataSource struct {
node lens.API

executedBlkMsgCache *lru.Cache
executedBlkMsgGroup singleflight.Group

executedTsCache *lru.Cache
executedTsGroup singleflight.Group

diffSectorsCache *lru.Cache
diffSectorsGroup singleflight.Group

diffPreCommitCache *lru.Cache
diffPreCommitGroup singleflight.Group
}

func NewDataSource(node lens.API) (*DataSource, error) {
t := &DataSource{
node: node,
}
var err error
t.executedBlkMsgCache, err = lru.New(executedBlkMsgCacheSize)
t.tsBlkMsgRecCache, err = lru.New(tipsetMessageReceiptCacheSize)
if err != nil {
return nil, err
}
Expand All @@ -137,6 +122,56 @@ func NewDataSource(node lens.API) (*DataSource, error) {
return t, nil
}

type DataSource struct {
node lens.API

executedTsCache *lru.Cache
executedTsGroup singleflight.Group

tsBlkMsgRecCache *lru.Cache
tsBlkMsgRecGroup singleflight.Group

diffSectorsCache *lru.Cache
diffSectorsGroup singleflight.Group

diffPreCommitCache *lru.Cache
diffPreCommitGroup singleflight.Group
}

func (t *DataSource) ComputeBaseFee(ctx context.Context, ts *types.TipSet) (abi.TokenAmount, error) {
return t.node.ComputeBaseFee(ctx, ts)
}

func (t *DataSource) TipSetBlockMessages(ctx context.Context, ts *types.TipSet) ([]*lens.BlockMessages, error) {
return t.node.MessagesForTipSetBlocks(ctx, ts)
}

// TipSetMessageReceipts returns the blocks and messages in `pts` and their corresponding receipts from `ts` matching block order in tipset (`pts`).
// TODO replace with lotus chainstore method when https://github.com/filecoin-project/lotus/pull/9186 lands
func (t *DataSource) TipSetMessageReceipts(ctx context.Context, ts, pts *types.TipSet) ([]*lens.BlockMessageReceipts, error) {
key, err := asKey(ts, pts)
if err != nil {
return nil, err
}
value, found := t.tsBlkMsgRecCache.Get(key)
if found {
return value.([]*lens.BlockMessageReceipts), nil
}

value, err, _ = t.tsBlkMsgRecGroup.Do(key, func() (interface{}, error) {
data, innerErr := t.node.TipSetMessageReceipts(ctx, ts, pts)
if innerErr == nil {
t.tsBlkMsgRecCache.Add(key, data)
}
return data, innerErr
})
if err != nil {
return nil, err
}

return value.([]*lens.BlockMessageReceipts), nil
}

func (t *DataSource) TipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
ctx, span := otel.Tracer("").Start(ctx, "DataSource.TipSet")
if span.IsRecording() {
Expand Down Expand Up @@ -239,45 +274,20 @@ func (t *DataSource) MessageExecutions(ctx context.Context, ts, pts *types.TipSe
return value.([]*lens.MessageExecution), nil
}

func (t *DataSource) ExecutedAndBlockMessages(ctx context.Context, ts, pts *types.TipSet) (*lens.TipSetMessages, error) {
metrics.RecordInc(ctx, metrics.DataSourceExecutedAndBlockMessagesRead)
ctx, span := otel.Tracer("").Start(ctx, "DataSource.ExecutedAndBlockMessages")
if span.IsRecording() {
span.SetAttributes(attribute.String("tipset", ts.Key().String()))
span.SetAttributes(attribute.String("parent", pts.Key().String()))
}
defer span.End()

key, err := asKey(ts, pts)
if err != nil {
return nil, err
}
value, found := t.executedBlkMsgCache.Get(key)
if found {
metrics.RecordInc(ctx, metrics.DataSourceExecutedAndBlockMessagesCacheHit)
return value.(*lens.TipSetMessages), nil
}

value, err, shared := t.executedBlkMsgGroup.Do(key, func() (interface{}, error) {
data, innerErr := t.node.GetExecutedAndBlockMessagesForTipset(ctx, ts, pts)
if innerErr == nil {
t.executedBlkMsgCache.Add(key, data)
}
func (t *DataSource) MinerLoad(store adt.Store, act *types.Actor) (miner.State, error) {
return miner.Load(store, act)
}

return data, innerErr
})
func (t *DataSource) ShouldBurnFn(ctx context.Context, ts *types.TipSet) (lens.ShouldBurnFn, error) {
return t.node.BurnFundsFn(ctx, ts)
}

if span.IsRecording() {
span.SetAttributes(attribute.Bool("shared", shared))
}
func ComputeGasOutputs(ctx context.Context, block *types.BlockHeader, message *types.Message, receipt *types.MessageReceipt, shouldBurnFn lens.ShouldBurnFn) (vm.GasOutputs, error) {
burn, err := shouldBurnFn(ctx, message, receipt.ExitCode)
if err != nil {
return nil, err
return vm.GasOutputs{}, err
}
return value.(*lens.TipSetMessages), nil
}

func (t *DataSource) MinerLoad(store adt.Store, act *types.Actor) (miner.State, error) {
return miner.Load(store, act)
return vm.ComputeGasOutputs(receipt.GasUsed, message.GasLimit, block.ParentBaseFee, message.GasFeeCap, message.GasPremium, burn), nil
}

func GetActorStateChanges(ctx context.Context, store adt.Store, current, executed *types.TipSet) (tasks.ActorStateChangeDiff, error) {
Expand Down
11 changes: 6 additions & 5 deletions chain/indexer/integrated/processor/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,11 +541,14 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces
// Messages
//
case tasktype.Message:
out.TipsetsProcessors[t] = messagetask.NewTask(api)
out.TipsetProcessors[t] = messagetask.NewTask(api)
case tasktype.BlockMessage:
out.TipsetProcessors[t] = bmtask.NewTask(api)
case tasktype.MessageGasEconomy:
out.TipsetProcessors[t] = gasecontask.NewTask(api)

case tasktype.GasOutputs:
out.TipsetsProcessors[t] = gasouttask.NewTask(api)
case tasktype.BlockMessage:
out.TipsetsProcessors[t] = bmtask.NewTask(api)
case tasktype.ParsedMessage:
out.TipsetsProcessors[t] = parentmessagetask.NewTask(api)
case tasktype.Receipt:
Expand All @@ -554,8 +557,6 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces
out.TipsetsProcessors[t] = imtask.NewTask(api)
case tasktype.InternalParsedMessage:
out.TipsetsProcessors[t] = ipmtask.NewTask(api)
case tasktype.MessageGasEconomy:
out.TipsetsProcessors[t] = gasecontask.NewTask(api)
case tasktype.MultisigApproval:
out.TipsetsProcessors[t] = msapprovaltask.NewTask(api)
case tasktype.VmMessage:
Expand Down
16 changes: 8 additions & 8 deletions chain/indexer/integrated/processor/state_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
"github.com/filecoin-project/lily/chain/actors/builtin/verifreg"
"github.com/filecoin-project/lily/chain/indexer/tasktype"
"github.com/filecoin-project/lily/tasks/messageexecutions/vm"
"github.com/filecoin-project/lily/tasks/messages/blockmessage"
"github.com/filecoin-project/lily/tasks/messages/gaseconomy"
"github.com/filecoin-project/lily/tasks/messages/message"

"github.com/filecoin-project/lily/tasks/actorstate"
inittask "github.com/filecoin-project/lily/tasks/actorstate/init_"
Expand All @@ -34,10 +37,7 @@ import (
"github.com/filecoin-project/lily/tasks/consensus"
"github.com/filecoin-project/lily/tasks/messageexecutions/internalmessage"
"github.com/filecoin-project/lily/tasks/messageexecutions/internalparsedmessage"
"github.com/filecoin-project/lily/tasks/messages/blockmessage"
"github.com/filecoin-project/lily/tasks/messages/gaseconomy"
"github.com/filecoin-project/lily/tasks/messages/gasoutput"
"github.com/filecoin-project/lily/tasks/messages/message"
"github.com/filecoin-project/lily/tasks/messages/parsedmessage"
"github.com/filecoin-project/lily/tasks/messages/receipt"
"github.com/filecoin-project/lily/tasks/msapprovals"
Expand All @@ -48,26 +48,26 @@ func TestNewProcessor(t *testing.T) {
require.NoError(t, err)
require.Equal(t, t.Name(), proc.name)
require.Len(t, proc.actorProcessors, 21)
require.Len(t, proc.tipsetProcessors, 5)
require.Len(t, proc.tipsetsProcessors, 10)
require.Len(t, proc.tipsetProcessors, 8)
require.Len(t, proc.tipsetsProcessors, 7)
require.Len(t, proc.builtinProcessors, 1)

require.Equal(t, message.NewTask(nil), proc.tipsetsProcessors[tasktype.Message])
require.Equal(t, gasoutput.NewTask(nil), proc.tipsetsProcessors[tasktype.GasOutputs])
require.Equal(t, blockmessage.NewTask(nil), proc.tipsetsProcessors[tasktype.BlockMessage])
require.Equal(t, parsedmessage.NewTask(nil), proc.tipsetsProcessors[tasktype.ParsedMessage])
require.Equal(t, receipt.NewTask(nil), proc.tipsetsProcessors[tasktype.Receipt])
require.Equal(t, internalmessage.NewTask(nil), proc.tipsetsProcessors[tasktype.InternalMessage])
require.Equal(t, internalparsedmessage.NewTask(nil), proc.tipsetsProcessors[tasktype.InternalParsedMessage])
require.Equal(t, gaseconomy.NewTask(nil), proc.tipsetsProcessors[tasktype.MessageGasEconomy])
require.Equal(t, msapprovals.NewTask(nil), proc.tipsetsProcessors[tasktype.MultisigApproval])
require.Equal(t, vm.NewTask(nil), proc.tipsetsProcessors[tasktype.VmMessage])

require.Equal(t, message.NewTask(nil), proc.tipsetProcessors[tasktype.Message])
require.Equal(t, blockmessage.NewTask(nil), proc.tipsetProcessors[tasktype.BlockMessage])
require.Equal(t, headers.NewTask(), proc.tipsetProcessors[tasktype.BlockHeader])
require.Equal(t, parents.NewTask(), proc.tipsetProcessors[tasktype.BlockParent])
require.Equal(t, drand.NewTask(), proc.tipsetProcessors[tasktype.DrandBlockEntrie])
require.Equal(t, chaineconomics.NewTask(nil), proc.tipsetProcessors[tasktype.ChainEconomics])
require.Equal(t, consensus.NewTask(nil), proc.tipsetProcessors[tasktype.ChainConsensus])
require.Equal(t, gaseconomy.NewTask(nil), proc.tipsetProcessors[tasktype.MessageGasEconomy])

require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.DeadlineInfoExtractor{})), proc.actorProcessors[tasktype.MinerCurrentDeadlineInfo])
require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.FeeDebtExtractor{})), proc.actorProcessors[tasktype.MinerFeeDebt])
Expand Down
16 changes: 8 additions & 8 deletions chain/indexer/integrated/processor/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,47 +269,47 @@ func TestMakeProcessorsActors(t *testing.T) {

func TestMakeProcessorsTipSet(t *testing.T) {
tasks := []string{
tasktype.Message,
tasktype.BlockMessage,
tasktype.BlockHeader,
tasktype.BlockParent,
tasktype.DrandBlockEntrie,
tasktype.ChainEconomics,
tasktype.ChainConsensus,
tasktype.MessageGasEconomy,
}
proc, err := processor.MakeProcessors(nil, tasks)
require.NoError(t, err)
require.Len(t, proc.TipsetProcessors, len(tasks))

require.Equal(t, message.NewTask(nil), proc.TipsetProcessors[tasktype.Message])
require.Equal(t, blockmessage.NewTask(nil), proc.TipsetProcessors[tasktype.BlockMessage])
require.Equal(t, headers.NewTask(), proc.TipsetProcessors[tasktype.BlockHeader])
require.Equal(t, parents.NewTask(), proc.TipsetProcessors[tasktype.BlockParent])
require.Equal(t, drand.NewTask(), proc.TipsetProcessors[tasktype.DrandBlockEntrie])
require.Equal(t, chaineconomics.NewTask(nil), proc.TipsetProcessors[tasktype.ChainEconomics])
require.Equal(t, consensus.NewTask(nil), proc.TipsetProcessors[tasktype.ChainConsensus])
require.Equal(t, gaseconomy.NewTask(nil), proc.TipsetProcessors[tasktype.MessageGasEconomy])
}

func TestMakeProcessorsTipSets(t *testing.T) {
tasks := []string{
tasktype.Message,
tasktype.GasOutputs,
tasktype.BlockMessage,
tasktype.ParsedMessage,
tasktype.Receipt,
tasktype.InternalMessage,
tasktype.InternalParsedMessage,
tasktype.MessageGasEconomy,
tasktype.MultisigApproval,
}
proc, err := processor.MakeProcessors(nil, tasks)
require.NoError(t, err)
require.Len(t, proc.TipsetsProcessors, len(tasks))

require.Equal(t, message.NewTask(nil), proc.TipsetsProcessors[tasktype.Message])
require.Equal(t, gasoutput.NewTask(nil), proc.TipsetsProcessors[tasktype.GasOutputs])
require.Equal(t, blockmessage.NewTask(nil), proc.TipsetsProcessors[tasktype.BlockMessage])
require.Equal(t, parsedmessage.NewTask(nil), proc.TipsetsProcessors[tasktype.ParsedMessage])
require.Equal(t, receipt.NewTask(nil), proc.TipsetsProcessors[tasktype.Receipt])
require.Equal(t, internalmessage.NewTask(nil), proc.TipsetsProcessors[tasktype.InternalMessage])
require.Equal(t, internalparsedmessage.NewTask(nil), proc.TipsetsProcessors[tasktype.InternalParsedMessage])
require.Equal(t, gaseconomy.NewTask(nil), proc.TipsetsProcessors[tasktype.MessageGasEconomy])
require.Equal(t, msapprovals.NewTask(nil), proc.TipsetsProcessors[tasktype.MultisigApproval])
}

Expand Down Expand Up @@ -346,7 +346,7 @@ func TestMakeProcessorsAllTasks(t *testing.T) {
proc, err := processor.MakeProcessors(nil, append(tasktype.AllTableTasks, processor.BuiltinTaskName))
require.NoError(t, err)
require.Len(t, proc.ActorProcessors, 21)
require.Len(t, proc.TipsetProcessors, 5)
require.Len(t, proc.TipsetsProcessors, 10)
require.Len(t, proc.TipsetProcessors, 8)
require.Len(t, proc.TipsetsProcessors, 7)
require.Len(t, proc.ReportProcessors, 1)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ require (
github.com/filecoin-project/specs-actors/v8 v8.0.1
github.com/hibiken/asynq v0.23.0
github.com/hibiken/asynq/x v0.0.0-20220413130846-5c723f597e01
github.com/ipfs/go-ipld-format v0.4.0
github.com/jedib0t/go-pretty/v6 v6.2.7
go.opentelemetry.io/otel/trace v1.3.0
go.uber.org/atomic v1.9.0
Expand Down Expand Up @@ -196,7 +197,6 @@ require (
github.com/ipfs/go-ipfs-pq v0.0.2 // indirect
github.com/ipfs/go-ipfs-routing v0.2.1 // indirect
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
github.com/ipfs/go-ipld-format v0.4.0 // indirect
github.com/ipfs/go-ipld-legacy v0.1.1 // indirect
github.com/ipfs/go-ipns v0.1.2 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
Expand Down
Loading

0 comments on commit ab09cca

Please sign in to comment.