Skip to content

Commit

Permalink
executor: simplfy code and move channel close to retrive func
Browse files Browse the repository at this point in the history
Signed-off-by: IcePigZDB <icepigzdb@gmail.com>

Signed-off-by: IcePigZDB <icepigzdb@gmail.com>
  • Loading branch information
IcePigZDB committed Oct 9, 2021
1 parent 039220c commit 6debfe2
Showing 1 changed file with 6 additions and 28 deletions.
34 changes: 6 additions & 28 deletions executor/memtable_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,11 +742,11 @@ func (h *hotRegionsResponseHeap) Pop() interface{} {
}

type hotRegionsHistoryRetriver struct {
dummyCloser
isDrained bool
retrieving bool
heap *hotRegionsResponseHeap
extractor *plannercore.HotRegionsHistoryTableExtractor
cancel context.CancelFunc
}

// HistoryHotRegionsRequest wrap conditions push down to PD.
Expand Down Expand Up @@ -830,18 +830,10 @@ func (e *hotRegionsHistoryRetriver) initialize(ctx context.Context, sctx session
// Change uint to ture false slice,
var isLearners, isLeaders []bool
for _, l := range e.extractor.IsLearners {
if l == 1 {
isLearners = append(isLearners, true)
} else if l == 0 {
isLearners = append(isLearners, false)
}
isLearners = append(isLearners, l == 1)
}
for _, l := range e.extractor.IsLeaders {
if l == 1 {
isLeaders = append(isLeaders, true)
} else if l == 0 {
isLeaders = append(isLeaders, false)
}
isLeaders = append(isLeaders, l == 1)
}

// set hotType before request
Expand All @@ -861,14 +853,12 @@ func (e *hotRegionsHistoryRetriver) initialize(ctx context.Context, sctx session
func (e *hotRegionsHistoryRetriver) startRetrieving(
ctx context.Context,
sctx sessionctx.Context,
serversInfo []infoschema.ServerInfo,
pdServers []infoschema.ServerInfo,
req *HistoryHotRegionsRequest,
) ([]chan hotRegionsResult, error) {
// The retrieve progress may be abort
ctx, e.cancel = context.WithCancel(ctx)

var results []chan hotRegionsResult
for _, srv := range serversInfo {
for _, srv := range pdServers {
for typ := range e.extractor.HotRegionTypes {
req.HotRegionTypes = []string{typ}
jsonBody, err := json.Marshal(req)
Expand All @@ -880,8 +870,6 @@ func (e *hotRegionsHistoryRetriver) startRetrieving(
results = append(results, ch)
go func(ch chan hotRegionsResult, address string, body *bytes.Buffer) {
util.WithRecovery(func() {
defer close(ch)

url := fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), address, pdapi.HotHistory)
req, err := http.NewRequest(http.MethodGet, url, body)
if err != nil {
Expand Down Expand Up @@ -929,6 +917,7 @@ func (e *hotRegionsHistoryRetriver) retrieve(ctx context.Context, sctx sessionct
// Initialize the heap
e.heap = &hotRegionsResponseHeap{}
for _, ch := range results {
defer close(ch)
result := <-ch
if result.err != nil || len(result.messages.HistoryHotRegion) == 0 {
if result.err != nil {
Expand Down Expand Up @@ -1046,14 +1035,3 @@ func (e *hotRegionsHistoryRetriver) getHotRegionRowWithSchemaInfo(
}
return row, nil
}

func (e *hotRegionsHistoryRetriver) close() error {
if e.cancel != nil {
e.cancel()
}
return nil
}

func (e *hotRegionsHistoryRetriver) getRuntimeStats() execdetails.RuntimeStats {
return nil
}

0 comments on commit 6debfe2

Please sign in to comment.