Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mocktikv: split rpcHandler to kvHandler and coprHandler #22857

Merged
merged 6 commits into from
Mar 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions store/mockstore/mocktikv/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/pingcap/tipb/go-tipb"
)

func (h *rpcHandler) handleCopAnalyzeRequest(req *coprocessor.Request) *coprocessor.Response {
func (h coprHandler) handleCopAnalyzeRequest(req *coprocessor.Request) *coprocessor.Response {
resp := &coprocessor.Response{}
if len(req.Ranges) == 0 {
return resp
Expand Down Expand Up @@ -62,7 +62,7 @@ func (h *rpcHandler) handleCopAnalyzeRequest(req *coprocessor.Request) *coproces
return resp
}

func (h *rpcHandler) handleAnalyzeIndexReq(req *coprocessor.Request, analyzeReq *tipb.AnalyzeReq) (*coprocessor.Response, error) {
func (h coprHandler) handleAnalyzeIndexReq(req *coprocessor.Request, analyzeReq *tipb.AnalyzeReq) (*coprocessor.Response, error) {
ranges, err := h.extractKVRanges(req.Ranges, false)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -125,7 +125,7 @@ type analyzeColumnsExec struct {
fields []*ast.ResultField
}

func (h *rpcHandler) handleAnalyzeColumnsReq(req *coprocessor.Request, analyzeReq *tipb.AnalyzeReq) (_ *coprocessor.Response, err error) {
func (h coprHandler) handleAnalyzeColumnsReq(req *coprocessor.Request, analyzeReq *tipb.AnalyzeReq) (_ *coprocessor.Response, err error) {
sc := flagsToStatementContext(analyzeReq.Flags)
sc.TimeZone, err = constructTimeZone("", int(analyzeReq.TimeZoneOffset))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/pingcap/tipb/go-tipb"
)

func (h *rpcHandler) handleCopChecksumRequest(req *coprocessor.Request) *coprocessor.Response {
func (h coprHandler) handleCopChecksumRequest(req *coprocessor.Request) *coprocessor.Response {
resp := &tipb.ChecksumResponse{
Checksum: 1,
TotalKvs: 1,
Expand Down
38 changes: 19 additions & 19 deletions store/mockstore/mocktikv/cop_handler_dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type dagContext struct {
evalCtx *evalContext
}

func (h *rpcHandler) handleCopDAGRequest(req *coprocessor.Request) *coprocessor.Response {
func (h coprHandler) handleCopDAGRequest(req *coprocessor.Request) *coprocessor.Response {
resp := &coprocessor.Response{}
dagCtx, e, dagReq, err := h.buildDAGExecutor(req)
if err != nil {
Expand Down Expand Up @@ -88,7 +88,7 @@ func (h *rpcHandler) handleCopDAGRequest(req *coprocessor.Request) *coprocessor.
return buildResp(selResp, execDetails, err)
}

func (h *rpcHandler) buildDAGExecutor(req *coprocessor.Request) (*dagContext, executor, *tipb.DAGRequest, error) {
func (h coprHandler) buildDAGExecutor(req *coprocessor.Request) (*dagContext, executor, *tipb.DAGRequest, error) {
if len(req.Ranges) == 0 {
return nil, nil, nil, errors.New("request range is null")
}
Expand Down Expand Up @@ -133,7 +133,7 @@ func constructTimeZone(name string, offset int) (*time.Location, error) {
return timeutil.ConstructTimeZone(name, offset)
}

func (h *rpcHandler) handleCopStream(ctx context.Context, req *coprocessor.Request) (tikvpb.Tikv_CoprocessorStreamClient, error) {
func (h coprHandler) handleCopStream(ctx context.Context, req *coprocessor.Request) (tikvpb.Tikv_CoprocessorStreamClient, error) {
dagCtx, e, dagReq, err := h.buildDAGExecutor(req)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -147,7 +147,7 @@ func (h *rpcHandler) handleCopStream(ctx context.Context, req *coprocessor.Reque
}, nil
}

func (h *rpcHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor, *tipb.Executor, error) {
func (h coprHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor, *tipb.Executor, error) {
var currExec executor
var err error
var childExec *tipb.Executor
Expand Down Expand Up @@ -179,7 +179,7 @@ func (h *rpcHandler) buildExec(ctx *dagContext, curr *tipb.Executor) (executor,
return currExec, childExec, errors.Trace(err)
}

func (h *rpcHandler) buildDAGForTiFlash(ctx *dagContext, farther *tipb.Executor) (executor, error) {
func (h coprHandler) buildDAGForTiFlash(ctx *dagContext, farther *tipb.Executor) (executor, error) {
curr, child, err := h.buildExec(ctx, farther)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -194,7 +194,7 @@ func (h *rpcHandler) buildDAGForTiFlash(ctx *dagContext, farther *tipb.Executor)
return curr, nil
}

func (h *rpcHandler) buildDAG(ctx *dagContext, executors []*tipb.Executor) (executor, error) {
func (h coprHandler) buildDAG(ctx *dagContext, executors []*tipb.Executor) (executor, error) {
var src executor
for i := 0; i < len(executors); i++ {
curr, _, err := h.buildExec(ctx, executors[i])
Expand All @@ -207,7 +207,7 @@ func (h *rpcHandler) buildDAG(ctx *dagContext, executors []*tipb.Executor) (exec
return src, nil
}

func (h *rpcHandler) buildTableScan(ctx *dagContext, executor *tipb.Executor) (*tableScanExec, error) {
func (h coprHandler) buildTableScan(ctx *dagContext, executor *tipb.Executor) (*tableScanExec, error) {
columns := executor.TblScan.Columns
ctx.evalCtx.setColumnInfo(columns)
ranges, err := h.extractKVRanges(ctx.keyRanges, executor.TblScan.Desc)
Expand Down Expand Up @@ -258,7 +258,7 @@ func (h *rpcHandler) buildTableScan(ctx *dagContext, executor *tipb.Executor) (*
return e, nil
}

func (h *rpcHandler) buildIndexScan(ctx *dagContext, executor *tipb.Executor) (*indexScanExec, error) {
func (h coprHandler) buildIndexScan(ctx *dagContext, executor *tipb.Executor) (*indexScanExec, error) {
var err error
columns := executor.IdxScan.Columns
ctx.evalCtx.setColumnInfo(columns)
Expand Down Expand Up @@ -311,7 +311,7 @@ func (h *rpcHandler) buildIndexScan(ctx *dagContext, executor *tipb.Executor) (*
return e, nil
}

func (h *rpcHandler) buildSelection(ctx *dagContext, executor *tipb.Executor) (*selectionExec, error) {
func (h coprHandler) buildSelection(ctx *dagContext, executor *tipb.Executor) (*selectionExec, error) {
var err error
var relatedColOffsets []int
pbConds := executor.Selection.Conditions
Expand All @@ -335,7 +335,7 @@ func (h *rpcHandler) buildSelection(ctx *dagContext, executor *tipb.Executor) (*
}, nil
}

func (h *rpcHandler) getAggInfo(ctx *dagContext, executor *tipb.Executor) ([]aggregation.Aggregation, []expression.Expression, []int, error) {
func (h coprHandler) getAggInfo(ctx *dagContext, executor *tipb.Executor) ([]aggregation.Aggregation, []expression.Expression, []int, error) {
length := len(executor.Aggregation.AggFunc)
aggs := make([]aggregation.Aggregation, 0, length)
var err error
Expand Down Expand Up @@ -366,7 +366,7 @@ func (h *rpcHandler) getAggInfo(ctx *dagContext, executor *tipb.Executor) ([]agg
return aggs, groupBys, relatedColOffsets, nil
}

func (h *rpcHandler) buildHashAgg(ctx *dagContext, executor *tipb.Executor) (*hashAggExec, error) {
func (h coprHandler) buildHashAgg(ctx *dagContext, executor *tipb.Executor) (*hashAggExec, error) {
aggs, groupBys, relatedColOffsets, err := h.getAggInfo(ctx, executor)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -384,7 +384,7 @@ func (h *rpcHandler) buildHashAgg(ctx *dagContext, executor *tipb.Executor) (*ha
}, nil
}

func (h *rpcHandler) buildStreamAgg(ctx *dagContext, executor *tipb.Executor) (*streamAggExec, error) {
func (h coprHandler) buildStreamAgg(ctx *dagContext, executor *tipb.Executor) (*streamAggExec, error) {
aggs, groupBys, relatedColOffsets, err := h.getAggInfo(ctx, executor)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -406,7 +406,7 @@ func (h *rpcHandler) buildStreamAgg(ctx *dagContext, executor *tipb.Executor) (*
}, nil
}

func (h *rpcHandler) buildTopN(ctx *dagContext, executor *tipb.Executor) (*topNExec, error) {
func (h coprHandler) buildTopN(ctx *dagContext, executor *tipb.Executor) (*topNExec, error) {
topN := executor.TopN
var err error
var relatedColOffsets []int
Expand Down Expand Up @@ -664,7 +664,7 @@ func (mock *mockCopStreamClient) readBlockFromExecutor() (tipb.Chunk, bool, *cop
return chunk, finish, &ran, mock.exec.Counts(), warnings, nil
}

func (h *rpcHandler) initSelectResponse(err error, warnings []stmtctx.SQLWarn, counts []int64) *tipb.SelectResponse {
func (h coprHandler) initSelectResponse(err error, warnings []stmtctx.SQLWarn, counts []int64) *tipb.SelectResponse {
selResp := &tipb.SelectResponse{
Error: toPBError(err),
OutputCounts: counts,
Expand All @@ -675,7 +675,7 @@ func (h *rpcHandler) initSelectResponse(err error, warnings []stmtctx.SQLWarn, c
return selResp
}

func (h *rpcHandler) fillUpData4SelectResponse(selResp *tipb.SelectResponse, dagReq *tipb.DAGRequest, dagCtx *dagContext, rows [][][]byte) error {
func (h coprHandler) fillUpData4SelectResponse(selResp *tipb.SelectResponse, dagReq *tipb.DAGRequest, dagCtx *dagContext, rows [][][]byte) error {
switch dagReq.EncodeType {
case tipb.EncodeType_TypeDefault:
h.encodeDefault(selResp, rows, dagReq.OutputOffsets)
Expand All @@ -690,7 +690,7 @@ func (h *rpcHandler) fillUpData4SelectResponse(selResp *tipb.SelectResponse, dag
return nil
}

func (h *rpcHandler) constructRespSchema(dagCtx *dagContext) []*types.FieldType {
func (h coprHandler) constructRespSchema(dagCtx *dagContext) []*types.FieldType {
var root *tipb.Executor
if len(dagCtx.dagReq.Executors) == 0 {
root = dagCtx.dagReq.RootExecutor
Expand All @@ -717,7 +717,7 @@ func (h *rpcHandler) constructRespSchema(dagCtx *dagContext) []*types.FieldType
return schema
}

func (h *rpcHandler) encodeDefault(selResp *tipb.SelectResponse, rows [][][]byte, colOrdinal []uint32) {
func (h coprHandler) encodeDefault(selResp *tipb.SelectResponse, rows [][][]byte, colOrdinal []uint32) {
var chunks []tipb.Chunk
for i := range rows {
requestedRow := dummySlice
Expand All @@ -730,7 +730,7 @@ func (h *rpcHandler) encodeDefault(selResp *tipb.SelectResponse, rows [][][]byte
selResp.EncodeType = tipb.EncodeType_TypeDefault
}

func (h *rpcHandler) encodeChunk(selResp *tipb.SelectResponse, rows [][][]byte, colTypes []*types.FieldType, colOrdinal []uint32, loc *time.Location) error {
func (h coprHandler) encodeChunk(selResp *tipb.SelectResponse, rows [][][]byte, colTypes []*types.FieldType, colOrdinal []uint32, loc *time.Location) error {
var chunks []tipb.Chunk
respColTypes := make([]*types.FieldType, 0, len(colOrdinal))
for _, ordinal := range colOrdinal {
Expand Down Expand Up @@ -826,7 +826,7 @@ func toPBError(err error) *tipb.Error {
}

// extractKVRanges extracts kv.KeyRanges slice from a SelectRequest.
func (h *rpcHandler) extractKVRanges(keyRanges []*coprocessor.KeyRange, descScan bool) (kvRanges []kv.KeyRange, err error) {
func (h coprHandler) extractKVRanges(keyRanges []*coprocessor.KeyRange, descScan bool) (kvRanges []kv.KeyRange, err error) {
for _, kran := range keyRanges {
if bytes.Compare(kran.GetStart(), kran.GetEnd()) >= 0 {
err = errors.Errorf("invalid range, start should be smaller than end: %v %v", kran.GetStart(), kran.GetEnd())
Expand Down
Loading