diff --git a/chain/index/interface.go b/chain/index/interface.go index f875a94bf79..cc6c5c502dc 100644 --- a/chain/index/interface.go +++ b/chain/index/interface.go @@ -24,18 +24,19 @@ type MsgInfo struct { // MsgIndex is the interface to the message index type MsgIndex interface { - // GetMsgInfo retrieves the message metadata through the index. + // GetMsgInfo looks up the message in index and retrieve its metadata and execution + // tipset cid. // The lookup is done using the onchain message Cid; that is the signed message Cid // for SECP messages and unsigned message Cid for BLS messages. - GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) + GetMsgInfo(ctx context.Context, mCid cid.Cid) (MsgInfo, cid.Cid, error) // Close closes the index Close() error } type dummyMsgIndex struct{} -func (dummyMsgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) { - return MsgInfo{}, ErrNotFound +func (dummyMsgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, cid.Cid, error) { + return MsgInfo{}, cid.Undef, ErrNotFound } func (dummyMsgIndex) Close() error { diff --git a/chain/index/msgindex.go b/chain/index/msgindex.go index 39ba487f2ef..0d1a8a9103f 100644 --- a/chain/index/msgindex.go +++ b/chain/index/msgindex.go @@ -30,8 +30,8 @@ var dbDefs = []string{ tipset_cid VARCHAR(80) NOT NULL, epoch INTEGER NOT NULL )`, - `CREATE INDEX IF NOT EXISTS tipset_cids ON messages (tipset_cid) - `, + `CREATE INDEX IF NOT EXISTS tipset_cids ON messages (tipset_cid)`, + `CREATE INDEX IF NOT EXISTS tipset_epochs ON messages (epoch)`, `CREATE TABLE IF NOT EXISTS _meta ( version UINT64 NOT NULL UNIQUE )`, @@ -44,6 +44,8 @@ const ( dbqGetMessageInfo = "SELECT tipset_cid, epoch FROM messages WHERE cid = ?" dbqInsertMessage = "INSERT INTO messages VALUES (?, ?, ?)" dbqDeleteTipsetMessages = "DELETE FROM messages WHERE tipset_cid = ?" + dbqGetTipsetInfo = "SELECT tipset_cid FROM messages WHERE epoch = ? LIMIT 1" + // reconciliation dbqCountMessages = "SELECT COUNT(*) FROM messages" dbqMinEpoch = "SELECT MIN(epoch) FROM messages" @@ -75,6 +77,7 @@ type msgIndex struct { db *sql.DB selectMsgStmt *sql.Stmt + selectTipsetInfo *sql.Stmt insertMsgStmt *sql.Stmt deleteTipSetStmt *sql.Stmt @@ -350,6 +353,12 @@ func (x *msgIndex) prepareStatements() error { } x.selectMsgStmt = stmt + stmt, err = x.db.Prepare(dbqGetTipsetInfo) + if err != nil { + return xerrors.Errorf("prepare selectTipsetInfo: %w", err) + } + x.selectTipsetInfo = stmt + stmt, err = x.db.Prepare(dbqInsertMessage) if err != nil { return xerrors.Errorf("prepare insertMsgStmt: %w", err) @@ -485,40 +494,56 @@ func (x *msgIndex) doApply(ctx context.Context, tx *sql.Tx, ts *types.TipSet) er } // interface -func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) { +func (x *msgIndex) GetMsgInfo(ctx context.Context, mCid cid.Cid) (MsgInfo, cid.Cid, error) { x.closeLk.RLock() defer x.closeLk.RUnlock() if x.closed { - return MsgInfo{}, ErrClosed + return MsgInfo{}, cid.Undef, ErrClosed } - var ( - tipset string - epoch int64 - ) + // fetch message from index (if it exists) + // + var tipset string + var epoch int64 + err := x.selectMsgStmt.QueryRow(mCid.String()).Scan(&tipset, &epoch) + if err != nil { + if err == sql.ErrNoRows { + // mCid not in index, its fine + return MsgInfo{}, cid.Undef, ErrNotFound + } - key := m.String() - row := x.selectMsgStmt.QueryRow(key) - err := row.Scan(&tipset, &epoch) - switch { - case err == sql.ErrNoRows: - return MsgInfo{}, ErrNotFound + return MsgInfo{}, cid.Undef, xerrors.Errorf("error querying msgindex database: %w", err) + } + tsCid, err := cid.Decode(tipset) + if err != nil { + return MsgInfo{}, cid.Undef, xerrors.Errorf("error decoding tipset cid: %w", err) + } - case err != nil: - return MsgInfo{}, xerrors.Errorf("error querying msgindex database: %w", err) + msgInfo := MsgInfo{ + Message: mCid, + TipSet: tsCid, + Epoch: abi.ChainEpoch(epoch), } - tipsetCid, err := cid.Decode(tipset) + // fetch execution tipset of message (if it exists) + // + var xTipset string + err = x.selectTipsetInfo.QueryRow(epoch + 1).Scan(&xTipset) + if err != nil { + if err == sql.ErrNoRows { + // execution tipset not in index, its fine + return msgInfo, cid.Undef, nil + } + + return MsgInfo{}, cid.Undef, xerrors.Errorf("error querying for execution tipset: %w", err) + } + xtsCid, err := cid.Decode(xTipset) if err != nil { - return MsgInfo{}, xerrors.Errorf("error decoding tipset cid: %w", err) + return MsgInfo{}, cid.Undef, xerrors.Errorf("error decoding execution tipset cid: %w", err) } - return MsgInfo{ - Message: m, - TipSet: tipsetCid, - Epoch: abi.ChainEpoch(epoch), - }, nil + return msgInfo, xtsCid, nil } func (x *msgIndex) Close() error { diff --git a/chain/index/msgindex_test.go b/chain/index/msgindex_test.go index 24f9b845f2b..5324db49794 100644 --- a/chain/index/msgindex_test.go +++ b/chain/index/msgindex_test.go @@ -157,7 +157,7 @@ func verifyIndex(t *testing.T, cs *mockChainStore, msgIndex MsgIndex) { msgs, err := cs.MessagesForTipset(context.Background(), ts) require.NoError(t, err) for _, m := range msgs { - minfo, err := msgIndex.GetMsgInfo(context.Background(), m.Cid()) + minfo, ts, err := msgIndex.GetMsgInfo(context.Background(), m.Cid()) require.NoError(t, err) require.Equal(t, tsCid, minfo.TipSet) require.Equal(t, ts.Height(), minfo.Epoch) @@ -174,7 +174,7 @@ func verifyMissing(t *testing.T, cs *mockChainStore, msgIndex MsgIndex, missing msgs, err := cs.MessagesForTipset(context.Background(), ts) require.NoError(t, err) for _, m := range msgs { - _, err := msgIndex.GetMsgInfo(context.Background(), m.Cid()) + _, _, err := msgIndex.GetMsgInfo(context.Background(), m.Cid()) require.Equal(t, ErrNotFound, err) } } diff --git a/chain/stmgr/searchwait.go b/chain/stmgr/searchwait.go index 356ace23c45..2749d3682d5 100644 --- a/chain/stmgr/searchwait.go +++ b/chain/stmgr/searchwait.go @@ -190,7 +190,7 @@ func (sm *StateManager) SearchForMessage(ctx context.Context, head *types.TipSet } func (sm *StateManager) searchForIndexedMsg(ctx context.Context, mcid cid.Cid, m types.ChainMsg) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) { - minfo, err := sm.msgIndex.GetMsgInfo(ctx, mcid) + minfo, xtsCid, err := sm.msgIndex.GetMsgInfo(ctx, mcid) if err != nil { return nil, nil, cid.Undef, xerrors.Errorf("error looking up message in index: %w", err) } @@ -203,10 +203,18 @@ func (sm *StateManager) searchForIndexedMsg(ctx context.Context, mcid cid.Cid, m } // now get the execution tipset - // TODO optimization: the index should have it implicitly so we can return it in the msginfo. - xts, err := sm.cs.GetTipsetByHeight(ctx, minfo.Epoch+1, curTs, false) - if err != nil { - return nil, nil, cid.Undef, xerrors.Errorf("error looking up execution tipset: %w", err) + var xts *types.TipSet = nil + if xtsCid != cid.Undef { + // lookup by cid which is faster + xts, err = sm.cs.GetTipSetByCid(ctx, xtsCid) + if err != nil { + return nil, nil, cid.Undef, xerrors.Errorf("error calling GetTipSetByCid: %w", err) + } + } else { + xts, err = sm.cs.GetTipsetByHeight(ctx, minfo.Epoch+1, curTs, false) + if err != nil { + return nil, nil, cid.Undef, xerrors.Errorf("error calling GetTipsetByHeight: %w", err) + } } // check that the parent of the execution index is indeed the inclusion tipset