Skip to content

Commit

Permalink
feat: concurrent checkTx (#160)
Browse files Browse the repository at this point in the history
* chore: remove mtx from localClient.CheckTx

* chore: revise mempool lock when commit

* chore: implement begin/end recheck tx abci

* chore: implement begin/end recheck tx for grpc and example

* chore: fix tests

* chore: revert unwanted comment changes

* test: fix tests & revise `CounterApplication` (test application)
  • Loading branch information
jinsan-line authored Jan 11, 2021
1 parent b557e0c commit 2d720ad
Show file tree
Hide file tree
Showing 17 changed files with 2,545 additions and 513 deletions.
4 changes: 4 additions & 0 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type Client interface {
InitChainAsync(types.RequestInitChain) *ReqRes
BeginBlockAsync(types.RequestBeginBlock) *ReqRes
EndBlockAsync(types.RequestEndBlock) *ReqRes
BeginRecheckTxAsync(types.RequestBeginRecheckTx) *ReqRes
EndRecheckTxAsync(types.RequestEndRecheckTx) *ReqRes

EchoSync(msg string) (*types.ResponseEcho, error)
InfoSync(types.RequestInfo) (*types.ResponseInfo, error)
Expand All @@ -45,6 +47,8 @@ type Client interface {
InitChainSync(types.RequestInitChain) (*types.ResponseInitChain, error)
BeginBlockSync(types.RequestBeginBlock) (*types.ResponseBeginBlock, error)
EndBlockSync(types.RequestEndBlock) (*types.ResponseEndBlock, error)
BeginRecheckTxSync(types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error)
EndRecheckTxSync(types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error)
}

//----------------------------------------
Expand Down
28 changes: 28 additions & 0 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,24 @@ func (cli *grpcClient) EndBlockAsync(params types.RequestEndBlock) *ReqRes {
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_EndBlock{EndBlock: res}})
}

func (cli *grpcClient) BeginRecheckTxAsync(params types.RequestBeginRecheckTx) *ReqRes {
req := types.ToRequestBeginRecheckTx(params)
res, err := cli.client.BeginRecheckTx(context.Background(), req.GetBeginRecheckTx(), grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
}
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_BeginRecheckTx{BeginRecheckTx: res}})
}

func (cli *grpcClient) EndRecheckTxAsync(params types.RequestEndRecheckTx) *ReqRes {
req := types.ToRequestEndRecheckTx(params)
res, err := cli.client.EndRecheckTx(context.Background(), req.GetEndRecheckTx(), grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
}
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_EndRecheckTx{EndRecheckTx: res}})
}

func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
reqres := NewReqRes(req)
reqres.Response = res // Set response
Expand Down Expand Up @@ -295,3 +313,13 @@ func (cli *grpcClient) EndBlockSync(params types.RequestEndBlock) (*types.Respon
reqres := cli.EndBlockAsync(params)
return reqres.Response.GetEndBlock(), cli.Error()
}

func (cli *grpcClient) BeginRecheckTxSync(params types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) {
reqres := cli.BeginRecheckTxAsync(params)
return reqres.Response.GetBeginRecheckTx(), cli.Error()
}

func (cli *grpcClient) EndRecheckTxSync(params types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
reqres := cli.EndRecheckTxAsync(params)
return reqres.Response.GetEndRecheckTx(), cli.Error()
}
32 changes: 26 additions & 6 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,6 @@ func (app *localClient) DeliverTxAsync(params types.RequestDeliverTx) *ReqRes {
}

func (app *localClient) CheckTxAsync(req types.RequestCheckTx) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.CheckTx(req)
return app.callback(
types.ToRequestCheckTx(req),
Expand Down Expand Up @@ -153,6 +150,22 @@ func (app *localClient) EndBlockAsync(req types.RequestEndBlock) *ReqRes {
)
}

func (app *localClient) BeginRecheckTxAsync(req types.RequestBeginRecheckTx) *ReqRes {
res := app.Application.BeginRecheckTx(req)
return app.callback(
types.ToRequestBeginRecheckTx(req),
types.ToResponseBeginRecheckTx(res),
)
}

func (app *localClient) EndRecheckTxAsync(req types.RequestEndRecheckTx) *ReqRes {
res := app.Application.EndRecheckTx(req)
return app.callback(
types.ToRequestEndRecheckTx(req),
types.ToResponseEndRecheckTx(res),
)
}

//-------------------------------------------------------

func (app *localClient) FlushSync() error {
Expand Down Expand Up @@ -188,9 +201,6 @@ func (app *localClient) DeliverTxSync(req types.RequestDeliverTx) (*types.Respon
}

func (app *localClient) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.CheckTx(req)
return &res, nil
}
Expand Down Expand Up @@ -235,6 +245,16 @@ func (app *localClient) EndBlockSync(req types.RequestEndBlock) (*types.Response
return &res, nil
}

func (app *localClient) BeginRecheckTxSync(req types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) {
res := app.Application.BeginRecheckTx(req)
return &res, nil
}

func (app *localClient) EndRecheckTxSync(req types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
res := app.Application.EndRecheckTx(req)
return &res, nil
}

//-------------------------------------------------------

func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes {
Expand Down
8 changes: 8 additions & 0 deletions abci/example/kvstore/persistent_kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ func (app *PersistentKVStoreApplication) CheckTx(req types.RequestCheckTx) types
return app.app.CheckTx(req)
}

func (app *PersistentKVStoreApplication) BeginRecheckTx(req types.RequestBeginRecheckTx) types.ResponseBeginRecheckTx {
return app.app.BeginRecheckTx(req)
}

func (app *PersistentKVStoreApplication) EndRecheckTx(req types.RequestEndRecheckTx) types.ResponseEndRecheckTx {
return app.app.EndRecheckTx(req)
}

// Commit will panic if InitChain was not called
func (app *PersistentKVStoreApplication) Commit() types.ResponseCommit {
return app.app.Commit()
Expand Down
23 changes: 22 additions & 1 deletion abci/types/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ type Application interface {
Query(RequestQuery) ResponseQuery // Query for state

// Mempool Connection
CheckTx(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool
CheckTx(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool
BeginRecheckTx(RequestBeginRecheckTx) ResponseBeginRecheckTx // Signals the beginning of rechecking
EndRecheckTx(RequestEndRecheckTx) ResponseEndRecheckTx // Signals the end of rechecking

// Consensus Connection
InitChain(RequestInitChain) ResponseInitChain // Initialize blockchain w validators/other info from TendermintCore
Expand Down Expand Up @@ -53,6 +55,14 @@ func (BaseApplication) CheckTx(req RequestCheckTx) ResponseCheckTx {
return ResponseCheckTx{Code: CodeTypeOK}
}

func (BaseApplication) BeginRecheckTx(req RequestBeginRecheckTx) ResponseBeginRecheckTx {
return ResponseBeginRecheckTx{Code: CodeTypeOK}
}

func (BaseApplication) EndRecheckTx(req RequestEndRecheckTx) ResponseEndRecheckTx {
return ResponseEndRecheckTx{Code: CodeTypeOK}
}

func (BaseApplication) Commit() ResponseCommit {
return ResponseCommit{}
}
Expand Down Expand Up @@ -108,6 +118,17 @@ func (app *GRPCApplication) CheckTx(ctx context.Context, req *RequestCheckTx) (*
return &res, nil
}

func (app *GRPCApplication) BeginRecheckTx(ctx context.Context, req *RequestBeginRecheckTx) (
*ResponseBeginRecheckTx, error) {
res := app.app.BeginRecheckTx(*req)
return &res, nil
}

func (app *GRPCApplication) EndRecheckTx(ctx context.Context, req *RequestEndRecheckTx) (*ResponseEndRecheckTx, error) {
res := app.app.EndRecheckTx(*req)
return &res, nil
}

func (app *GRPCApplication) Query(ctx context.Context, req *RequestQuery) (*ResponseQuery, error) {
res := app.app.Query(*req)
return &res, nil
Expand Down
24 changes: 24 additions & 0 deletions abci/types/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,18 @@ func ToRequestEndBlock(req RequestEndBlock) *Request {
}
}

func ToRequestBeginRecheckTx(req RequestBeginRecheckTx) *Request {
return &Request{
Value: &Request_BeginRecheckTx{&req},
}
}

func ToRequestEndRecheckTx(req RequestEndRecheckTx) *Request {
return &Request{
Value: &Request_EndRecheckTx{&req},
}
}

//----------------------------------------

func ToResponseException(errStr string) *Response {
Expand Down Expand Up @@ -196,3 +208,15 @@ func ToResponseEndBlock(res ResponseEndBlock) *Response {
Value: &Response_EndBlock{&res},
}
}

func ToResponseBeginRecheckTx(res ResponseBeginRecheckTx) *Response {
return &Response{
Value: &Response_BeginRecheckTx{&res},
}
}

func ToResponseEndRecheckTx(res ResponseEndRecheckTx) *Response {
return &Response{
Value: &Response_EndRecheckTx{&res},
}
}
Loading

0 comments on commit 2d720ad

Please sign in to comment.