Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: concurrent recheckTx #163

Merged
merged 14 commits into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,24 @@ func (app *localClient) DeliverTxAsync(params types.RequestDeliverTx) *ReqRes {
)
}

func (app *localClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes {
res := app.Application.CheckTx(req)
return app.callback(
types.ToRequestCheckTx(req),
types.ToResponseCheckTx(res),
)
func (app *localClient) CheckTxAsync(params types.RequestCheckTx) *ReqRes {
req := types.ToRequestCheckTx(params)
reqRes := NewReqRes(req)

app.Application.CheckTxAsync(params, func(r types.ResponseCheckTx) {
res := types.ToResponseCheckTx(r)
app.Callback(req, res)
reqRes.Response = res
reqRes.Done()
reqRes.SetDone()

// Notify reqRes listener if set
if cb := reqRes.GetCallback(); cb != nil {
cb(res)
}
})

return reqRes
}

func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes {
Expand Down Expand Up @@ -201,7 +213,7 @@ func (app *localClient) DeliverTxSync(req types.RequestDeliverTx) (*types.Respon
}

func (app *localClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
res := app.Application.CheckTx(req)
res := app.Application.CheckTxSync(req)
return &res, nil
}

Expand Down Expand Up @@ -265,6 +277,7 @@ func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRe
func newLocalReqRes(req *types.Request, res *types.Response) *ReqRes {
reqRes := NewReqRes(req)
reqRes.Response = res
reqRes.Done()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just asking out of curiosity, did you put this code in because there a problem with the absence of this code before? Or is it just a formality?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was no bug behavior before but, as you can see as, it's not good in terms of consistency.

reqRes.SetDone()
return reqRes
}
10 changes: 9 additions & 1 deletion abci/example/counter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,15 @@ func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeli
return types.ResponseDeliverTx{Code: code.CodeTypeOK}
}

func (app *Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
func (app *Application) CheckTxSync(req types.RequestCheckTx) types.ResponseCheckTx {
return app.checkTx(req)
}

func (app *Application) CheckTxAsync(req types.RequestCheckTx, callback types.CheckTxCallback) {
callback(app.checkTx(req))
}

func (app *Application) checkTx(req types.RequestCheckTx) types.ResponseCheckTx {
if app.serial {
if len(req.Tx) > 8 {
return types.ResponseCheckTx{
Expand Down
10 changes: 9 additions & 1 deletion abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,15 @@ func (app *Application) DeliverTx(req types.RequestDeliverTx) types.ResponseDeli
return types.ResponseDeliverTx{Code: code.CodeTypeOK, Events: events}
}

func (app *Application) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
func (app *Application) CheckTxSync(req types.RequestCheckTx) types.ResponseCheckTx {
return app.checkTx(req)
}

func (app *Application) CheckTxAsync(req types.RequestCheckTx, callback types.CheckTxCallback) {
callback(app.checkTx(req))
}

func (app *Application) checkTx(req types.RequestCheckTx) types.ResponseCheckTx {
return types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}
}

Expand Down
8 changes: 6 additions & 2 deletions abci/example/kvstore/persistent_kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,12 @@ func (app *PersistentKVStoreApplication) DeliverTx(req types.RequestDeliverTx) t
return app.app.DeliverTx(req)
}

func (app *PersistentKVStoreApplication) CheckTx(req types.RequestCheckTx) types.ResponseCheckTx {
return app.app.CheckTx(req)
func (app *PersistentKVStoreApplication) CheckTxSync(req types.RequestCheckTx) types.ResponseCheckTx {
return app.app.CheckTxSync(req)
}

func (app *PersistentKVStoreApplication) CheckTxAsync(req types.RequestCheckTx, callback types.CheckTxCallback) {
app.app.CheckTxAsync(req, callback)
}

func (app *PersistentKVStoreApplication) BeginRecheckTx(req types.RequestBeginRecheckTx) types.ResponseBeginRecheckTx {
Expand Down
13 changes: 10 additions & 3 deletions abci/types/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
context "golang.org/x/net/context"
)

type CheckTxCallback func(ResponseCheckTx)

// Application is an interface that enables any finite, deterministic state machine
// to be driven by a blockchain-based replication engine via the ABCI.
// All methods take a RequestXxx argument and return a ResponseXxx argument,
Expand All @@ -15,7 +17,8 @@ type Application interface {
Query(RequestQuery) ResponseQuery // Query for state

// Mempool Connection
CheckTx(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool
CheckTxSync(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool
CheckTxAsync(RequestCheckTx, CheckTxCallback) // Asynchronously validate a tx for the mempool
BeginRecheckTx(RequestBeginRecheckTx) ResponseBeginRecheckTx // Signals the beginning of rechecking
EndRecheckTx(RequestEndRecheckTx) ResponseEndRecheckTx // Signals the end of rechecking

Expand Down Expand Up @@ -51,10 +54,14 @@ func (BaseApplication) DeliverTx(req RequestDeliverTx) ResponseDeliverTx {
return ResponseDeliverTx{Code: CodeTypeOK}
}

func (BaseApplication) CheckTx(req RequestCheckTx) ResponseCheckTx {
func (BaseApplication) CheckTxSync(req RequestCheckTx) ResponseCheckTx {
return ResponseCheckTx{Code: CodeTypeOK}
}

func (BaseApplication) CheckTxAsync(req RequestCheckTx, callback CheckTxCallback) {
callback(ResponseCheckTx{Code: CodeTypeOK})
}

func (BaseApplication) BeginRecheckTx(req RequestBeginRecheckTx) ResponseBeginRecheckTx {
return ResponseBeginRecheckTx{Code: CodeTypeOK}
}
Expand Down Expand Up @@ -114,7 +121,7 @@ func (app *GRPCApplication) DeliverTx(ctx context.Context, req *RequestDeliverTx
}

func (app *GRPCApplication) CheckTx(ctx context.Context, req *RequestCheckTx) (*ResponseCheckTx, error) {
res := app.app.CheckTx(*req)
res := app.app.CheckTxSync(*req)
return &res, nil
}

Expand Down
6 changes: 5 additions & 1 deletion blockchain/v0/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,10 +374,14 @@ func (app *testApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx
return abci.ResponseDeliverTx{Events: []abci.Event{}}
}

func (app *testApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
func (app *testApp) CheckTxSync(req abci.RequestCheckTx) abci.ResponseCheckTx {
return abci.ResponseCheckTx{}
}

func (app *testApp) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxCallback) {
callback(abci.ResponseCheckTx{})
}

func (app *testApp) Commit() abci.ResponseCommit {
return abci.ResponseCommit{}
}
Expand Down
16 changes: 12 additions & 4 deletions consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func deliverTxsRange(cs *State, start, end int) {
for i := start; i < end; i++ {
txBytes := make([]byte, 8)
binary.BigEndian.PutUint64(txBytes, uint64(i))
err := assertMempool(cs.txNotifier).CheckTx(txBytes, nil, mempl.TxInfo{})
_, err := assertMempool(cs.txNotifier).CheckTxSync(txBytes, mempl.TxInfo{})
if err != nil {
panic(fmt.Sprintf("Error after CheckTx: %v", err))
}
Expand Down Expand Up @@ -161,13 +161,13 @@ func TestMempoolRmBadTx(t *testing.T) {
// Try to send the tx through the mempool.
// CheckTx should not err, but the app should return a bad abci code
// and the tx should get removed from the pool
err := assertMempool(cs.txNotifier).CheckTx(txBytes, func(r *abci.Response) {
err := assertMempool(cs.txNotifier).CheckTxAsync(txBytes, mempl.TxInfo{}, func(r *abci.Response) {
if r.GetCheckTx().Code != code.CodeTypeBadNonce {
t.Errorf("expected checktx to return bad nonce, got %v", r)
return
}
checkTxRespCh <- struct{}{}
}, mempl.TxInfo{})
})
if err != nil {
t.Errorf("error after CheckTx: %v", err)
return
Expand Down Expand Up @@ -233,7 +233,15 @@ func (app *CounterApplication) DeliverTx(req abci.RequestDeliverTx) abci.Respons
return abci.ResponseDeliverTx{Code: code.CodeTypeOK}
}

func (app *CounterApplication) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
func (app *CounterApplication) CheckTxSync(req abci.RequestCheckTx) abci.ResponseCheckTx {
return app.checkTx(req)
}

func (app *CounterApplication) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxCallback) {
callback(app.checkTx(req))
}

func (app *CounterApplication) checkTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
txValue := txAsUint64(req.Tx)
app.mempoolTxCountMtx.Lock()
defer app.mempoolTxCountMtx.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)

// send a tx
if err := assertMempool(css[3].txNotifier).CheckTx([]byte{1, 2, 3}, nil, mempl.TxInfo{}); err != nil {
if _, err := assertMempool(css[3].txNotifier).CheckTxSync([]byte{1, 2, 3}, mempl.TxInfo{}); err != nil {
t.Error(err)
}

Expand Down Expand Up @@ -543,7 +543,7 @@ func waitForAndValidateBlock(
err := validateBlock(newBlock, activeVals)
assert.Nil(t, err)
for _, tx := range txs {
err := assertMempool(css[j].txNotifier).CheckTx(tx, nil, mempl.TxInfo{})
_, err := assertMempool(css[j].txNotifier).CheckTxSync(tx, mempl.TxInfo{})
assert.Nil(t, err)
}
}, css)
Expand Down
14 changes: 7 additions & 7 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func sendTxs(ctx context.Context, cs *State) {
return
default:
tx := []byte{byte(i)}
assertMempool(cs.txNotifier).CheckTx(tx, nil, mempl.TxInfo{})
assertMempool(cs.txNotifier).CheckTxSync(tx, mempl.TxInfo{})
i++
}
}
Expand Down Expand Up @@ -350,7 +350,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
require.NoError(t, err)
valPubKey1ABCI := types.TM2PB.PubKey(newValidatorPubKey1)
newValidatorTx1 := kvstore.MakeValSetChangeTx(valPubKey1ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx1, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(newValidatorTx1, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ := css[0].createProposalBlock() //changeProposer(t, cs1, vs2)
propBlockParts := propBlock.MakePartSet(partSize)
Expand All @@ -376,7 +376,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
require.NoError(t, err)
updatePubKey1ABCI := types.TM2PB.PubKey(updateValidatorPubKey1)
updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25)
err = assertMempool(css[0].txNotifier).CheckTx(updateValidatorTx1, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(updateValidatorTx1, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
Expand All @@ -402,13 +402,13 @@ func TestSimulateValidatorsChange(t *testing.T) {
require.NoError(t, err)
newVal2ABCI := types.TM2PB.PubKey(newValidatorPubKey2)
newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx2, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(newValidatorTx2, mempl.TxInfo{})
assert.Nil(t, err)
newValidatorPubKey3, err := css[nVals+2].privValidator.GetPubKey()
require.NoError(t, err)
newVal3ABCI := types.TM2PB.PubKey(newValidatorPubKey3)
newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx3, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(newValidatorTx3, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
Expand Down Expand Up @@ -442,7 +442,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
ensureNewProposal(proposalCh, height, round)

removeValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, 0)
err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx2, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(removeValidatorTx2, mempl.TxInfo{})
assert.Nil(t, err)

rs = css[0].GetRoundState()
Expand Down Expand Up @@ -472,7 +472,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
height++
incrementHeight(vss...)
removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0)
err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx3, nil, mempl.TxInfo{})
_, err = assertMempool(css[0].txNotifier).CheckTxSync(removeValidatorTx3, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
Expand Down
37 changes: 34 additions & 3 deletions mempool/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,46 @@ func BenchmarkReap(b *testing.B) {
for i := 0; i < size; i++ {
tx := make([]byte, 8)
binary.BigEndian.PutUint64(tx, uint64(i))
mempool.CheckTx(tx, nil, TxInfo{})
mempool.CheckTxSync(tx, TxInfo{})
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
mempool.ReapMaxBytesMaxGas(100000000, 10000000)
}
}

func BenchmarkCheckTx(b *testing.B) {
func BenchmarkReapWithCheckTxAsync(b *testing.B) {
app := kvstore.NewApplication()
cc := proxy.NewLocalClientCreator(app)
mempool, cleanup := newMempoolWithApp(cc)
defer cleanup()

size := 10000
for i := 0; i < size; i++ {
tx := make([]byte, 8)
binary.BigEndian.PutUint64(tx, uint64(i))
mempool.CheckTxAsync(tx, TxInfo{}, nil)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
mempool.ReapMaxBytesMaxGas(100000000, 10000000)
}
}

func BenchmarkCheckTxSync(b *testing.B) {
app := kvstore.NewApplication()
cc := proxy.NewLocalClientCreator(app)
mempool, cleanup := newMempoolWithApp(cc)
defer cleanup()

for i := 0; i < b.N; i++ {
tx := make([]byte, 8)
binary.BigEndian.PutUint64(tx, uint64(i))
mempool.CheckTxSync(tx, TxInfo{})
}
}

func BenchmarkCheckTxAsync(b *testing.B) {
app := kvstore.NewApplication()
cc := proxy.NewLocalClientCreator(app)
mempool, cleanup := newMempoolWithApp(cc)
Expand All @@ -35,7 +66,7 @@ func BenchmarkCheckTx(b *testing.B) {
for i := 0; i < b.N; i++ {
tx := make([]byte, 8)
binary.BigEndian.PutUint64(tx, uint64(i))
mempool.CheckTx(tx, nil, TxInfo{})
mempool.CheckTxAsync(tx, TxInfo{}, nil)
}
}

Expand Down
4 changes: 2 additions & 2 deletions mempool/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestCacheAfterUpdate(t *testing.T) {
for tcIndex, tc := range tests {
for i := 0; i < tc.numTxsToCreate; i++ {
tx := types.Tx{byte(i)}
err := mempool.CheckTx(tx, nil, TxInfo{})
_, err := mempool.CheckTxSync(tx, TxInfo{})
require.NoError(t, err)
}

Expand All @@ -71,7 +71,7 @@ func TestCacheAfterUpdate(t *testing.T) {

for _, v := range tc.reAddIndices {
tx := types.Tx{byte(v)}
_ = mempool.CheckTx(tx, nil, TxInfo{})
_, _ = mempool.CheckTxSync(tx, TxInfo{})
}

cache := mempool.cache.(*mapTxCache)
Expand Down
Loading