Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: base branch v0.38.12 to v0.38.17 #10

Open
wants to merge 4 commits into
base: basechain/develop-v0.38.17
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,14 @@ type Client interface {
// for the v0 mempool. We should explore refactoring the
// mempool to remove this vestige behavior.
SetResponseCallback(Callback)

CheckTxSync(context.Context, *types.RequestCheckTx) (*types.ResponseCheckTx, error)
BeginRecheckTxSync(context.Context, *types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) // Signals the beginning of rechecking
EndRecheckTxSync(context.Context, *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) // Signals the end of rechecking

CheckTxAsync(context.Context, *types.RequestCheckTx) (*ReqRes, error)
BeginRecheckTxAsync(context.Context, *types.RequestBeginRecheckTx) (*ReqRes, error)
EndRecheckTxAsync(context.Context, *types.RequestEndRecheckTx) (*ReqRes, error)
}

//----------------------------------------
Expand Down Expand Up @@ -114,6 +121,13 @@ func (r *ReqRes) InvokeCallback() {
r.callbackInvoked = true
}

// SetDone marks the ReqRes object as done.
func (r *ReqRes) SetInvoked() {
r.mtx.Lock()
r.callbackInvoked = true
r.mtx.Unlock()
}

// GetCallback returns the configured callback of the ReqRes object which may be
// nil. Note, it is not safe to concurrently call this in cases where it is
// marked done and SetCallback is called before calling GetCallback as that
Expand Down
73 changes: 60 additions & 13 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (cli *grpcClient) OnStart() error {
cli.mtx.Lock()
defer cli.mtx.Unlock()

reqres.SetInvoked()
reqres.Done()

// Notify client listener if set
Expand Down Expand Up @@ -161,15 +162,6 @@ func (cli *grpcClient) SetResponseCallback(resCb Callback) {

//----------------------------------------

func (cli *grpcClient) CheckTxAsync(ctx context.Context, req *types.RequestCheckTx) (*ReqRes, error) {
res, err := cli.client.CheckTx(ctx, req, grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
return nil, err
}
return cli.finishAsyncCall(types.ToRequestCheckTx(req), &types.Response{Value: &types.Response_CheckTx{CheckTx: res}}), nil
}

// finishAsyncCall creates a ReqRes for an async call, and immediately populates it
// with the response. We don't complete it until it's been ordered via the channel.
func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
Expand All @@ -194,10 +186,6 @@ func (cli *grpcClient) Info(ctx context.Context, req *types.RequestInfo) (*types
return cli.client.Info(ctx, req, grpc.WaitForReady(true))
}

func (cli *grpcClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return cli.client.CheckTx(ctx, req, grpc.WaitForReady(true))
}

func (cli *grpcClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) {
return cli.client.Query(ctx, types.ToRequestQuery(req).GetQuery(), grpc.WaitForReady(true))
}
Expand Down Expand Up @@ -245,3 +233,62 @@ func (cli *grpcClient) VerifyVoteExtension(ctx context.Context, req *types.Reque
func (cli *grpcClient) FinalizeBlock(ctx context.Context, req *types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
return cli.client.FinalizeBlock(ctx, types.ToRequestFinalizeBlock(req).GetFinalizeBlock(), grpc.WaitForReady(true))
}

func (cli *grpcClient) CheckTxSync(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return cli.client.CheckTx(ctx, req, grpc.WaitForReady(true))
}

func (cli *grpcClient) BeginRecheckTxSync(ctx context.Context, params *types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) {
reqres, _ := cli.BeginRecheckTxAsync(ctx, params)
reqres.Wait()
return reqres.Response.GetBeginRecheckTx(), cli.Error()
}

func (cli *grpcClient) EndRecheckTxSync(ctx context.Context, params *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
reqres, _ := cli.EndRecheckTxAsync(ctx, params)
reqres.Wait()
return reqres.Response.GetEndRecheckTx(), cli.Error()
}

func (cli *grpcClient) CheckTxAsync(ctx context.Context, req *types.RequestCheckTx) (*ReqRes, error) {
res, err := cli.client.CheckTx(ctx, req, grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
return nil, err
}
return cli.finishAsyncCall(types.ToRequestCheckTx(req), &types.Response{Value: &types.Response_CheckTx{CheckTx: res}}), nil
}

func (cli *grpcClient) BeginRecheckTxAsync(ctx context.Context, params *types.RequestBeginRecheckTx) (*ReqRes, error) {
req := types.ToRequestBeginRecheckTx(params)
res, err := cli.client.BeginRecheckTx(ctx, req.GetBeginRecheckTx(), grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
}
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_BeginRecheckTx{BeginRecheckTx: res}}), nil
}

func (cli *grpcClient) EndRecheckTxAsync(ctx context.Context, params *types.RequestEndRecheckTx) (*ReqRes, error) {
req := types.ToRequestEndRecheckTx(params)
res, err := cli.client.EndRecheckTx(ctx, req.GetEndRecheckTx(), grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
}
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_EndRecheckTx{EndRecheckTx: res}}), nil
}

func (cli *grpcClient) CheckTxSyncForApp(context.Context, *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
panic("not implemented")
}

func (cli *grpcClient) CheckTxAsyncForApp(context.Context, *types.RequestCheckTx, types.CheckTxCallback) {
panic("not implemented")
}

func (cli *grpcClient) BeginRecheckTx(ctx context.Context, params *types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) {
panic("not implemented")
}

func (cli *grpcClient) EndRecheckTx(ctx context.Context, params *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
panic("not implemented")
}
101 changes: 80 additions & 21 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,6 @@ func (app *localClient) SetResponseCallback(cb Callback) {
app.mtx.Unlock()
}

func (app *localClient) CheckTxAsync(ctx context.Context, req *types.RequestCheckTx) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

res, err := app.Application.CheckTx(ctx, req)
if err != nil {
return nil, err
}
return app.callback(
types.ToRequestCheckTx(req),
types.ToResponseCheckTx(res),
), nil
}

func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes {
app.Callback(req, res)
rr := newLocalReqRes(req, res)
Expand Down Expand Up @@ -92,13 +78,6 @@ func (app *localClient) Info(ctx context.Context, req *types.RequestInfo) (*type
return app.Application.Info(ctx, req)
}

func (app *localClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

return app.Application.CheckTx(ctx, req)
}

func (app *localClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
Expand Down Expand Up @@ -184,3 +163,83 @@ func (app *localClient) FinalizeBlock(ctx context.Context, req *types.RequestFin

return app.Application.FinalizeBlock(ctx, req)
}

func (app *localClient) CheckTxSync(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
// CONTRACT: Application should handle concurrent `CheckTx`
// In this abci client layer, we don't protect `CheckTx` with a mutex for concurrency
// app.mtx.Lock()
// defer app.mtx.Unlock()
return app.Application.CheckTxSyncForApp(ctx, req)
}

func (app *localClient) BeginRecheckTxSync(ctx context.Context, req *types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) {
// NOTE: commented out for performance. delete all after commenting out all `app.mtx`
// app.mtx.Lock()
// defer app.mtx.Unlock()

return app.Application.BeginRecheckTx(ctx, req)
}

func (app *localClient) EndRecheckTxSync(ctx context.Context, req *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
// NOTE: commented out for performance. delete all after commenting out all `app.mtx`
// app.mtx.Lock()
// defer app.mtx.Unlock()

return app.Application.EndRecheckTx(ctx, req)
}

func (app *localClient) CheckTxAsync(ctx context.Context, reqCheckTx *types.RequestCheckTx) (*ReqRes, error) {
req := types.ToRequestCheckTx(reqCheckTx)
reqRes := NewReqRes(req)

app.Application.CheckTxAsyncForApp(ctx, reqCheckTx, func(resCheckTx *types.ResponseCheckTx) {
res := types.ToResponseCheckTx(resCheckTx)
app.Callback(req, res)
reqRes.Response = res
reqRes.SetInvoked()
reqRes.Done()

// Notify reqRes listener if set
reqRes.InvokeCallback()
})

return reqRes, nil
}

func (app *localClient) BeginRecheckTxAsync(ctx context.Context, req *types.RequestBeginRecheckTx) (*ReqRes, error) {
res, err := app.Application.BeginRecheckTx(ctx, req)
if err != nil {
return nil, err
}
return app.callback(
types.ToRequestBeginRecheckTx(req),
types.ToResponseBeginRecheckTx(res),
), nil
}

func (app *localClient) EndRecheckTxAsync(ctx context.Context, req *types.RequestEndRecheckTx) (*ReqRes, error) {
res, err := app.Application.EndRecheckTx(ctx, req)
if err != nil {
return nil, err
}
return app.callback(
types.ToRequestEndRecheckTx(req),
types.ToResponseEndRecheckTx(res),
), nil
}

func (app *localClient) CheckTxSyncForApp(context.Context, *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
panic("not implemented")
}

func (app *localClient) CheckTxAsyncForApp(context.Context, *types.RequestCheckTx, types.CheckTxCallback) {
panic("not implemented")
}

func (app *localClient) BeginRecheckTx(ctx context.Context, params *types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) {
panic("not implemented")
}

func (app *localClient) EndRecheckTx(ctx context.Context, params *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
panic("not implemented")
}
Loading
Loading