diff --git a/abci/client/local_client.go b/abci/client/local_client.go index c595d5639..4ed78e8cf 100644 --- a/abci/client/local_client.go +++ b/abci/client/local_client.go @@ -87,12 +87,24 @@ func (app *localClient) DeliverTxAsync(params types.RequestDeliverTx) *ReqRes { ) } -func (app *localClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes { - res := app.Application.CheckTx(req) - return app.callback( - types.ToRequestCheckTx(req), - types.ToResponseCheckTx(res), - ) +func (app *localClient) CheckTxAsync(params types.RequestCheckTx) *ReqRes { + req := types.ToRequestCheckTx(params) + reqRes := NewReqRes(req) + + app.Application.CheckTxAsync(params, func(r types.ResponseCheckTx) { + res := types.ToResponseCheckTx(r) + app.Callback(req, res) + reqRes.Response = res + reqRes.Done() + reqRes.SetDone() + + // Notify reqRes listener if set + if cb := reqRes.GetCallback(); cb != nil { + cb(res) + } + }) + + return reqRes } func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes { @@ -201,7 +213,7 @@ func (app *localClient) DeliverTxSync(req types.RequestDeliverTx) (*types.Respon } func (app *localClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) { - res := app.Application.CheckTx(req) + res := app.Application.CheckTxSync(req) return &res, nil } @@ -265,6 +277,7 @@ func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRe func newLocalReqRes(req *types.Request, res *types.Response) *ReqRes { reqRes := NewReqRes(req) reqRes.Response = res + reqRes.Done() reqRes.SetDone() return reqRes } diff --git a/abci/example/counter/counter.go b/abci/example/counter/counter.go index 58f8aabb9..18d4788b2 100644 --- a/abci/example/counter/counter.go +++ b/abci/example/counter/counter.go @@ -62,7 +62,15 @@ func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeli return types.ResponseDeliverTx{Code: code.CodeTypeOK} } -func (app *Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx { +func (app *Application) CheckTxSync(req types.RequestCheckTx) types.ResponseCheckTx { + return app.checkTx(req) +} + +func (app *Application) CheckTxAsync(req types.RequestCheckTx, callback types.CheckTxCallback) { + callback(app.checkTx(req)) +} + +func (app *Application) checkTx(req types.RequestCheckTx) types.ResponseCheckTx { if app.serial { if len(req.Tx) > 8 { return types.ResponseCheckTx{ diff --git a/abci/example/kvstore/kvstore.go b/abci/example/kvstore/kvstore.go index 42f00231f..dbdf30e2b 100644 --- a/abci/example/kvstore/kvstore.go +++ b/abci/example/kvstore/kvstore.go @@ -109,7 +109,15 @@ func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeli return types.ResponseDeliverTx{Code: code.CodeTypeOK, Events: events} } -func (app *Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx { +func (app *Application) CheckTxSync(req types.RequestCheckTx) types.ResponseCheckTx { + return app.checkTx(req) +} + +func (app *Application) CheckTxAsync(req types.RequestCheckTx, callback types.CheckTxCallback) { + callback(app.checkTx(req)) +} + +func (app *Application) checkTx(req types.RequestCheckTx) types.ResponseCheckTx { return types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1} } diff --git a/abci/example/kvstore/persistent_kvstore.go b/abci/example/kvstore/persistent_kvstore.go index 1cad91443..d74b6249f 100644 --- a/abci/example/kvstore/persistent_kvstore.go +++ b/abci/example/kvstore/persistent_kvstore.go @@ -80,8 +80,12 @@ func (app *PersistentKVStoreApplication) DeliverTx(req types.RequestDeliverTx) t return app.app.DeliverTx(req) } -func (app *PersistentKVStoreApplication) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx { - return app.app.CheckTx(req) +func (app *PersistentKVStoreApplication) CheckTxSync(req types.RequestCheckTx) types.ResponseCheckTx { + return app.app.CheckTxSync(req) +} + +func (app *PersistentKVStoreApplication) CheckTxAsync(req types.RequestCheckTx, callback types.CheckTxCallback) { + app.app.CheckTxAsync(req, callback) } func (app *PersistentKVStoreApplication) BeginRecheckTx(req types.RequestBeginRecheckTx) types.ResponseBeginRecheckTx { diff --git a/abci/types/application.go b/abci/types/application.go index 8609ddbe5..28c383a8e 100644 --- a/abci/types/application.go +++ b/abci/types/application.go @@ -4,6 +4,8 @@ import ( context "golang.org/x/net/context" ) +type CheckTxCallback func(ResponseCheckTx) + // Application is an interface that enables any finite, deterministic state machine // to be driven by a blockchain-based replication engine via the ABCI. // All methods take a RequestXxx argument and return a ResponseXxx argument, @@ -15,7 +17,8 @@ type Application interface { Query(RequestQuery) ResponseQuery // Query for state // Mempool Connection - CheckTx(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool + CheckTxSync(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool + CheckTxAsync(RequestCheckTx, CheckTxCallback) // Asynchronously validate a tx for the mempool BeginRecheckTx(RequestBeginRecheckTx) ResponseBeginRecheckTx // Signals the beginning of rechecking EndRecheckTx(RequestEndRecheckTx) ResponseEndRecheckTx // Signals the end of rechecking @@ -51,10 +54,14 @@ func (BaseApplication) DeliverTx(req RequestDeliverTx) ResponseDeliverTx { return ResponseDeliverTx{Code: CodeTypeOK} } -func (BaseApplication) CheckTx(req RequestCheckTx) ResponseCheckTx { +func (BaseApplication) CheckTxSync(req RequestCheckTx) ResponseCheckTx { return ResponseCheckTx{Code: CodeTypeOK} } +func (BaseApplication) CheckTxAsync(req RequestCheckTx, callback CheckTxCallback) { + callback(ResponseCheckTx{Code: CodeTypeOK}) +} + func (BaseApplication) BeginRecheckTx(req RequestBeginRecheckTx) ResponseBeginRecheckTx { return ResponseBeginRecheckTx{Code: CodeTypeOK} } @@ -114,7 +121,7 @@ func (app *GRPCApplication) DeliverTx(ctx context.Context, req *RequestDeliverTx } func (app *GRPCApplication) CheckTx(ctx context.Context, req *RequestCheckTx) (*ResponseCheckTx, error) { - res := app.app.CheckTx(*req) + res := app.app.CheckTxSync(*req) return &res, nil } diff --git a/blockchain/v0/reactor_test.go b/blockchain/v0/reactor_test.go index a31c9a141..226eb47c7 100644 --- a/blockchain/v0/reactor_test.go +++ b/blockchain/v0/reactor_test.go @@ -374,10 +374,14 @@ func (app *testApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx return abci.ResponseDeliverTx{Events: []abci.Event{}} } -func (app *testApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { +func (app *testApp) CheckTxSync(req abci.RequestCheckTx) abci.ResponseCheckTx { return abci.ResponseCheckTx{} } +func (app *testApp) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxCallback) { + callback(abci.ResponseCheckTx{}) +} + func (app *testApp) Commit() abci.ResponseCommit { return abci.ResponseCommit{} } diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 835e6875a..c2d9a6a5b 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -101,7 +101,7 @@ func deliverTxsRange(cs *State, start, end int) { for i := start; i < end; i++ { txBytes := make([]byte, 8) binary.BigEndian.PutUint64(txBytes, uint64(i)) - err := assertMempool(cs.txNotifier).CheckTx(txBytes, nil, mempl.TxInfo{}) + _, err := assertMempool(cs.txNotifier).CheckTxSync(txBytes, mempl.TxInfo{}) if err != nil { panic(fmt.Sprintf("Error after CheckTx: %v", err)) } @@ -161,13 +161,13 @@ func TestMempoolRmBadTx(t *testing.T) { // Try to send the tx through the mempool. // CheckTx should not err, but the app should return a bad abci code // and the tx should get removed from the pool - err := assertMempool(cs.txNotifier).CheckTx(txBytes, func(r *abci.Response) { + err := assertMempool(cs.txNotifier).CheckTxAsync(txBytes, mempl.TxInfo{}, func(r *abci.Response) { if r.GetCheckTx().Code != code.CodeTypeBadNonce { t.Errorf("expected checktx to return bad nonce, got %v", r) return } checkTxRespCh <- struct{}{} - }, mempl.TxInfo{}) + }) if err != nil { t.Errorf("error after CheckTx: %v", err) return @@ -233,7 +233,15 @@ func (app *CounterApplication) DeliverTx(req abci.RequestDeliverTx) abci.Respons return abci.ResponseDeliverTx{Code: code.CodeTypeOK} } -func (app *CounterApplication) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { +func (app *CounterApplication) CheckTxSync(req abci.RequestCheckTx) abci.ResponseCheckTx { + return app.checkTx(req) +} + +func (app *CounterApplication) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxCallback) { + callback(app.checkTx(req)) +} + +func (app *CounterApplication) checkTx(req abci.RequestCheckTx) abci.ResponseCheckTx { txValue := txAsUint64(req.Tx) app.mempoolTxCountMtx.Lock() defer app.mempoolTxCountMtx.Unlock() diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index a0a8c9732..fff9f39d7 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -240,7 +240,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) { defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses) // send a tx - if err := assertMempool(css[3].txNotifier).CheckTx([]byte{1, 2, 3}, nil, mempl.TxInfo{}); err != nil { + if _, err := assertMempool(css[3].txNotifier).CheckTxSync([]byte{1, 2, 3}, mempl.TxInfo{}); err != nil { t.Error(err) } @@ -543,7 +543,7 @@ func waitForAndValidateBlock( err := validateBlock(newBlock, activeVals) assert.Nil(t, err) for _, tx := range txs { - err := assertMempool(css[j].txNotifier).CheckTx(tx, nil, mempl.TxInfo{}) + _, err := assertMempool(css[j].txNotifier).CheckTxSync(tx, mempl.TxInfo{}) assert.Nil(t, err) } }, css) diff --git a/consensus/replay_test.go b/consensus/replay_test.go index f886cdeeb..665c3b14a 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -107,7 +107,7 @@ func sendTxs(ctx context.Context, cs *State) { return default: tx := []byte{byte(i)} - assertMempool(cs.txNotifier).CheckTx(tx, nil, mempl.TxInfo{}) + assertMempool(cs.txNotifier).CheckTxSync(tx, mempl.TxInfo{}) i++ } } @@ -350,7 +350,7 @@ func TestSimulateValidatorsChange(t *testing.T) { require.NoError(t, err) valPubKey1ABCI := types.TM2PB.PubKey(newValidatorPubKey1) newValidatorTx1 := kvstore.MakeValSetChangeTx(valPubKey1ABCI, testMinPower) - err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx1, nil, mempl.TxInfo{}) + _, err = assertMempool(css[0].txNotifier).CheckTxSync(newValidatorTx1, mempl.TxInfo{}) assert.Nil(t, err) propBlock, _ := css[0].createProposalBlock() //changeProposer(t, cs1, vs2) propBlockParts := propBlock.MakePartSet(partSize) @@ -376,7 +376,7 @@ func TestSimulateValidatorsChange(t *testing.T) { require.NoError(t, err) updatePubKey1ABCI := types.TM2PB.PubKey(updateValidatorPubKey1) updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25) - err = assertMempool(css[0].txNotifier).CheckTx(updateValidatorTx1, nil, mempl.TxInfo{}) + _, err = assertMempool(css[0].txNotifier).CheckTxSync(updateValidatorTx1, mempl.TxInfo{}) assert.Nil(t, err) propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2) propBlockParts = propBlock.MakePartSet(partSize) @@ -402,13 +402,13 @@ func TestSimulateValidatorsChange(t *testing.T) { require.NoError(t, err) newVal2ABCI := types.TM2PB.PubKey(newValidatorPubKey2) newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower) - err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx2, nil, mempl.TxInfo{}) + _, err = assertMempool(css[0].txNotifier).CheckTxSync(newValidatorTx2, mempl.TxInfo{}) assert.Nil(t, err) newValidatorPubKey3, err := css[nVals+2].privValidator.GetPubKey() require.NoError(t, err) newVal3ABCI := types.TM2PB.PubKey(newValidatorPubKey3) newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower) - err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx3, nil, mempl.TxInfo{}) + _, err = assertMempool(css[0].txNotifier).CheckTxSync(newValidatorTx3, mempl.TxInfo{}) assert.Nil(t, err) propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2) propBlockParts = propBlock.MakePartSet(partSize) @@ -442,7 +442,7 @@ func TestSimulateValidatorsChange(t *testing.T) { ensureNewProposal(proposalCh, height, round) removeValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, 0) - err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx2, nil, mempl.TxInfo{}) + _, err = assertMempool(css[0].txNotifier).CheckTxSync(removeValidatorTx2, mempl.TxInfo{}) assert.Nil(t, err) rs = css[0].GetRoundState() @@ -472,7 +472,7 @@ func TestSimulateValidatorsChange(t *testing.T) { height++ incrementHeight(vss...) removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0) - err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx3, nil, mempl.TxInfo{}) + _, err = assertMempool(css[0].txNotifier).CheckTxSync(removeValidatorTx3, mempl.TxInfo{}) assert.Nil(t, err) propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2) propBlockParts = propBlock.MakePartSet(partSize) diff --git a/mempool/bench_test.go b/mempool/bench_test.go index d6f2d9ed2..0c2fdc7d3 100644 --- a/mempool/bench_test.go +++ b/mempool/bench_test.go @@ -18,7 +18,7 @@ func BenchmarkReap(b *testing.B) { for i := 0; i < size; i++ { tx := make([]byte, 8) binary.BigEndian.PutUint64(tx, uint64(i)) - mempool.CheckTx(tx, nil, TxInfo{}) + mempool.CheckTxSync(tx, TxInfo{}) } b.ResetTimer() for i := 0; i < b.N; i++ { @@ -26,7 +26,38 @@ func BenchmarkReap(b *testing.B) { } } -func BenchmarkCheckTx(b *testing.B) { +func BenchmarkReapWithCheckTxAsync(b *testing.B) { + app := kvstore.NewApplication() + cc := proxy.NewLocalClientCreator(app) + mempool, cleanup := newMempoolWithApp(cc) + defer cleanup() + + size := 10000 + for i := 0; i < size; i++ { + tx := make([]byte, 8) + binary.BigEndian.PutUint64(tx, uint64(i)) + mempool.CheckTxAsync(tx, TxInfo{}, nil) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + mempool.ReapMaxBytesMaxGas(100000000, 10000000) + } +} + +func BenchmarkCheckTxSync(b *testing.B) { + app := kvstore.NewApplication() + cc := proxy.NewLocalClientCreator(app) + mempool, cleanup := newMempoolWithApp(cc) + defer cleanup() + + for i := 0; i < b.N; i++ { + tx := make([]byte, 8) + binary.BigEndian.PutUint64(tx, uint64(i)) + mempool.CheckTxSync(tx, TxInfo{}) + } +} + +func BenchmarkCheckTxAsync(b *testing.B) { app := kvstore.NewApplication() cc := proxy.NewLocalClientCreator(app) mempool, cleanup := newMempoolWithApp(cc) @@ -35,7 +66,7 @@ func BenchmarkCheckTx(b *testing.B) { for i := 0; i < b.N; i++ { tx := make([]byte, 8) binary.BigEndian.PutUint64(tx, uint64(i)) - mempool.CheckTx(tx, nil, TxInfo{}) + mempool.CheckTxAsync(tx, TxInfo{}, nil) } } diff --git a/mempool/cache_test.go b/mempool/cache_test.go index a4a8da7bb..9d089a2e0 100644 --- a/mempool/cache_test.go +++ b/mempool/cache_test.go @@ -58,7 +58,7 @@ func TestCacheAfterUpdate(t *testing.T) { for tcIndex, tc := range tests { for i := 0; i < tc.numTxsToCreate; i++ { tx := types.Tx{byte(i)} - err := mempool.CheckTx(tx, nil, TxInfo{}) + _, err := mempool.CheckTxSync(tx, TxInfo{}) require.NoError(t, err) } @@ -71,7 +71,7 @@ func TestCacheAfterUpdate(t *testing.T) { for _, v := range tc.reAddIndices { tx := types.Tx{byte(v)} - _ = mempool.CheckTx(tx, nil, TxInfo{}) + _, _ = mempool.CheckTxSync(tx, TxInfo{}) } cache := mempool.cache.(*mapTxCache) diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 32f3269da..666fdceb8 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -1,7 +1,6 @@ package mempool import ( - "bytes" "container/list" "crypto/sha256" "fmt" @@ -52,12 +51,6 @@ type CListMempool struct { txs *clist.CList // concurrent linked-list of good txs proxyAppConn proxy.AppConnMempool - // Track whether we're rechecking txs. - // These are not protected by a mutex and are expected to be mutated in - // serial (ie. by abci responses which are called in serial). - recheckCursor *clist.CElement // next expected response - recheckEnd *clist.CElement // re-checking stops here - // Map for quick access to txs to record sender in CheckTx. // txsMap: txKey -> CElement txsMap sync.Map @@ -84,14 +77,12 @@ func NewCListMempool( options ...CListMempoolOption, ) *CListMempool { mempool := &CListMempool{ - config: config, - proxyAppConn: proxyAppConn, - txs: clist.New(), - height: height, - recheckCursor: nil, - recheckEnd: nil, - logger: log.NewNopLogger(), - metrics: NopMetrics(), + config: config, + proxyAppConn: proxyAppConn, + txs: clist.New(), + height: height, + logger: log.NewNopLogger(), + metrics: NopMetrics(), } if config.CacheSize > 0 { mempool.cache = newMapTxCache(config.CacheSize) @@ -210,17 +201,67 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} { return mem.txs.WaitChan() } +// It blocks if we're waiting on Update() or Reap(). +// Safe for concurrent use by multiple goroutines. +func (mem *CListMempool) CheckTxSync(tx types.Tx, txInfo TxInfo) (res *abci.Response, err error) { + mem.updateMtx.RLock() + // use defer to unlock mutex because application (*local client*) might panic + defer mem.updateMtx.RUnlock() + + if err = mem.prepareCheckTx(tx, txInfo); err != nil { + return res, err + } + + // CONTRACT: `app.CheckTxSync()` should check whether `GasWanted` is valid (0 <= GasWanted <= block.masGas) + var r *abci.ResponseCheckTx + r, err = mem.proxyAppConn.CheckTxSync(abci.RequestCheckTx{Tx: tx}) + if err != nil { + return res, err + } + + // TODO refactor to pass a `pointer` directly + res = abci.ToResponseCheckTx(*r) + mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, res, nil) + return res, err +} + // It blocks if we're waiting on Update() or Reap(). // cb: A callback from the CheckTx command. // It gets called from another goroutine. // CONTRACT: Either cb will get called, or err returned. // // Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) error { +func (mem *CListMempool) CheckTxAsync(tx types.Tx, txInfo TxInfo, cb func(*abci.Response)) (err error) { mem.updateMtx.RLock() // use defer to unlock mutex because application (*local client*) might panic - defer mem.updateMtx.RUnlock() + defer func() { + if err != nil { + mem.updateMtx.RUnlock() + return + } + + if r := recover(); r != nil { + mem.updateMtx.RUnlock() + panic(r) + } + }() + + if err = mem.prepareCheckTx(tx, txInfo); err != nil { + return err + } + // CONTRACT: `app.CheckTxAsync()` should check whether `GasWanted` is valid (0 <= GasWanted <= block.masGas) + reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) + reqRes.SetCallback(func(res *abci.Response) { + mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, res, cb) + mem.updateMtx.RUnlock() + }) + + return err +} + +// CONTRACT: `caller` should held `mem.updateMtx.RLock()` +func (mem *CListMempool) prepareCheckTx(tx types.Tx, txInfo TxInfo) error { txSize := len(tx) if err := mem.isFull(txSize); err != nil { @@ -287,10 +328,6 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx return err } - // CONTRACT: `app.CheckTxAsync()` should check whether `GasWanted` is valid (0 <= GasWanted <= block.masGas) - reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) - reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb)) - return nil } @@ -304,15 +341,18 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx // When rechecking, we don't need the peerID, so the recheck callback happens // here. func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) { - if mem.recheckCursor == nil { + checkTxReq := req.GetCheckTx() + if checkTxReq == nil { return } - mem.metrics.RecheckCount.Add(1) - mem.resCbRecheck(req, res) + if checkTxReq.Type == abci.CheckTxType_Recheck { + mem.metrics.RecheckCount.Add(1) + mem.resCbRecheck(req, res) - // update metrics - mem.metrics.Size.Set(float64(mem.Size())) + // update metrics + mem.metrics.Size.Set(float64(mem.Size())) + } } // Request specific callback that should be set on individual reqRes objects @@ -328,23 +368,17 @@ func (mem *CListMempool) reqResCb( tx []byte, peerID uint16, peerP2PID p2p.ID, + res *abci.Response, externalCb func(*abci.Response), -) func(res *abci.Response) { - return func(res *abci.Response) { - if mem.recheckCursor != nil { - // this should never happen - panic("recheck cursor is not nil in reqResCb") - } - - mem.resCbFirstTime(tx, peerID, peerP2PID, res) +) { + mem.resCbFirstTime(tx, peerID, peerP2PID, res) - // update metrics - mem.metrics.Size.Set(float64(mem.Size())) + // update metrics + mem.metrics.Size.Set(float64(mem.Size())) - // passed in by the caller of CheckTx, eg. the RPC - if externalCb != nil { - externalCb(res) - } + // passed in by the caller of CheckTx, eg. the RPC + if externalCb != nil { + externalCb(res) } } @@ -467,34 +501,19 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { switch r := res.Value.(type) { case *abci.Response_CheckTx: tx := req.GetCheckTx().Tx - memTx := mem.recheckCursor.Value.(*mempoolTx) - if !bytes.Equal(tx, memTx.tx) { - panic(fmt.Sprintf( - "Unexpected tx response from proxy during recheck\nExpected %X, got %X", - memTx.tx, - tx)) - } - if r.CheckTx.Code == abci.CodeTypeOK { - // Good, nothing to do. - } else { - // Tx became invalidated due to newly committed block. - mem.logger.Info("Tx is no longer valid", "tx", txID(tx), "res", r) - // NOTE: we remove tx from the cache because it might be good later - mem.removeTx(tx, mem.recheckCursor, true) - } - if mem.recheckCursor == mem.recheckEnd { - mem.recheckCursor = nil - } else { - mem.recheckCursor = mem.recheckCursor.Next() - } - if mem.recheckCursor == nil { - // Done! - mem.logger.Info("Done rechecking txs") - - // incase the recheck removed all txs - if mem.Size() > 0 { - mem.notifyTxsAvailable() + txHash := txKey(tx) + if e, ok := mem.txsMap.Load(txHash); ok { + celem := e.(*clist.CElement) + if r.CheckTx.Code == abci.CodeTypeOK { + // Good, nothing to do. + } else { + // Tx became invalidated due to newly committed block. + mem.logger.Info("Tx is no longer valid", "tx", txID(tx), "res", r) + // NOTE: we remove tx from the cache because it might be good later + mem.removeTx(tx, celem, true) } + } else { + panic(fmt.Sprintf("Unexpected tx response from proxy during recheck\ntxHash=%s, tx=%X", txHash, tx)) } default: // ignore other messages @@ -637,11 +656,11 @@ func (mem *CListMempool) Update( // At this point, mem.txs are being rechecked. // mem.recheckCursor re-scans mem.txs and possibly removes some txs. // Before mem.Reap(), we should wait for mem.recheckCursor to be nil. - } else { - // just notify there're some txs left. - if mem.Size() > 0 { - mem.notifyTxsAvailable() - } + } + + // notify there're some txs left. + if mem.Size() > 0 { + mem.notifyTxsAvailable() } // Update metrics @@ -655,18 +674,24 @@ func (mem *CListMempool) recheckTxs() { return } - mem.recheckCursor = mem.txs.Front() - mem.recheckEnd = mem.txs.Back() + wg := sync.WaitGroup{} // Push txs to proxyAppConn // NOTE: globalCb may be called concurrently. for e := mem.txs.Front(); e != nil; e = e.Next() { + wg.Add(1) + memTx := e.Value.(*mempoolTx) - mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{ + reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{ Tx: memTx.tx, Type: abci.CheckTxType_Recheck, }) + reqRes.SetCallback(func(res *abci.Response) { + wg.Done() + }) } + + wg.Wait() } //-------------------------------------------------------------------------------- diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go index 1985b0dfe..cc9d60b8c 100644 --- a/mempool/clist_mempool_test.go +++ b/mempool/clist_mempool_test.go @@ -77,7 +77,7 @@ func checkTxs(t *testing.T, mempool Mempool, count int, peerID uint16) types.Txs if err != nil { t.Error(err) } - if err := mempool.CheckTx(txBytes, nil, txInfo); err != nil { + if _, err := mempool.CheckTxSync(txBytes, txInfo); err != nil { // Skip invalid txs. // TestMempoolFilters will fail otherwise. It asserts a number of txs // returned. @@ -100,7 +100,7 @@ func TestReapMaxBytesMaxGas(t *testing.T) { checkTxs(t, mempool, 1, UnknownPeerID) tx0 := mempool.TxsFront().Value.(*mempoolTx) // assert that kv store has gas wanted = 1. - require.Equal(t, app.CheckTx(abci.RequestCheckTx{Tx: tx0.tx}).GasWanted, int64(1), "KVStore had a gas value neq to 1") + require.Equal(t, app.CheckTxSync(abci.RequestCheckTx{Tx: tx0.tx}).GasWanted, int64(1), "KVStore had a gas value neq to 1") require.Equal(t, tx0.gasWanted, int64(1), "transactions gas was set incorrectly") // ensure each tx is 20 bytes long require.Equal(t, len(tx0.tx), 20, "Tx is longer than 20 bytes") @@ -178,7 +178,7 @@ func TestMempoolUpdate(t *testing.T) { // 1. Adds valid txs to the cache { mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x01}}), abciResponses(1, abci.CodeTypeOK), nil) - err := mempool.CheckTx([]byte{0x01}, nil, TxInfo{}) + _, err := mempool.CheckTxSync([]byte{0x01}, TxInfo{}) if assert.Error(t, err) { assert.Equal(t, ErrTxInCache, err) } @@ -186,7 +186,7 @@ func TestMempoolUpdate(t *testing.T) { // 2. Removes valid txs from the mempool { - err := mempool.CheckTx([]byte{0x02}, nil, TxInfo{}) + _, err := mempool.CheckTxSync([]byte{0x02}, TxInfo{}) require.NoError(t, err) mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x02}}), abciResponses(1, abci.CodeTypeOK), nil) assert.Zero(t, mempool.Size()) @@ -194,12 +194,12 @@ func TestMempoolUpdate(t *testing.T) { // 3. Removes invalid transactions from the cache and the mempool (if present) { - err := mempool.CheckTx([]byte{0x03}, nil, TxInfo{}) + _, err := mempool.CheckTxSync([]byte{0x03}, TxInfo{}) require.NoError(t, err) mempool.Update(newTestBlock(1, []types.Tx{[]byte{0x03}}), abciResponses(1, 1), nil) assert.Zero(t, mempool.Size()) - err = mempool.CheckTx([]byte{0x03}, nil, TxInfo{}) + _, err = mempool.CheckTxSync([]byte{0x03}, TxInfo{}) assert.NoError(t, err) } } @@ -269,7 +269,7 @@ func TestSerialReap(t *testing.T) { // This will succeed txBytes := make([]byte, 8) binary.BigEndian.PutUint64(txBytes, uint64(i)) - err := mempool.CheckTx(txBytes, nil, TxInfo{}) + _, err := mempool.CheckTxSync(txBytes, TxInfo{}) _, cached := cacheMap[string(txBytes)] if cached { require.NotNil(t, err, "expected error for cached tx") @@ -279,7 +279,7 @@ func TestSerialReap(t *testing.T) { cacheMap[string(txBytes)] = struct{}{} // Duplicates are cached and should return error - err = mempool.CheckTx(txBytes, nil, TxInfo{}) + _, err = mempool.CheckTxSync(txBytes, TxInfo{}) require.NotNil(t, err, "Expected error after CheckTx on duplicated tx") } } @@ -385,7 +385,7 @@ func TestMempoolCloseWAL(t *testing.T) { require.Equal(t, 1, len(m2), "expecting the wal match in") // 5. Write some contents to the WAL - mempool.CheckTx(types.Tx([]byte("foo")), nil, TxInfo{}) + mempool.CheckTxSync(types.Tx([]byte("foo")), TxInfo{}) walFilepath := mempool.wal.Path sum1 := checksumFile(walFilepath, t) @@ -395,7 +395,7 @@ func TestMempoolCloseWAL(t *testing.T) { // 7. Invoke CloseWAL() and ensure it discards the // WAL thus any other write won't go through. mempool.CloseWAL() - mempool.CheckTx(types.Tx([]byte("bar")), nil, TxInfo{}) + mempool.CheckTxSync(types.Tx([]byte("bar")), TxInfo{}) sum2 := checksumFile(walFilepath, t) require.Equal(t, sum1, sum2, "expected no change to the WAL after invoking CloseWAL() since it was discarded") @@ -448,7 +448,7 @@ func TestMempoolMaxMsgSize(t *testing.T) { caseString := fmt.Sprintf("case %d, len %d", i, testCase.len) tx := tmrand.Bytes(testCase.len) - err := mempl.CheckTx(tx, nil, TxInfo{}) + _, err := mempl.CheckTxSync(tx, TxInfo{}) msg := &TxMessage{tx} encoded := cdc.MustMarshalBinaryBare(msg) require.Equal(t, len(encoded), txMessageSize(tx), caseString) @@ -475,7 +475,7 @@ func TestMempoolTxsBytes(t *testing.T) { assert.EqualValues(t, 0, mempool.TxsBytes()) // 2. len(tx) after CheckTx - err := mempool.CheckTx([]byte{0x01}, nil, TxInfo{}) + _, err := mempool.CheckTxSync([]byte{0x01}, TxInfo{}) require.NoError(t, err) assert.EqualValues(t, 1, mempool.TxsBytes()) @@ -484,7 +484,7 @@ func TestMempoolTxsBytes(t *testing.T) { assert.EqualValues(t, 0, mempool.TxsBytes()) // 4. zero after Flush - err = mempool.CheckTx([]byte{0x02, 0x03}, nil, TxInfo{}) + _, err = mempool.CheckTxSync([]byte{0x02, 0x03}, TxInfo{}) require.NoError(t, err) assert.EqualValues(t, 2, mempool.TxsBytes()) @@ -492,9 +492,9 @@ func TestMempoolTxsBytes(t *testing.T) { assert.EqualValues(t, 0, mempool.TxsBytes()) // 5. ErrMempoolIsFull is returned when/if MaxTxsBytes limit is reached. - err = mempool.CheckTx([]byte{0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04}, nil, TxInfo{}) + _, err = mempool.CheckTxSync([]byte{0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04}, TxInfo{}) require.NoError(t, err) - err = mempool.CheckTx([]byte{0x05}, nil, TxInfo{}) + _, err = mempool.CheckTxSync([]byte{0x05}, TxInfo{}) if assert.Error(t, err) { assert.IsType(t, ErrMempoolIsFull{}, err) } @@ -508,7 +508,7 @@ func TestMempoolTxsBytes(t *testing.T) { txBytes := make([]byte, 8) binary.BigEndian.PutUint64(txBytes, uint64(0)) - err = mempool.CheckTx(txBytes, nil, TxInfo{}) + _, err = mempool.CheckTxSync(txBytes, TxInfo{}) require.NoError(t, err) assert.EqualValues(t, 8, mempool.TxsBytes()) @@ -559,7 +559,7 @@ func TestMempoolRemoteAppConcurrency(t *testing.T) { tx := txs[txNum] // this will err with ErrTxInCache many times ... - mempool.CheckTx(tx, nil, TxInfo{SenderID: uint16(peerID)}) + mempool.CheckTxSync(tx, TxInfo{SenderID: uint16(peerID)}) } } diff --git a/mempool/mempool.go b/mempool/mempool.go index b75ee81a6..93603b7a9 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -15,7 +15,8 @@ import ( type Mempool interface { // CheckTx executes a new transaction against the application to determine // its validity and whether it should be added to the mempool. - CheckTx(tx types.Tx, callback func(*abci.Response), txInfo TxInfo) error + CheckTxSync(tx types.Tx, txInfo TxInfo) (*abci.Response, error) + CheckTxAsync(tx types.Tx, txInfo TxInfo, callback func(*abci.Response)) error // ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes // bytes total with the condition that the total gasWanted must be less than diff --git a/mempool/reactor.go b/mempool/reactor.go index fda12c021..79bd18fdc 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -174,7 +174,7 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { if src != nil { txInfo.SenderP2PID = src.ID() } - err := memR.mempool.CheckTx(msg.Tx, nil, txInfo) + err := memR.mempool.CheckTxAsync(msg.Tx, txInfo, nil) if err != nil { memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err) } diff --git a/mock/mempool.go b/mock/mempool.go index 2555e8452..ebedcc05a 100644 --- a/mock/mempool.go +++ b/mock/mempool.go @@ -15,7 +15,10 @@ var _ mempl.Mempool = Mempool{} func (Mempool) Lock() {} func (Mempool) Unlock() {} func (Mempool) Size() int { return 0 } -func (Mempool) CheckTx(_ types.Tx, _ func(*abci.Response), _ mempl.TxInfo) error { +func (Mempool) CheckTxSync(_ types.Tx, _ mempl.TxInfo) (*abci.Response, error) { + return nil, nil +} +func (Mempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(*abci.Response)) error { return nil } func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} } diff --git a/node/node_test.go b/node/node_test.go index 3a20893fb..936114450 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -267,7 +267,7 @@ func TestCreateProposalBlock(t *testing.T) { txLength := 1000 for i := 0; i < maxBytes/txLength; i++ { tx := tmrand.Bytes(txLength) - err := mempool.CheckTx(tx, nil, mempl.TxInfo{}) + _, err := mempool.CheckTxSync(tx, mempl.TxInfo{}) assert.NoError(t, err) } diff --git a/proxy/app_conn.go b/proxy/app_conn.go index 8f322cc89..aa209096b 100644 --- a/proxy/app_conn.go +++ b/proxy/app_conn.go @@ -24,6 +24,7 @@ type AppConnMempool interface { SetResponseCallback(abcicli.Callback) Error() error + CheckTxSync(types.RequestCheckTx) (*types.ResponseCheckTx, error) CheckTxAsync(types.RequestCheckTx) *abcicli.ReqRes BeginRecheckTxSync(types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) @@ -102,6 +103,10 @@ func (app *appConnMempool) Error() error { return app.appConn.Error() } +func (app *appConnMempool) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) { + return app.appConn.CheckTxSync(req) +} + func (app *appConnMempool) CheckTxAsync(req types.RequestCheckTx) *abcicli.ReqRes { return app.appConn.CheckTxAsync(req) } diff --git a/rpc/client/mock/abci.go b/rpc/client/mock/abci.go index ebee8b4e8..3da17d77e 100644 --- a/rpc/client/mock/abci.go +++ b/rpc/client/mock/abci.go @@ -48,7 +48,7 @@ func (a ABCIApp) ABCIQueryWithOptions( // TODO: Make it wait for a commit and set res.Height appropriately. func (a ABCIApp) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { res := ctypes.ResultBroadcastTxCommit{} - res.CheckTx = a.App.CheckTx(abci.RequestCheckTx{Tx: tx}) + res.CheckTx = a.App.CheckTxSync(abci.RequestCheckTx{Tx: tx}) if res.CheckTx.IsErr() { return &res, nil } @@ -58,11 +58,13 @@ func (a ABCIApp) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit } func (a ABCIApp) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - c := a.App.CheckTx(abci.RequestCheckTx{Tx: tx}) + chRes := make(chan abci.ResponseCheckTx, 1) + a.App.CheckTxAsync(abci.RequestCheckTx{Tx: tx}, func(res abci.ResponseCheckTx) { + chRes <- res + }) + c := <-chRes // and this gets written in a background thread... - if !c.IsErr() { - go func() { a.App.DeliverTx(abci.RequestDeliverTx{Tx: tx}) }() // nolint: errcheck - } + go func() { a.App.DeliverTx(abci.RequestDeliverTx{Tx: tx}) }() // nolint: errcheck return &ctypes.ResultBroadcastTx{ Code: c.Code, Data: c.Data, @@ -73,7 +75,7 @@ func (a ABCIApp) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error } func (a ABCIApp) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - c := a.App.CheckTx(abci.RequestCheckTx{Tx: tx}) + c := a.App.CheckTxSync(abci.RequestCheckTx{Tx: tx}) // and this gets written in a background thread... if !c.IsErr() { go func() { a.App.DeliverTx(abci.RequestDeliverTx{Tx: tx}) }() // nolint: errcheck diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index f4936bd3b..1fd3dfeaf 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -336,7 +336,7 @@ func TestUnconfirmedTxs(t *testing.T) { _, _, tx := MakeTxKV() mempool := node.Mempool() - _ = mempool.CheckTx(tx, nil, mempl.TxInfo{}) + _, _ = mempool.CheckTxSync(tx, mempl.TxInfo{}) for i, c := range GetClients() { mc, ok := c.(client.MempoolClient) @@ -357,7 +357,7 @@ func TestNumUnconfirmedTxs(t *testing.T) { _, _, tx := MakeTxKV() mempool := node.Mempool() - _ = mempool.CheckTx(tx, nil, mempl.TxInfo{}) + _, _ = mempool.CheckTxSync(tx, mempl.TxInfo{}) mempoolSize := mempool.Size() for i, c := range GetClients() { diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 2c417c407..517550d2c 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -21,7 +21,7 @@ import ( // CheckTx nor DeliverTx results. // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - err := env.Mempool.CheckTx(tx, nil, mempl.TxInfo{}) + err := env.Mempool.CheckTxAsync(tx, mempl.TxInfo{}, nil) if err != nil { return nil, err @@ -33,14 +33,10 @@ func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadca // DeliverTx result. // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - resCh := make(chan *abci.Response, 1) - err := env.Mempool.CheckTx(tx, func(res *abci.Response) { - resCh <- res - }, mempl.TxInfo{}) + res, err := env.Mempool.CheckTxSync(tx, mempl.TxInfo{}) if err != nil { return nil, err } - res := <-resCh r := res.GetCheckTx() return &ctypes.ResultBroadcastTx{ Code: r.Code, @@ -74,16 +70,12 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc } defer env.EventBus.Unsubscribe(context.Background(), subscriber, q) - // Broadcast tx and wait for CheckTx result - checkTxResCh := make(chan *abci.Response, 1) - err = env.Mempool.CheckTx(tx, func(res *abci.Response) { - checkTxResCh <- res - }, mempl.TxInfo{}) + // Broadcast tx and check tx + checkTxResMsg, err := env.Mempool.CheckTxSync(tx, mempl.TxInfo{}) if err != nil { env.Logger.Error("Error on broadcastTxCommit", "err", err) return nil, fmt.Errorf("error on broadcastTxCommit: %v", err) } - checkTxResMsg := <-checkTxResCh checkTxRes := checkTxResMsg.GetCheckTx() if checkTxRes.Code != abci.CodeTypeOK { return &ctypes.ResultBroadcastTxCommit{ diff --git a/state/helpers_test.go b/state/helpers_test.go index a85e35748..4565c92b6 100644 --- a/state/helpers_test.go +++ b/state/helpers_test.go @@ -274,10 +274,14 @@ func (app *testApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx return abci.ResponseDeliverTx{Events: []abci.Event{}} } -func (app *testApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { +func (app *testApp) CheckTxSync(req abci.RequestCheckTx) abci.ResponseCheckTx { return abci.ResponseCheckTx{} } +func (app *testApp) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxCallback) { + callback(abci.ResponseCheckTx{}) +} + func (app *testApp) Commit() abci.ResponseCommit { return abci.ResponseCommit{} }