Skip to content

Commit

Permalink
improved comments and dropped useless waits on ctx Done
Browse files Browse the repository at this point in the history
  • Loading branch information
abi87 committed Sep 30, 2024
1 parent 4f9dd82 commit bfe5ffc
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 59 deletions.
74 changes: 22 additions & 52 deletions mod/consensus/pkg/cometbft/service/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,25 +60,20 @@ func (s *Service[LoggerT]) InitChain(
)
}

// On a new chain, we consider the init chain block height as 0, even though
// req.InitialHeight is 1 by default.
s.logger.Info(
"InitChain",
"initialHeight",
req.InitialHeight,
"chainID",
req.ChainId,
"initialHeight", req.InitialHeight,
"chainID", req.ChainId,
)

// Set the initial height, which will be used to determine if we are
// proposing or processing the first block or not.
// Set the initial height, which will be used to determine if we are proposing
// or processing the first block or not.
s.initialHeight = req.InitialHeight
if s.initialHeight == 0 {
s.initialHeight = 1
}

// if req.InitialHeight is > 1, then we set the initial version on all
// stores
// if req.InitialHeight is > 1, then we set the initial version on all stores
if req.InitialHeight > 1 {
if err := s.sm.CommitMultiStore().
SetInitialVersion(req.InitialHeight); err != nil {
Expand Down Expand Up @@ -141,8 +136,8 @@ func (s *Service[LoggerT]) InitChain(
}
}

// NOTE: We don't commit, but FinalizeBlock for block InitialHeight starts
// from this FinalizeBlockState.
// NOTE: We don't commit, but FinalizeBlock for block InitialHeight starts from
// this FinalizeBlockState.
return &cmtabci.InitChainResponse{
ConsensusParams: req.ConsensusParams,
Validators: resValidators,
Expand Down Expand Up @@ -211,6 +206,8 @@ func (s *Service[LoggerT]) PrepareProposal(
)
}

// Always reset state given that PrepareProposal can timeout
// and be called again in a subsequent round.
s.prepareProposalState = s.resetState()
s.prepareProposalState.SetContext(
s.getContextForProposal(
Expand Down Expand Up @@ -260,11 +257,11 @@ func (s *Service[LoggerT]) ProcessProposal(
)
}

// Since the application can get access to FinalizeBlock state and write to
// it, we must be sure to reset it in case ProcessProposal timeouts and is
// called again in a subsequent round. However, we only want to do this
// after we've processed the first block, as we want to avoid overwriting
// the finalizeState after state changes during InitChain.
// Since the application can get access to FinalizeBlock state and write to it,
// we must be sure to reset it in case ProcessProposal timeouts and is called
// again in a subsequent round. However, we only want to do this after we've
// processed the first block, as we want to avoid overwriting the finalizeState
// after state changes during InitChain.
s.processProposalState = s.resetState()
if req.Height > s.initialHeight {
s.finalizeBlockState = s.resetState()
Expand Down Expand Up @@ -302,41 +299,26 @@ func (s *Service[LoggerT]) ProcessProposal(
}

func (s *Service[LoggerT]) internalFinalizeBlock(
ctx context.Context,
req *cmtabci.FinalizeBlockRequest,
) (*cmtabci.FinalizeBlockResponse, error) {
if err := s.validateFinalizeBlockHeight(req); err != nil {
return nil, err
}

// finalizeBlockState should be set on InitChain or ProcessProposal. If it is
// nil, it means we are replaying this block and we need to set the state here
// given that during block replay ProcessProposal is not executed by CometBFT.
if s.finalizeBlockState == nil {
s.finalizeBlockState = s.resetState()
}

// First check for an abort signal after beginBlock, as it's the first place
// we spend any significant amount of time.
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

// Iterate over all raw transactions in the proposal and attempt to execute
// them, gathering the execution results.
//
// NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g.
// vote extensions, so skip those.
txResults := make([]*cmtabci.ExecTxResult, 0, len(req.Txs))
for range req.Txs {
// check after every tx if we should abort
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

//nolint:mnd // its okay for now.
txResults = append(txResults, &cmtabci.ExecTxResult{
Codespace: "sdk",
Expand All @@ -363,15 +345,6 @@ func (s *Service[LoggerT]) internalFinalizeBlock(
return nil, err
}

// check after finalizeBlock if we should abort, to avoid propagating the
// result
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

return &cmtabci.FinalizeBlockResponse{
TxResults: txResults,
ValidatorUpdates: valUpdates,
Expand Down Expand Up @@ -423,7 +396,7 @@ func (s *Service[_]) FinalizeBlock(
_ context.Context,
req *cmtabci.FinalizeBlockRequest,
) (*cmtabci.FinalizeBlockResponse, error) {
res, err := s.internalFinalizeBlock(context.Background(), req)
res, err := s.internalFinalizeBlock(req)
if res != nil {
res.AppHash = s.workingHash()
}
Expand All @@ -442,7 +415,7 @@ func (s *Service[LoggerT]) Commit(
context.Context, *cmtabci.CommitRequest,
) (*cmtabci.CommitResponse, error) {
if s.finalizeBlockState == nil {
return nil, fmt.Errorf("commit: %w", errNilFinalizeBlockState)
panic(fmt.Errorf("commit: %w", errNilFinalizeBlockState))
}
header := s.finalizeBlockState.Context().BlockHeader()
retainHeight := s.GetBlockRetentionHeight(header.Height)
Expand All @@ -451,16 +424,13 @@ func (s *Service[LoggerT]) Commit(
if ok {
rms.SetCommitHeader(header)
}

s.sm.CommitMultiStore().Commit()

resp := &cmtabci.CommitResponse{
RetainHeight: retainHeight,
}

s.finalizeBlockState = nil

return resp, nil
return &cmtabci.CommitResponse{
RetainHeight: retainHeight,
}, nil
}

// workingHash gets the apphash that will be finalized in commit.
Expand Down
23 changes: 16 additions & 7 deletions mod/consensus/pkg/cometbft/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,25 @@ type Service[
sm *statem.Manager
Middleware MiddlewareI

// prepareProposalState is used for PrepareProposal, which is set based on the
// previous block's state. This state is never committed. In case of multiple
// consensus rounds, the state is always reset to the previous block's state.
prepareProposalState *state

// processProposalState is used for ProcessProposal, which is set based on the
// previous block's state. This state is never committed. In case of multiple
// consensus rounds, the state is always reset to the previous block's state.
processProposalState *state
finalizeBlockState *state
interBlockCache storetypes.MultiStorePersistentCache
paramStore *params.ConsensusParamsStore

// initialHeight is used to determine if we are
// proposing or processing the first block or not.
// finalizeBlockState is used for FinalizeBlock, which is set based on the
// previous block's state. This state is committed. finalizeBlockState is set
// on InitChain and FinalizeBlock and set to nil on Commit.
finalizeBlockState *state

interBlockCache storetypes.MultiStorePersistentCache
paramStore *params.ConsensusParamsStore

// initialHeight is the initial height at which we start the node
initialHeight int64
minRetainBlocks uint64

Expand Down Expand Up @@ -152,8 +163,6 @@ func (s *Service[_]) Close() error {
_ = s.node.Stop()
}

// Close s.db (opened by cosmos-sdk/server/start.go call to openDB)

s.logger.Info("Closing application.db")
if err := s.sm.Close(); err != nil {
errs = append(errs, err)
Expand Down

0 comments on commit bfe5ffc

Please sign in to comment.