Skip to content

Commit

Permalink
feat: concurrent recheckTx (#163)
Browse files Browse the repository at this point in the history
* chore: remove `recheckCursor/End`

* chore: decompose `mempool.CheckTx()` into `CheckTxSync()` and `CheckTxAsync()`

* chore: `localClient.CheckTxAsync()` calls `app.CheckTxAsync()`

* fix: tests failed to build

* fix: call `mem.reqResCb()` correctly

* chore: rename abci.Application.`CheckTx()` to `CheckTxSync()`

* chore: CListMempool.`recheckTxs()` waits all txs are rechecked

* chore: revise `mempool.CheckTxAsync()` to call `RUnlock()` in the callback

* fix: how to call `mem.resResCb()`

* chore: bench with `CheckTxAsync()`

* chore: revise `mem.reqResCb()`

* chore: `reqRes.Done()` in `newLocalReqRes()`

* fix: call `reqRes.Callback()` if set

* chore: revise calling order `reqRes.Done()` and `reqRes.SetDone()`
  • Loading branch information
jinsan-line authored Jan 21, 2021
1 parent 8316c03 commit 65d890b
Show file tree
Hide file tree
Showing 22 changed files with 265 additions and 150 deletions.
27 changes: 20 additions & 7 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,24 @@ func (app *localClient) DeliverTxAsync(params types.RequestDeliverTx) *ReqRes {
)
}

func (app *localClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes {
res := app.Application.CheckTx(req)
return app.callback(
types.ToRequestCheckTx(req),
types.ToResponseCheckTx(res),
)
func (app *localClient) CheckTxAsync(params types.RequestCheckTx) *ReqRes {
req := types.ToRequestCheckTx(params)
reqRes := NewReqRes(req)

app.Application.CheckTxAsync(params, func(r types.ResponseCheckTx) {
res := types.ToResponseCheckTx(r)
app.Callback(req, res)
reqRes.Response = res
reqRes.Done()
reqRes.SetDone()

// Notify reqRes listener if set
if cb := reqRes.GetCallback(); cb != nil {
cb(res)
}
})

return reqRes
}

func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes {
Expand Down Expand Up @@ -201,7 +213,7 @@ func (app *localClient) DeliverTxSync(req types.RequestDeliverTx) (*types.Respon
}

func (app *localClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
res := app.Application.CheckTx(req)
res := app.Application.CheckTxSync(req)
return &res, nil
}

Expand Down Expand Up @@ -265,6 +277,7 @@ func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRe
func newLocalReqRes(req *types.Request, res *types.Response) *ReqRes {
reqRes := NewReqRes(req)
reqRes.Response = res
reqRes.Done()
reqRes.SetDone()
return reqRes
}
10 changes: 9 additions & 1 deletion abci/example/counter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,15 @@ func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeli
return types.ResponseDeliverTx{Code: code.CodeTypeOK}
}

func (app *Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
func (app *Application) CheckTxSync(req types.RequestCheckTx) types.ResponseCheckTx {
return app.checkTx(req)
}

func (app *Application) CheckTxAsync(req types.RequestCheckTx, callback types.CheckTxCallback) {
callback(app.checkTx(req))
}

func (app *Application) checkTx(req types.RequestCheckTx) types.ResponseCheckTx {
if app.serial {
if len(req.Tx) > 8 {
return types.ResponseCheckTx{
Expand Down
10 changes: 9 additions & 1 deletion abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,15 @@ func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeli
return types.ResponseDeliverTx{Code: code.CodeTypeOK, Events: events}
}

func (app *Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
func (app *Application) CheckTxSync(req types.RequestCheckTx) types.ResponseCheckTx {
return app.checkTx(req)
}

func (app *Application) CheckTxAsync(req types.RequestCheckTx, callback types.CheckTxCallback) {
callback(app.checkTx(req))
}

func (app *Application) checkTx(req types.RequestCheckTx) types.ResponseCheckTx {
return types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}
}

Expand Down
8 changes: 6 additions & 2 deletions abci/example/kvstore/persistent_kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,12 @@ func (app *PersistentKVStoreApplication) DeliverTx(req types.RequestDeliverTx) t
return app.app.DeliverTx(req)
}

func (app *PersistentKVStoreApplication) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
return app.app.CheckTx(req)
func (app *PersistentKVStoreApplication) CheckTxSync(req types.RequestCheckTx) types.ResponseCheckTx {
return app.app.CheckTxSync(req)
}

func (app *PersistentKVStoreApplication) CheckTxAsync(req types.RequestCheckTx, callback types.CheckTxCallback) {
app.app.CheckTxAsync(req, callback)
}

func (app *PersistentKVStoreApplication) BeginRecheckTx(req types.RequestBeginRecheckTx) types.ResponseBeginRecheckTx {
Expand Down
13 changes: 10 additions & 3 deletions abci/types/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
context "golang.org/x/net/context"
)

type CheckTxCallback func(ResponseCheckTx)

// Application is an interface that enables any finite, deterministic state machine
// to be driven by a blockchain-based replication engine via the ABCI.
// All methods take a RequestXxx argument and return a ResponseXxx argument,
Expand All @@ -15,7 +17,8 @@ type Application interface {
Query(RequestQuery) ResponseQuery // Query for state

// Mempool Connection
CheckTx(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool
CheckTxSync(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool
CheckTxAsync(RequestCheckTx, CheckTxCallback) // Asynchronously validate a tx for the mempool
BeginRecheckTx(RequestBeginRecheckTx) ResponseBeginRecheckTx // Signals the beginning of rechecking
EndRecheckTx(RequestEndRecheckTx) ResponseEndRecheckTx // Signals the end of rechecking

Expand Down Expand Up @@ -51,10 +54,14 @@ func (BaseApplication) DeliverTx(req RequestDeliverTx) ResponseDeliverTx {
return ResponseDeliverTx{Code: CodeTypeOK}
}

func (BaseApplication) CheckTx(req RequestCheckTx) ResponseCheckTx {
func (BaseApplication) CheckTxSync(req RequestCheckTx) ResponseCheckTx {
return ResponseCheckTx{Code: CodeTypeOK}
}

func (BaseApplication) CheckTxAsync(req RequestCheckTx, callback CheckTxCallback) {
callback(ResponseCheckTx{Code: CodeTypeOK})
}

func (BaseApplication) BeginRecheckTx(req RequestBeginRecheckTx) ResponseBeginRecheckTx {
return ResponseBeginRecheckTx{Code: CodeTypeOK}
}
Expand Down Expand Up @@ -114,7 +121,7 @@ func (app *GRPCApplication) DeliverTx(ctx context.Context, req *RequestDeliverTx
}

func (app *GRPCApplication) CheckTx(ctx context.Context, req *RequestCheckTx) (*ResponseCheckTx, error) {
res := app.app.CheckTx(*req)
res := app.app.CheckTxSync(*req)
return &res, nil
}

Expand Down
6 changes: 5 additions & 1 deletion blockchain/v0/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,10 +374,14 @@ func (app *testApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx
return abci.ResponseDeliverTx{Events: []abci.Event{}}
}

func (app *testApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
func (app *testApp) CheckTxSync(req abci.RequestCheckTx) abci.ResponseCheckTx {
return abci.ResponseCheckTx{}
}

func (app *testApp) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxCallback) {
callback(abci.ResponseCheckTx{})
}

func (app *testApp) Commit() abci.ResponseCommit {
return abci.ResponseCommit{}
}
Expand Down
16 changes: 12 additions & 4 deletions consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func deliverTxsRange(cs *State, start, end int) {
for i := start; i < end; i++ {
txBytes := make([]byte, 8)
binary.BigEndian.PutUint64(txBytes, uint64(i))
err := assertMempool(cs.txNotifier).CheckTx(txBytes, nil, mempl.TxInfo{})
_, err := assertMempool(cs.txNotifier).CheckTxSync(txBytes, mempl.TxInfo{})
if err != nil {
panic(fmt.Sprintf("Error after CheckTx: %v", err))
}
Expand Down Expand Up @@ -161,13 +161,13 @@ func TestMempoolRmBadTx(t *testing.T) {
// Try to send the tx through the mempool.
// CheckTx should not err, but the app should return a bad abci code
// and the tx should get removed from the pool
err := assertMempool(cs.txNotifier).CheckTx(txBytes, func(r *abci.Response) {
err := assertMempool(cs.txNotifier).CheckTxAsync(txBytes, mempl.TxInfo{}, func(r *abci.Response) {
if r.GetCheckTx().Code != code.CodeTypeBadNonce {
t.Errorf("expected checktx to return bad nonce, got %v", r)
return
}
checkTxRespCh <- struct{}{}
}, mempl.TxInfo{})
})
if err != nil {
t.Errorf("error after CheckTx: %v", err)
return
Expand Down Expand Up @@ -233,7 +233,15 @@ func (app *CounterApplication) DeliverTx(req abci.RequestDeliverTx) abci.Respons
return abci.ResponseDeliverTx{Code: code.CodeTypeOK}
}

func (app *CounterApplication) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
func (app *CounterApplication) CheckTxSync(req abci.RequestCheckTx) abci.ResponseCheckTx {
return app.checkTx(req)
}

func (app *CounterApplication) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxCallback) {
callback(app.checkTx(req))
}

func (app *CounterApplication) checkTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
txValue := txAsUint64(req.Tx)
app.mempoolTxCountMtx.Lock()
defer app.mempoolTxCountMtx.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)

// send a tx
if err := assertMempool(css[3].txNotifier).CheckTx([]byte{1, 2, 3}, nil, mempl.TxInfo{}); err != nil {
if _, err := assertMempool(css[3].txNotifier).CheckTxSync([]byte{1, 2, 3}, mempl.TxInfo{}); err != nil {
t.Error(err)
}

Expand Down Expand Up @@ -543,7 +543,7 @@ func waitForAndValidateBlock(
err := validateBlock(newBlock, activeVals)
assert.Nil(t, err)
for _, tx := range txs {
err := assertMempool(css[j].txNotifier).CheckTx(tx, nil, mempl.TxInfo{})
_, err := assertMempool(css[j].txNotifier).CheckTxSync(tx, mempl.TxInfo{})
assert.Nil(t, err)
}
}, css)
Expand Down
14 changes: 7 additions & 7 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func sendTxs(ctx context.Context, cs *State) {
return
default:
tx := []byte{byte(i)}
assertMempool(cs.txNotifier).CheckTx(tx, nil, mempl.TxInfo{})
assertMempool(cs.txNotifier).CheckTxSync(tx, mempl.TxInfo{})
i++
}
}
Expand Down Expand Up @@ -350,7 +350,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
require.NoError(t, err)
valPubKey1ABCI := types.TM2PB.PubKey(newValidatorPubKey1)
newValidatorTx1 := kvstore.MakeValSetChangeTx(valPubKey1ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx1, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(newValidatorTx1, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ := css[0].createProposalBlock() //changeProposer(t, cs1, vs2)
propBlockParts := propBlock.MakePartSet(partSize)
Expand All @@ -376,7 +376,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
require.NoError(t, err)
updatePubKey1ABCI := types.TM2PB.PubKey(updateValidatorPubKey1)
updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25)
err = assertMempool(css[0].txNotifier).CheckTx(updateValidatorTx1, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(updateValidatorTx1, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
Expand All @@ -402,13 +402,13 @@ func TestSimulateValidatorsChange(t *testing.T) {
require.NoError(t, err)
newVal2ABCI := types.TM2PB.PubKey(newValidatorPubKey2)
newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx2, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(newValidatorTx2, mempl.TxInfo{})
assert.Nil(t, err)
newValidatorPubKey3, err := css[nVals+2].privValidator.GetPubKey()
require.NoError(t, err)
newVal3ABCI := types.TM2PB.PubKey(newValidatorPubKey3)
newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx3, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(newValidatorTx3, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
Expand Down Expand Up @@ -442,7 +442,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
ensureNewProposal(proposalCh, height, round)

removeValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, 0)
err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx2, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(removeValidatorTx2, mempl.TxInfo{})
assert.Nil(t, err)

rs = css[0].GetRoundState()
Expand Down Expand Up @@ -472,7 +472,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
height++
incrementHeight(vss...)
removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0)
err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx3, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(removeValidatorTx3, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
Expand Down
37 changes: 34 additions & 3 deletions mempool/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,46 @@ func BenchmarkReap(b *testing.B) {
for i := 0; i < size; i++ {
tx := make([]byte, 8)
binary.BigEndian.PutUint64(tx, uint64(i))
mempool.CheckTx(tx, nil, TxInfo{})
mempool.CheckTxSync(tx, TxInfo{})
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
mempool.ReapMaxBytesMaxGas(100000000, 10000000)
}
}

func BenchmarkCheckTx(b *testing.B) {
func BenchmarkReapWithCheckTxAsync(b *testing.B) {
app := kvstore.NewApplication()
cc := proxy.NewLocalClientCreator(app)
mempool, cleanup := newMempoolWithApp(cc)
defer cleanup()

size := 10000
for i := 0; i < size; i++ {
tx := make([]byte, 8)
binary.BigEndian.PutUint64(tx, uint64(i))
mempool.CheckTxAsync(tx, TxInfo{}, nil)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
mempool.ReapMaxBytesMaxGas(100000000, 10000000)
}
}

func BenchmarkCheckTxSync(b *testing.B) {
app := kvstore.NewApplication()
cc := proxy.NewLocalClientCreator(app)
mempool, cleanup := newMempoolWithApp(cc)
defer cleanup()

for i := 0; i < b.N; i++ {
tx := make([]byte, 8)
binary.BigEndian.PutUint64(tx, uint64(i))
mempool.CheckTxSync(tx, TxInfo{})
}
}

func BenchmarkCheckTxAsync(b *testing.B) {
app := kvstore.NewApplication()
cc := proxy.NewLocalClientCreator(app)
mempool, cleanup := newMempoolWithApp(cc)
Expand All @@ -35,7 +66,7 @@ func BenchmarkCheckTx(b *testing.B) {
for i := 0; i < b.N; i++ {
tx := make([]byte, 8)
binary.BigEndian.PutUint64(tx, uint64(i))
mempool.CheckTx(tx, nil, TxInfo{})
mempool.CheckTxAsync(tx, TxInfo{}, nil)
}
}

Expand Down
4 changes: 2 additions & 2 deletions mempool/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestCacheAfterUpdate(t *testing.T) {
for tcIndex, tc := range tests {
for i := 0; i < tc.numTxsToCreate; i++ {
tx := types.Tx{byte(i)}
err := mempool.CheckTx(tx, nil, TxInfo{})
_, err := mempool.CheckTxSync(tx, TxInfo{})
require.NoError(t, err)
}

Expand All @@ -71,7 +71,7 @@ func TestCacheAfterUpdate(t *testing.T) {

for _, v := range tc.reAddIndices {
tx := types.Tx{byte(v)}
_ = mempool.CheckTx(tx, nil, TxInfo{})
_, _ = mempool.CheckTxSync(tx, TxInfo{})
}

cache := mempool.cache.(*mapTxCache)
Expand Down
Loading

0 comments on commit 65d890b

Please sign in to comment.