Skip to content

Commit

Permalink
Include execution tipset in GetMsgInfo
Browse files Browse the repository at this point in the history
This commit updates the GetMsgInfo to include the execution tipset of
the message (if it exists). This allows us to call GetTipSetByCid
instead of GetTipsetByHeight which should be considerably faster and
help speed up both StateSearcMessage and StateWaitForMessage calls.
  • Loading branch information
fridrik01 committed Apr 6, 2023
1 parent 9d44c88 commit a11752f
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 34 deletions.
9 changes: 5 additions & 4 deletions chain/index/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,19 @@ type MsgInfo struct {

// MsgIndex is the interface to the message index
type MsgIndex interface {
// GetMsgInfo retrieves the message metadata through the index.
// GetMsgInfo looks up the message in index and retrieve its metadata and execution
// tipset cid.
// The lookup is done using the onchain message Cid; that is the signed message Cid
// for SECP messages and unsigned message Cid for BLS messages.
GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error)
GetMsgInfo(ctx context.Context, mCid cid.Cid) (MsgInfo, cid.Cid, error)
// Close closes the index
Close() error
}

type dummyMsgIndex struct{}

func (dummyMsgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
return MsgInfo{}, ErrNotFound
func (dummyMsgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, cid.Cid, error) {
return MsgInfo{}, cid.Undef, ErrNotFound
}

func (dummyMsgIndex) Close() error {
Expand Down
71 changes: 48 additions & 23 deletions chain/index/msgindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ var dbDefs = []string{
tipset_cid VARCHAR(80) NOT NULL,
epoch INTEGER NOT NULL
)`,
`CREATE INDEX IF NOT EXISTS tipset_cids ON messages (tipset_cid)
`,
`CREATE INDEX IF NOT EXISTS tipset_cids ON messages (tipset_cid)`,
`CREATE INDEX IF NOT EXISTS tipset_epochs ON messages (epoch)`,
`CREATE TABLE IF NOT EXISTS _meta (
version UINT64 NOT NULL UNIQUE
)`,
Expand All @@ -44,6 +44,8 @@ const (
dbqGetMessageInfo = "SELECT tipset_cid, epoch FROM messages WHERE cid = ?"
dbqInsertMessage = "INSERT INTO messages VALUES (?, ?, ?)"
dbqDeleteTipsetMessages = "DELETE FROM messages WHERE tipset_cid = ?"
dbqGetTipsetInfo = "SELECT tipset_cid FROM messages WHERE epoch = ? LIMIT 1"

// reconciliation
dbqCountMessages = "SELECT COUNT(*) FROM messages"
dbqMinEpoch = "SELECT MIN(epoch) FROM messages"
Expand Down Expand Up @@ -75,6 +77,7 @@ type msgIndex struct {

db *sql.DB
selectMsgStmt *sql.Stmt
selectTipsetInfo *sql.Stmt
insertMsgStmt *sql.Stmt
deleteTipSetStmt *sql.Stmt

Expand Down Expand Up @@ -350,6 +353,12 @@ func (x *msgIndex) prepareStatements() error {
}
x.selectMsgStmt = stmt

stmt, err = x.db.Prepare(dbqGetTipsetInfo)
if err != nil {
return xerrors.Errorf("prepare selectTipsetInfo: %w", err)
}
x.selectTipsetInfo = stmt

stmt, err = x.db.Prepare(dbqInsertMessage)
if err != nil {
return xerrors.Errorf("prepare insertMsgStmt: %w", err)
Expand Down Expand Up @@ -485,40 +494,56 @@ func (x *msgIndex) doApply(ctx context.Context, tx *sql.Tx, ts *types.TipSet) er
}

// interface
func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
func (x *msgIndex) GetMsgInfo(ctx context.Context, mCid cid.Cid) (MsgInfo, cid.Cid, error) {
x.closeLk.RLock()
defer x.closeLk.RUnlock()

if x.closed {
return MsgInfo{}, ErrClosed
return MsgInfo{}, cid.Undef, ErrClosed
}

var (
tipset string
epoch int64
)
// fetch message from index (if it exists)
//
var tipset string
var epoch int64
err := x.selectMsgStmt.QueryRow(mCid.String()).Scan(&tipset, &epoch)
if err != nil {
if err == sql.ErrNoRows {
// mCid not in index, its fine
return MsgInfo{}, cid.Undef, ErrNotFound
}

key := m.String()
row := x.selectMsgStmt.QueryRow(key)
err := row.Scan(&tipset, &epoch)
switch {
case err == sql.ErrNoRows:
return MsgInfo{}, ErrNotFound
return MsgInfo{}, cid.Undef, xerrors.Errorf("error querying msgindex database: %w", err)
}
tsCid, err := cid.Decode(tipset)
if err != nil {
return MsgInfo{}, cid.Undef, xerrors.Errorf("error decoding tipset cid: %w", err)
}

case err != nil:
return MsgInfo{}, xerrors.Errorf("error querying msgindex database: %w", err)
msgInfo := MsgInfo{
Message: mCid,
TipSet: tsCid,
Epoch: abi.ChainEpoch(epoch),
}

tipsetCid, err := cid.Decode(tipset)
// fetch execution tipset of message (if it exists)
//
var xTipset string
err = x.selectTipsetInfo.QueryRow(epoch + 1).Scan(&xTipset)
if err != nil {
if err == sql.ErrNoRows {
// execution tipset not in index, its fine
return msgInfo, cid.Undef, nil
}

return MsgInfo{}, cid.Undef, xerrors.Errorf("error querying for execution tipset: %w", err)
}
xtsCid, err := cid.Decode(xTipset)
if err != nil {
return MsgInfo{}, xerrors.Errorf("error decoding tipset cid: %w", err)
return MsgInfo{}, cid.Undef, xerrors.Errorf("error decoding execution tipset cid: %w", err)
}

return MsgInfo{
Message: m,
TipSet: tipsetCid,
Epoch: abi.ChainEpoch(epoch),
}, nil
return msgInfo, xtsCid, nil
}

func (x *msgIndex) Close() error {
Expand Down
4 changes: 2 additions & 2 deletions chain/index/msgindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func verifyIndex(t *testing.T, cs *mockChainStore, msgIndex MsgIndex) {
msgs, err := cs.MessagesForTipset(context.Background(), ts)
require.NoError(t, err)
for _, m := range msgs {
minfo, err := msgIndex.GetMsgInfo(context.Background(), m.Cid())
minfo, ts, err := msgIndex.GetMsgInfo(context.Background(), m.Cid())
require.NoError(t, err)
require.Equal(t, tsCid, minfo.TipSet)
require.Equal(t, ts.Height(), minfo.Epoch)
Expand All @@ -174,7 +174,7 @@ func verifyMissing(t *testing.T, cs *mockChainStore, msgIndex MsgIndex, missing
msgs, err := cs.MessagesForTipset(context.Background(), ts)
require.NoError(t, err)
for _, m := range msgs {
_, err := msgIndex.GetMsgInfo(context.Background(), m.Cid())
_, _, err := msgIndex.GetMsgInfo(context.Background(), m.Cid())
require.Equal(t, ErrNotFound, err)
}
}
Expand Down
18 changes: 13 additions & 5 deletions chain/stmgr/searchwait.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (sm *StateManager) SearchForMessage(ctx context.Context, head *types.TipSet
}

func (sm *StateManager) searchForIndexedMsg(ctx context.Context, mcid cid.Cid, m types.ChainMsg) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {
minfo, err := sm.msgIndex.GetMsgInfo(ctx, mcid)
minfo, xtsCid, err := sm.msgIndex.GetMsgInfo(ctx, mcid)
if err != nil {
return nil, nil, cid.Undef, xerrors.Errorf("error looking up message in index: %w", err)
}
Expand All @@ -203,10 +203,18 @@ func (sm *StateManager) searchForIndexedMsg(ctx context.Context, mcid cid.Cid, m
}

// now get the execution tipset
// TODO optimization: the index should have it implicitly so we can return it in the msginfo.
xts, err := sm.cs.GetTipsetByHeight(ctx, minfo.Epoch+1, curTs, false)
if err != nil {
return nil, nil, cid.Undef, xerrors.Errorf("error looking up execution tipset: %w", err)
var xts *types.TipSet = nil
if xtsCid != cid.Undef {
// lookup by cid which is faster
xts, err = sm.cs.GetTipSetByCid(ctx, xtsCid)
if err != nil {
return nil, nil, cid.Undef, xerrors.Errorf("error calling GetTipSetByCid: %w", err)
}
} else {
xts, err = sm.cs.GetTipsetByHeight(ctx, minfo.Epoch+1, curTs, false)
if err != nil {
return nil, nil, cid.Undef, xerrors.Errorf("error calling GetTipsetByHeight: %w", err)
}
}

// check that the parent of the execution index is indeed the inclusion tipset
Expand Down

0 comments on commit a11752f

Please sign in to comment.