Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: parallel checktx #3

Merged
merged 38 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
31369aa
feat: concurrent checktx
dudong2 Dec 6, 2024
44d9ac2
feat: concurrent rechecktx
dudong2 Dec 9, 2024
93f66c6
feat: checkTxAsyncReactor
dudong2 Dec 9, 2024
3175751
fix: Use Begin/EndRecheckTxSync for app_conn
dudong2 Dec 9, 2024
3099b2b
chore: revise abci.Client, Async() interfaces
dudong2 Dec 10, 2024
fc88b00
test: Fix TestCacheAfterUpdate
dudong2 Dec 11, 2024
ce06f3b
test: Fix TestClientServer
dudong2 Dec 11, 2024
bafab12
test: Fix tests related with Application
dudong2 Dec 11, 2024
2f694a1
test: Fix TestHangingAsyncCalls
dudong2 Dec 12, 2024
70b64d3
test: Fix clist_mempool_test.go
dudong2 Dec 12, 2024
ceeb20f
fix: Add handler for BeginRecheckTx/EndRecheckTx
dudong2 Dec 12, 2024
881fe44
test: Fix TestExtendVoteCalledWhenEnabled
dudong2 Dec 12, 2024
5f91d8e
test: Fix state_test.go
dudong2 Dec 12, 2024
8bcf311
test: Fix TestCallbackInvokedWhenSetEarly
dudong2 Dec 12, 2024
66aff5d
chore: Fix lint
dudong2 Dec 12, 2024
4a8e351
chore: Fix lint
dudong2 Dec 12, 2024
a0c57bd
refactor: Use *sync.WaitGroup directly
dudong2 Jan 8, 2025
88b4ae2
chore: Use CheckTxSync from mempool.reactor.Receive()
dudong2 Jan 8, 2025
f0ffeb7
chore: Remove original rechecktx logics already commented out
dudong2 Jan 8, 2025
b2b8195
chore: Sync GetGlobalCallback call with other client types
dudong2 Jan 8, 2025
502d49d
refactor: Sync with original cometbft about callback
dudong2 Jan 8, 2025
4df9287
fix: Use own mutex for ResponseCallback
dudong2 Jan 8, 2025
a977306
chore: Comment Begin/EndRecheckTx lock
dudong2 Jan 9, 2025
544741a
chore: Revert ostracon#226
dudong2 Jan 10, 2025
e77a690
test: Un-comment socket_client_test.gochore: Sync some codes with ori…
dudong2 Jan 10, 2025
721e49d
chore: Sync some codes with cometbft original
dudong2 Jan 10, 2025
05fd5f9
refactor: Sync with original cometbft
dudong2 Jan 11, 2025
7d9de48
test: Fix kvstore test
dudong2 Jan 11, 2025
c0b953d
chore: Remove useless codes
dudong2 Jan 11, 2025
b23244d
refactor: Use ResponseCheckTx pointer directly
dudong2 Jan 12, 2025
529386f
refactor: Use original cometbft codes for rpc/BroadcastTxSync
dudong2 Jan 12, 2025
a506c24
refactor: Use original cometbft codes for rpc/BroadcastTxCommit
dudong2 Jan 12, 2025
0efabd4
refactor: Sync with original cometbft for mempool/CheckTxSync
dudong2 Jan 12, 2025
881d172
test: Fix test
dudong2 Jan 12, 2025
3c7ad6e
refactor: Sync with original cometbft
dudong2 Jan 12, 2025
224bc7c
refactor: Simplify CheckTxAsync
dudong2 Jan 12, 2025
402df98
refactor: Sync with original cometbft for reqResCb
dudong2 Jan 12, 2025
a1b84ce
chore: Fix comments
dudong2 Jan 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,14 @@ type Client interface {
// for the v0 mempool. We should explore refactoring the
// mempool to remove this vestige behavior.
SetResponseCallback(Callback)

CheckTxSync(context.Context, *types.RequestCheckTx) (*types.ResponseCheckTx, error)
BeginRecheckTxSync(context.Context, *types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) // Signals the beginning of rechecking
EndRecheckTxSync(context.Context, *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) // Signals the end of rechecking

CheckTxAsync(context.Context, *types.RequestCheckTx) (*ReqRes, error)
BeginRecheckTxAsync(context.Context, *types.RequestBeginRecheckTx) (*ReqRes, error)
EndRecheckTxAsync(context.Context, *types.RequestEndRecheckTx) (*ReqRes, error)
}

//----------------------------------------
Expand Down Expand Up @@ -114,6 +121,13 @@ func (r *ReqRes) InvokeCallback() {
r.callbackInvoked = true
}

// SetDone marks the ReqRes object as done.
func (r *ReqRes) SetInvoked() {
r.mtx.Lock()
r.callbackInvoked = true
r.mtx.Unlock()
}

// GetCallback returns the configured callback of the ReqRes object which may be
// nil. Note, it is not safe to concurrently call this in cases where it is
// marked done and SetCallback is called before calling GetCallback as that
Expand Down
73 changes: 60 additions & 13 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (cli *grpcClient) OnStart() error {
cli.mtx.Lock()
defer cli.mtx.Unlock()

reqres.SetInvoked()
reqres.Done()

// Notify client listener if set
Expand Down Expand Up @@ -161,15 +162,6 @@ func (cli *grpcClient) SetResponseCallback(resCb Callback) {

//----------------------------------------

func (cli *grpcClient) CheckTxAsync(ctx context.Context, req *types.RequestCheckTx) (*ReqRes, error) {
res, err := cli.client.CheckTx(ctx, req, grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
return nil, err
}
return cli.finishAsyncCall(types.ToRequestCheckTx(req), &types.Response{Value: &types.Response_CheckTx{CheckTx: res}}), nil
}

// finishAsyncCall creates a ReqRes for an async call, and immediately populates it
// with the response. We don't complete it until it's been ordered via the channel.
func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
Expand All @@ -194,10 +186,6 @@ func (cli *grpcClient) Info(ctx context.Context, req *types.RequestInfo) (*types
return cli.client.Info(ctx, req, grpc.WaitForReady(true))
}

func (cli *grpcClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return cli.client.CheckTx(ctx, req, grpc.WaitForReady(true))
}

func (cli *grpcClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) {
return cli.client.Query(ctx, types.ToRequestQuery(req).GetQuery(), grpc.WaitForReady(true))
}
Expand Down Expand Up @@ -245,3 +233,62 @@ func (cli *grpcClient) VerifyVoteExtension(ctx context.Context, req *types.Reque
func (cli *grpcClient) FinalizeBlock(ctx context.Context, req *types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
return cli.client.FinalizeBlock(ctx, types.ToRequestFinalizeBlock(req).GetFinalizeBlock(), grpc.WaitForReady(true))
}

func (cli *grpcClient) CheckTxSync(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return cli.client.CheckTx(ctx, req, grpc.WaitForReady(true))
}

func (cli *grpcClient) BeginRecheckTxSync(ctx context.Context, params *types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) {
reqres, _ := cli.BeginRecheckTxAsync(ctx, params)
reqres.Wait()
return reqres.Response.GetBeginRecheckTx(), cli.Error()
}

func (cli *grpcClient) EndRecheckTxSync(ctx context.Context, params *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
reqres, _ := cli.EndRecheckTxAsync(ctx, params)
reqres.Wait()
return reqres.Response.GetEndRecheckTx(), cli.Error()
}

func (cli *grpcClient) CheckTxAsync(ctx context.Context, req *types.RequestCheckTx) (*ReqRes, error) {
res, err := cli.client.CheckTx(ctx, req, grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
return nil, err
}
return cli.finishAsyncCall(types.ToRequestCheckTx(req), &types.Response{Value: &types.Response_CheckTx{CheckTx: res}}), nil
}

func (cli *grpcClient) BeginRecheckTxAsync(ctx context.Context, params *types.RequestBeginRecheckTx) (*ReqRes, error) {
req := types.ToRequestBeginRecheckTx(params)
res, err := cli.client.BeginRecheckTx(ctx, req.GetBeginRecheckTx(), grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
}
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_BeginRecheckTx{BeginRecheckTx: res}}), nil
}

func (cli *grpcClient) EndRecheckTxAsync(ctx context.Context, params *types.RequestEndRecheckTx) (*ReqRes, error) {
req := types.ToRequestEndRecheckTx(params)
res, err := cli.client.EndRecheckTx(ctx, req.GetEndRecheckTx(), grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
}
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_EndRecheckTx{EndRecheckTx: res}}), nil
}

func (cli *grpcClient) CheckTxSyncForApp(context.Context, *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
panic("not implemented")
}

func (cli *grpcClient) CheckTxAsyncForApp(context.Context, *types.RequestCheckTx, types.CheckTxCallback) {
panic("not implemented")
}

func (cli *grpcClient) BeginRecheckTx(ctx context.Context, params *types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) {
panic("not implemented")
}

func (cli *grpcClient) EndRecheckTx(ctx context.Context, params *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
panic("not implemented")
}
101 changes: 80 additions & 21 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,6 @@ func (app *localClient) SetResponseCallback(cb Callback) {
app.mtx.Unlock()
}

func (app *localClient) CheckTxAsync(ctx context.Context, req *types.RequestCheckTx) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

res, err := app.Application.CheckTx(ctx, req)
if err != nil {
return nil, err
}
return app.callback(
types.ToRequestCheckTx(req),
types.ToResponseCheckTx(res),
), nil
}

func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes {
app.Callback(req, res)
rr := newLocalReqRes(req, res)
Expand Down Expand Up @@ -92,13 +78,6 @@ func (app *localClient) Info(ctx context.Context, req *types.RequestInfo) (*type
return app.Application.Info(ctx, req)
}

func (app *localClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

return app.Application.CheckTx(ctx, req)
}

func (app *localClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
Expand Down Expand Up @@ -184,3 +163,83 @@ func (app *localClient) FinalizeBlock(ctx context.Context, req *types.RequestFin

return app.Application.FinalizeBlock(ctx, req)
}

func (app *localClient) CheckTxSync(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
// CONTRACT: Application should handle concurrent `CheckTx`
// In this abci client layer, we don't protect `CheckTx` with a mutex for concurrency
// app.mtx.Lock()
// defer app.mtx.Unlock()
return app.Application.CheckTxSyncForApp(ctx, req)
}

func (app *localClient) BeginRecheckTxSync(ctx context.Context, req *types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) {
// NOTE: commented out for performance. delete all after commenting out all `app.mtx`
// app.mtx.Lock()
// defer app.mtx.Unlock()

return app.Application.BeginRecheckTx(ctx, req)
}

func (app *localClient) EndRecheckTxSync(ctx context.Context, req *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
// NOTE: commented out for performance. delete all after commenting out all `app.mtx`
// app.mtx.Lock()
// defer app.mtx.Unlock()

return app.Application.EndRecheckTx(ctx, req)
}

func (app *localClient) CheckTxAsync(ctx context.Context, reqCheckTx *types.RequestCheckTx) (*ReqRes, error) {
req := types.ToRequestCheckTx(reqCheckTx)
reqRes := NewReqRes(req)

app.Application.CheckTxAsyncForApp(ctx, reqCheckTx, func(resCheckTx *types.ResponseCheckTx) {
res := types.ToResponseCheckTx(resCheckTx)
app.Callback(req, res)
reqRes.Response = res
reqRes.SetInvoked()
reqRes.Done()

// Notify reqRes listener if set
reqRes.InvokeCallback()
})

return reqRes, nil
}

func (app *localClient) BeginRecheckTxAsync(ctx context.Context, req *types.RequestBeginRecheckTx) (*ReqRes, error) {
res, err := app.Application.BeginRecheckTx(ctx, req)
if err != nil {
return nil, err
}
return app.callback(
types.ToRequestBeginRecheckTx(req),
types.ToResponseBeginRecheckTx(res),
), nil
}

func (app *localClient) EndRecheckTxAsync(ctx context.Context, req *types.RequestEndRecheckTx) (*ReqRes, error) {
res, err := app.Application.EndRecheckTx(ctx, req)
if err != nil {
return nil, err
}
return app.callback(
types.ToRequestEndRecheckTx(req),
types.ToResponseEndRecheckTx(res),
), nil
}

func (app *localClient) CheckTxSyncForApp(context.Context, *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
panic("not implemented")
}

func (app *localClient) CheckTxAsyncForApp(context.Context, *types.RequestCheckTx, types.CheckTxCallback) {
panic("not implemented")
}

func (app *localClient) BeginRecheckTx(ctx context.Context, params *types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) {
panic("not implemented")
}

func (app *localClient) EndRecheckTx(ctx context.Context, params *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
panic("not implemented")
}
Loading
Loading