Skip to content

Commit

Permalink
integration with abci 0.38 Finalize Block
Browse files Browse the repository at this point in the history
  • Loading branch information
0g-wh committed Nov 8, 2024
1 parent 26ce7ac commit 2b319e4
Show file tree
Hide file tree
Showing 51 changed files with 1,247 additions and 323 deletions.
424 changes: 285 additions & 139 deletions baseapp/abci.go

Large diffs are not rendered by default.

177 changes: 148 additions & 29 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ import (
"github.com/cosmos/gogoproto/proto"
"golang.org/x/exp/maps"

errorsmod "cosmossdk.io/errors"
"github.com/cosmos/cosmos-sdk/baseapp/oe"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/snapshots"
"github.com/cosmos/cosmos-sdk/store"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/telemetry"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
"github.com/cosmos/cosmos-sdk/types/mempool"
Expand All @@ -35,12 +38,12 @@ type (
)

const (
runTxModeCheck runTxMode = iota // Check a transaction
runTxModeReCheck // Recheck a (pending) transaction after a commit
runTxModeSimulate // Simulate a transaction
runTxModeDeliver // Deliver a transaction
runTxPrepareProposal // Prepare a TM block proposal
runTxProcessProposal // Process a TM block proposal
runTxModeCheck runTxMode = iota // Check a transaction execModeCheck
runTxModeReCheck // Recheck a (pending) transaction after a commit execModeReCheck
runTxModeSimulate // Simulate a transaction execModeSimulate
runTxPrepareProposal // Prepare a TM block proposal execModePrepareProposal
runTxProcessProposal // Process a TM block proposal execModeProcessProposal
execModeFinalize // Finalize a block proposal
)

var _ abci.Application = (*BaseApp)(nil)
Expand Down Expand Up @@ -80,7 +83,7 @@ type BaseApp struct { //nolint: maligned
// checkState is set on InitChain and reset on Commit
// deliverState is set on InitChain and BeginBlock and set to nil on Commit
checkState *state // for CheckTx
deliverState *state // for DeliverTx
finalizeBlockState *state // for DeliverTx
processProposalState *state // for ProcessProposal
prepareProposalState *state // for PrepareProposal

Expand Down Expand Up @@ -144,6 +147,12 @@ type BaseApp struct { //nolint: maligned
abciListeners []ABCIListener

chainID string

// optimisticExec contains the context required for Optimistic Execution,
// including the goroutine handling.This is experimental and must be enabled
// by developers.
optimisticExec *oe.OptimisticExecution
preBlocker sdk.PreBlocker // logic to run before BeginBlocker
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down Expand Up @@ -431,9 +440,9 @@ func (app *BaseApp) setState(mode runTxMode, header tmproto.Header) {
// Minimum gas prices are also set. It is set on InitChain and reset on Commit.
baseState.ctx = baseState.ctx.WithIsCheckTx(true).WithMinGasPrices(app.minGasPrices)
app.checkState = baseState
case runTxModeDeliver:
case execModeFinalize:
// It is set on InitChain and BeginBlock and set to nil on Commit.
app.deliverState = baseState
app.finalizeBlockState = baseState
case runTxPrepareProposal:
// It is set on InitChain and Commit.
app.prepareProposalState = baseState
Expand Down Expand Up @@ -555,8 +564,8 @@ func validateBasicTxMsgs(msgs []sdk.Msg) error {
// if app is in runTxProcessProposal, and checkState otherwise.
func (app *BaseApp) getState(mode runTxMode) *state {
switch mode {
case runTxModeDeliver:
return app.deliverState
case execModeFinalize:
return app.finalizeBlockState

case runTxPrepareProposal:
return app.prepareProposalState
Expand Down Expand Up @@ -626,7 +635,7 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context
// Note, gas execution info is always returned. A reference to a Result is
// returned if the tx does not run out of gas and if all the messages are valid
// and execute successfully. An error is returned otherwise.
func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, priority int64, err error) {
func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
// NOTE: GasWanted should be returned by the AnteHandler. GasUsed is
// determined by the GasMeter. We need access to the context to get the gas
// meter, so we initialize upfront.
Expand All @@ -636,8 +645,8 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re
ms := ctx.MultiStore()

// only run the tx if there is block gas remaining
if mode == runTxModeDeliver && ctx.BlockGasMeter().IsOutOfGas() {
return gInfo, nil, nil, 0, sdkerrors.Wrap(sdkerrors.ErrOutOfGas, "no block gas left to run tx")
if mode == execModeFinalize && ctx.BlockGasMeter().IsOutOfGas() {
return gInfo, nil, nil, errorsmod.Wrap(sdkerrors.ErrOutOfGas, "no block gas left to run tx")
}

defer func() {
Expand Down Expand Up @@ -670,18 +679,18 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re
// NOTE: consumeBlockGas must exist in a separate defer function from the
// general deferred recovery function to recover from consumeBlockGas as it'll
// be executed first (deferred statements are executed as stack).
if mode == runTxModeDeliver {
if mode == execModeFinalize {
defer consumeBlockGas()
}

tx, err := app.txDecoder(txBytes)
if err != nil {
return sdk.GasInfo{}, nil, nil, 0, err
return sdk.GasInfo{}, nil, nil, err
}

msgs := tx.GetMsgs()
if err := validateBasicTxMsgs(msgs); err != nil {
return sdk.GasInfo{}, nil, nil, 0, err
return sdk.GasInfo{}, nil, nil, err
}

if app.anteHandler != nil {
Expand Down Expand Up @@ -717,23 +726,22 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re
gasWanted = ctx.GasMeter().Limit()

if err != nil {
return gInfo, nil, nil, 0, err
return gInfo, nil, nil, err
}

priority = ctx.Priority()
msCache.Write()
anteEvents = events.ToABCIEvents()
}

if mode == runTxModeCheck {
err = app.mempool.Insert(ctx, tx)
if err != nil {
return gInfo, nil, anteEvents, priority, err
return gInfo, nil, anteEvents, err
}
} else if mode == runTxModeDeliver {
} else if mode == execModeFinalize {
err = app.mempool.Remove(tx)
if err != nil && !errors.Is(err, mempool.ErrTxNotFound) {
return gInfo, nil, anteEvents, priority,
return gInfo, nil, anteEvents,
fmt.Errorf("failed to remove tx from mempool: %w", err)
}
}
Expand All @@ -759,26 +767,26 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re

newCtx, err := app.postHandler(postCtx, tx, mode == runTxModeSimulate, err == nil)
if err != nil {
return gInfo, nil, anteEvents, priority, err
return gInfo, nil, anteEvents, err
}

result.Events = append(result.Events, newCtx.EventManager().ABCIEvents()...)
}

if mode == runTxModeDeliver {
if mode == execModeFinalize {
// When block gas exceeds, it'll panic and won't commit the cached store.
consumeBlockGas()

msCache.Write()
}

if len(anteEvents) > 0 && (mode == runTxModeDeliver || mode == runTxModeSimulate) {
if len(anteEvents) > 0 && (mode == execModeFinalize || mode == runTxModeSimulate) {
// append the events in the order of occurrence
result.Events = append(anteEvents, result.Events...)
}
}

return gInfo, result, anteEvents, priority, err
return gInfo, result, anteEvents, err
}

// runMsgs iterates through a list of messages and executes them with the provided
Expand All @@ -793,7 +801,7 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, mode runTxMode) (*s

// NOTE: GasWanted is determined by the AnteHandler and GasUsed by the GasMeter.
for i, msg := range msgs {
if mode != runTxModeDeliver && mode != runTxModeSimulate {
if mode != execModeFinalize && mode != runTxModeSimulate {
break
}

Expand Down Expand Up @@ -884,7 +892,7 @@ func (app *BaseApp) PrepareProposalVerifyTx(tx sdk.Tx) ([]byte, error) {
return nil, err
}

_, _, _, _, err = app.runTx(runTxPrepareProposal, bz) //nolint:dogsled
_, _, _, err = app.runTx(runTxPrepareProposal, bz) //nolint:dogsled
if err != nil {
return nil, err
}
Expand All @@ -903,7 +911,7 @@ func (app *BaseApp) ProcessProposalVerifyTx(txBz []byte) (sdk.Tx, error) {
return nil, err
}

_, _, _, _, err = app.runTx(runTxProcessProposal, txBz) //nolint:dogsled
_, _, _, err = app.runTx(runTxProcessProposal, txBz) //nolint:dogsled
if err != nil {
return nil, err
}
Expand All @@ -915,3 +923,114 @@ func (app *BaseApp) ProcessProposalVerifyTx(txBz []byte) (sdk.Tx, error) {
func (app *BaseApp) Close() error {
return nil
}

func (app *BaseApp) preBlock(req *abci.RequestFinalizeBlock) ([]abci.Event, error) {
var events []abci.Event
if app.preBlocker != nil {
ctx := app.finalizeBlockState.Context()
rsp, err := app.preBlocker(ctx, req)
if err != nil {
return nil, err
}
// rsp.ConsensusParamsChanged is true from preBlocker means ConsensusParams in store get changed
// write the consensus parameters in store to context
if rsp.ConsensusParamsChanged {
ctx = ctx.WithConsensusParams(app.GetConsensusParams(ctx))
// GasMeter must be set after we get a context with updated consensus params.
gasMeter := app.getBlockGasMeter(ctx)
ctx = ctx.WithBlockGasMeter(gasMeter)
app.finalizeBlockState.SetContext(ctx)
}
events = ctx.EventManager().ABCIEvents()
}
return events, nil
}

func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
gInfo := sdk.GasInfo{}
resultStr := "successful"

var resp *abci.ExecTxResult

defer func() {
telemetry.IncrCounter(1, "tx", "count")
telemetry.IncrCounter(1, "tx", resultStr)
telemetry.SetGauge(float32(gInfo.GasUsed), "tx", "gas", "used")
telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted")
}()

gInfo, result, anteEvents, err := app.runTx(execModeFinalize, tx)
if err != nil {
resultStr = "failed"
resp = sdkerrors.ResponseExecTxResultWithEvents(
err,
gInfo.GasWanted,
gInfo.GasUsed,
sdk.MarkEventsToIndex(anteEvents, app.indexEvents),
app.trace,
)
return resp
}

resp = &abci.ExecTxResult{
GasWanted: int64(gInfo.GasWanted),
GasUsed: int64(gInfo.GasUsed),
Log: result.Log,
Data: result.Data,
Events: sdk.MarkEventsToIndex(result.Events, app.indexEvents),
}

return resp
}

func (app *BaseApp) beginBlock(_ *abci.RequestFinalizeBlock) (sdk.BeginBlock, error) {
var (
resp sdk.BeginBlock
err error
)

if app.beginBlocker != nil {
resp, err = app.beginBlocker(app.finalizeBlockState.Context())
if err != nil {
return resp, err
}

// append BeginBlock attributes to all events in the EndBlock response
for i, event := range resp.Events {
resp.Events[i].Attributes = append(
event.Attributes,
abci.EventAttribute{Key: "mode", Value: "BeginBlock"},
)
}

resp.Events = sdk.MarkEventsToIndex(resp.Events, app.indexEvents)
}

return resp, nil
}

// endBlock is an application-defined function that is called after transactions
// have been processed in FinalizeBlock.
func (app *BaseApp) endBlock(_ sdk.Context) (sdk.EndBlock, error) {
var endblock sdk.EndBlock

if app.endBlocker != nil {
eb, err := app.endBlocker(app.finalizeBlockState.Context())
if err != nil {
return endblock, err
}

// append EndBlock attributes to all events in the EndBlock response
for i, event := range eb.Events {
eb.Events[i].Attributes = append(
event.Attributes,
abci.EventAttribute{Key: "mode", Value: "EndBlock"},
)
}

eb.Events = sdk.MarkEventsToIndex(eb.Events, app.indexEvents)
endblock = eb
}

return endblock, nil
}
Loading

0 comments on commit 2b319e4

Please sign in to comment.