
Automatic task cancellation mechanism is invalid on coordinating node's reduce phase #70347

Closed
lqbilbo opened this issue Mar 12, 2021 · 5 comments


lqbilbo commented Mar 12, 2021

Elasticsearch version: 7.6.2

Plugins installed: [analysis-ik]

JVM version: openjdk 13.0.2

OS version: Linux n23-161-209 4.9.0-8-amd64 #1 SMP Debian 4.9.110-3+deb9u6 (2018-10-08) x86_64 GNU/Linux

Description of the problem including expected versus actual behavior:

We expect a long-running aggregation search task to be cancelled immediately when the channel is closed (#43332). When we cancel the client request during the query phase, the task is indeed cancelled as expected: the exception '...Caused by: java.lang.IllegalStateException: Task cancelled before it started: by user request' is thrown and the search task stops. But if we cancel the client request during the reduce phase, the log shows 'Received ban for the parent [N78L0bWWQEuSLPvitGDBxw:10714] on the node [N78L0bWWQEuSLPvitGDBxw], reason: [by user request]', yet the search task keeps running.

Steps to reproduce:


  1. the query body is as follows:
{
   "query": {
       "bool": {
           "filter": [
               {
                   "terms": {
                       "insurance_id": [
                           111,
                           222,
                           333,
                           444,
                           555,
                           666,
                           777
                           ...(150 more)
                       ]
                   }
               },
               {
                   "range": {
                       "start_time": {
                           "lt": "2020-12-31 00:00:00",
                           "gte": "2020-12-01 00:00:00",
                           "time_zone": "+00:00"
                       }
                   }
               }
           ]
       }
   },
   "aggs": {
       "start_time": {
           "date_histogram": {
               "field": "start_time",
               "interval": "day",
               "extended_bounds": {
                   "max": "2020-12-31 00:00:00",
                   "min": "2020-12-01 00:00:00"
               },
               "min_doc_count": 0,
               "format": "yyyy-MM-dd HH:mm:ss"
           },
           "aggs": {
               "address_id": {
                   "terms": {
                       "field": "address_id",
                       "size": 100000
                   },
                   "aggs": {
                       "age": {
                           "sum": {
                               "field": "age"
                           }
                       },
                       "salary": {
                           "sum": {
                               "field": "salary"
                           }
                       },
                       "height": {
                           "sum": {
                               "field": "height"
                           }
                       },
                       "weight": {
                           "sum": {
                               "field": "weight"
                           }
                       },
                       "english_score": {
                           "sum": {
                               "field": "english_score"
                           }
                       },
                       "math_score": {
                           "sum": {
                               "field": "math_score"
                           }
                       }
                   }
               }
           }
       }
   },
   "size": 0
}
  2. request params:
allow_partial_search_results=false,
timeout=1000ms
  3. index shards: 8 primary + 8 replica (see the client sketch below for one way to issue and cancel this request)
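
For reference, the following is a rough sketch (not the reporter's actual client code) of how such a request can be issued and then cancelled from the Java high-level REST client; the host, the index name my_index, and the trimmed-down query are placeholders. Cancelling the in-flight request aborts it and closes the HTTP channel, which is what the mechanism from #43332 relies on to cancel the server-side task.

import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Cancellable;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class CancelDuringReduceRepro {
    public static void main(String[] args) throws Exception {
        // Host and index name are placeholders for this sketch.
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {

            // Trimmed-down version of the query body above (only a few terms and one sum sub-agg).
            SearchSourceBuilder source = new SearchSourceBuilder()
                .size(0)
                .timeout(TimeValue.timeValueMillis(1000))
                .query(QueryBuilders.boolQuery()
                    .filter(QueryBuilders.termsQuery("insurance_id", 111, 222, 333))
                    .filter(QueryBuilders.rangeQuery("start_time")
                        .gte("2020-12-01 00:00:00").lt("2020-12-31 00:00:00").timeZone("+00:00")))
                .aggregation(AggregationBuilders.dateHistogram("start_time")
                    .field("start_time")
                    .calendarInterval(DateHistogramInterval.DAY)
                    .subAggregation(AggregationBuilders.terms("address_id")
                        .field("address_id").size(100000)
                        .subAggregation(AggregationBuilders.sum("age").field("age"))));

            SearchRequest request = new SearchRequest("my_index")
                .allowPartialSearchResults(false)
                .source(source);

            Cancellable cancellable = client.searchAsync(request, RequestOptions.DEFAULT,
                ActionListener.wrap(
                    (SearchResponse response) -> System.out.println("took " + response.getTook()),
                    e -> System.out.println("search failed or cancelled: " + e)));

            // Wait until the query phase has (likely) finished and the coordinating node is in
            // its reduce phase, then abort the client request. Aborting closes the HTTP channel,
            // which is what should trigger server-side task cancellation.
            Thread.sleep(15_000);
            cancellable.cancel();

            Thread.sleep(5_000); // give the listener a moment to fire before closing the client
        }
    }
}

Interrupting a curl request at the same point exercises the same channel-close path.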

Provide logs (if relevant): I took notes of the logs on the coordinating node.
(1) When I cancel the client request during the query phase, the log is:

org.elasticsearch.action.search.SearchPhaseExecutionException: Partial shards failure
   at org.elasticsearch.action.search.AbstractSearchAsyncAction.onPhaseFailure(AbstractSearchAsyncAction.java:550) [elasticsearch-7.6.2-SNAPSHOT.jar:7.6.2-SNAPSHOT]
   at org.elasticsearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:327) [elasticsearch-7.6.2-SNAPSHOT.jar:7.6.2-SNAPSHOT]
   at org.elasticsearch.action.search.AbstractSearchAsyncAction.onPhaseDone(AbstractSearchAsyncAction.java:579) [elasticsearch-7.6.2-SNAPSHOT.jar:7.6.2-SNAPSHOT]
   at org.elasticsearch.action.search.AbstractSearchAsyncAction.successfulShardExecution(AbstractSearchAsyncAction.java:502) [elasticsearch-7.6.2-SNAPSHOT.jar:7.6.2-SNAPSHOT]
   at org.elasticsearch.action.search.AbstractSearchAsyncAction.onShardResult(AbstractSearchAsyncAction.java:490) [elasticsearch-7.6.2-SNAPSHOT.jar:7.6.2-SNAPSHOT]
   at org.elasticsearch.action.search.AbstractSearchAsyncAction.access$000(AbstractSearchAsyncAction.java:67) [elasticsearch-7.6.2-SNAPSHOT.jar:7.6.2-SNAPSHOT]
   ...

Caused by: org.elasticsearch.ElasticsearchException$1: Task cancelled before it started: by user request
   at org.elasticsearch.ElasticsearchException.guessRootCauses(ElasticsearchException.java:644) ~[elasticsearch-7.6.2-SNAPSHOT.jar:7.6.2-SNAPSHOT]
   at org.elasticsearch.action.search.SearchPhaseExecutionException.guessRootCauses(SearchPhaseExecutionException.java:167) ~[elasticsearch-7.6.2-SNAPSHOT.jar:7.6.2-SNAPSHOT]
   at org.elasticsearch.action.search.SearchPhaseExecutionException.getCause(SearchPhaseExecutionException.java:112) ~[elasticsearch-7.6.2-SNAPSHOT.jar:7.6.2-SNAPSHOT]
   at org.apache.logging.log4j.core.impl.ThrowableProxy.<init>(ThrowableProxy.java:139) [log4j-core-2.11.1.jar:2.11.1]
   at org.apache.logging.log4j.core.impl.ThrowableProxy.<init>(ThrowableProxy.java:122) [log4j-core-2.11.1.jar:2.11.1]
   at org.apache.logging.log4j.core.impl.MutableLogEvent.getThrownProxy(MutableLogEvent.java:351) [log4j-core-2.11.1.jar:2.11.1]
   ...

The search task is cancelled as soon as I cancel the client request, so I did not record the time.

(2) When I cancel the client request during the reduce phase, the log is:

[2021-03-12T17:17:30,201][DEBUG][o.e.a.s.TransportSearchAction] AbstractSearchAsyncAction.run begin, current time is [48568493537953821]
[2021-03-12T17:17:42,463][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] Received ban for the parent [N78L0bWWQEuSLPvitGDBxw:10714] on the node [N78L0bWWQEuSLPvitGDBxw], reason: [by user request]
[2021-03-12T17:18:05,123][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:10714] to the node [3gtuxzK2SY-zuO-uwXsA9A]
[2021-03-12T17:18:05,122][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:10714] to the node [qRGABA5GQayYidPQuW1Uow]
[2021-03-12T17:18:05,122][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:10714] to the node [CTb_1tG5RUSSAUp1Kzlxuw]
[2021-03-12T17:18:05,122][DEBUG][o.e.a.a.c.n.t.c.TransportCancelTasksAction] Sending remove ban for tasks with the parent [N78L0bWWQEuSLPvitGDBxw:10714] to the node [3ruMY6IJQs6yKQtPGLF7kw]
...

The process lasts nearly 35 seconds.

(3) If I don't cancel the client request, the response is:

"took" : 36027,
  "timed_out" : false,
  "_shards" : {
    "total" : 240,
    "successful" : 240,
    "skipped" : 0,
    "failed" : 0
  },

The process lasts nearly 36 seconds. So when I cancel the client request during the reduce phase, it still takes about as long as when I do not cancel it at all.

lqbilbo added the >bug and needs:triage labels on Mar 12, 2021

lqbilbo commented Mar 15, 2021

@DaveCTurner Would you please have a look at this issue and help out? Thanks

dnhatn added the :Distributed/Task Management label and removed the >bug and needs:triage labels on Mar 15, 2021
elasticmachine added the Team:Distributed label on Mar 15, 2021
elasticmachine (Collaborator) commented:

Pinging @elastic/es-distributed (Team:Distributed)

dnhatn added the :Search/Search label on Mar 15, 2021
elasticmachine added the Team:Search label on Mar 15, 2021
dnhatn self-assigned this on Mar 15, 2021
elasticmachine (Collaborator) commented:

Pinging @elastic/es-search (Team:Search)


dnhatn commented Mar 16, 2021

Hi @lqbilbo

We have an improvement in 7.7 that cancels search requests more quickly. It would be great if you could give it a try. Thanks very much for your interest in Elasticsearch.

dnhatn closed this as completed on Mar 16, 2021

Hailei commented Mar 16, 2021

> Hi @lqbilbo
>
> We have an improvement in 7.7 that cancels search requests more quickly. It would be great if you could give it a try. Thanks very much for your interest in Elasticsearch.

@dnhatn I don't think this improvement can address the problem. The performance of a single shard is good; the response time is only around 200ms. The main bottleneck is the reduce phase on the coordinating node rather than on the data nodes, specifically the InternalTerms reduce. Taking the query mentioned in this issue as an example, the InternalTerms reduce takes 20s+, which is very slow. We added some debug output and found that it only reduces from about 192920 buckets down to 96458:

---[25834.468133ms] org.elasticsearch.search.aggregations.bucket.terms.LongTerms:reduce()
+---[0.021769ms] java.util.List:iterator() #148
+---[min=0.00996ms,max=0.011012ms,total=0.020972ms,count=2] java.util.Iterator:hasNext() #57
+---[0.011056ms] java.util.Iterator:next() #57
---[25834.243877ms] org.elasticsearch.search.aggregations.bucket.terms.InternalMappedTerms:reduce() #153
`---[25834.149544ms] org.elasticsearch.search.aggregations.bucket.terms.InternalTerms:reduce()
+---[0.010885ms] java.util.HashMap:<init>() #196
+---[0.01061ms] java.util.List:iterator() #200
+---[min=0.007999ms,max=1.110653ms,total=1672.07046ms,count=192920] java.util.Iterator:hasNext() #57
+---[min=0.008212ms,max=0.282905ms,total=1730.821392ms,count=192917] java.util.Iterator:next() #57
+---[0.011168ms] java.lang.Object:getClass() #203
+---[min=0.010713ms,max=0.01103ms,total=0.021743ms,count=2] java.lang.Object:equals() #57
+---[0.010436ms] java.lang.Object:getClass() #207
+---[0.010805ms] java.lang.Object:getClass() #57
+---[0.012455ms] org.elasticsearch.search.aggregations.bucket.terms.InternalTerms:getSumOfOtherDocCounts() #215
+---[0.01156ms] org.elasticsearch.search.aggregations.bucket.terms.InternalTerms:getBuckets() #217
+---[0.010559ms] java.util.List:size() #57
+---[0.011703ms] org.elasticsearch.search.aggregations.bucket.terms.InternalTerms:getShardSize() #57
+---[0.011505ms] org.elasticsearch.search.aggregations.bucket.terms.InternalTerms:setDocCountError() #239
+---[0.011052ms] org.elasticsearch.search.aggregations.bucket.terms.InternalTerms:getBuckets() #240
+---[0.010479ms] java.util.List:iterator() #57
+---[min=0.008819ms,max=0.202757ms,total=954.341927ms,count=96458] org.elasticsearch.search.aggregations.bucket.terms.InternalTerms$Bucket:getKey() #249
+---[min=0.008465ms,max=0.315619ms,total=952.380985ms,count=96458] java.util.Map:get() #57
+---[min=0.00798ms,max=0.24264ms,total=865.291886ms,count=96458] java.util.ArrayList:<init>() #251
+---[min=0.008809ms,max=0.207877ms,total=952.034638ms,count=96458] org.elasticsearch.search.aggregations.bucket.terms.InternalTerms$Bucket:getKey() #252
+---[min=0.008751ms,max=12.175545ms,total=987.84302ms,count=96458] java.util.Map:put() #57
+---[min=0.008193ms,max=0.236767ms,total=912.445066ms,count=96458] java.util.List:add() #254
+---[0.011224ms] org.elasticsearch.search.aggregations.InternalAggregation$ReduceContext:isFinalReduce() #258
+---[0.008884ms] java.util.Map:size() #57
+---[0.012234ms] org.elasticsearch.search.aggregations.BucketOrder:comparator() #259
+---[0.130125ms] org.elasticsearch.search.aggregations.bucket.terms.BucketPriorityQueue:<init>() #57
+---[0.009372ms] java.util.Map:values() #260
+---[0.010341ms] java.util.Collection:iterator() #57
+---[min=0.011565ms,max=0.248213ms,total=1214.813142ms,count=96458] org.elasticsearch.search.aggregations.bucket.terms.InternalTerms:reduceBucket() #261
+---[min=0.00931ms,max=0.263542ms,total=934.588464ms,count=96458] org.elasticsearch.search.aggregations.bucket.terms.BucketPriorityQueue:insertWithOverflow() #268
+---[min=0.008921ms,max=0.25315ms,total=883.052061ms,count=96458] org.elasticsearch.search.aggregations.InternalAggregation$ReduceContext:consumeBucketsAndMaybeBreak() #273
+---[0.01017ms] org.elasticsearch.search.aggregations.bucket.terms.BucketPriorityQueue:size() #279
+---[0.123329ms] org.elasticsearch.search.aggregations.bucket.terms.InternalTerms:createBucketsArray() #57
+---[0.009128ms] org.elasticsearch.search.aggregations.bucket.terms.BucketPriorityQueue:size() #280
+---[min=0.008945ms,max=0.2487ms,total=1011.780436ms,count=96458] org.elasticsearch.search.aggregations.bucket.terms.BucketPriorityQueue:pop() #281
+---[0.009289ms] java.util.List:size() #287
+---[0.00938ms] java.util.Arrays:asList() #289
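
To make it easier to see where that time goes, here is a minimal, simplified sketch of the pattern the trace above is executing. It is not the actual InternalTerms code; the bucket type is reduced to a key plus a doc count for illustration. Buckets from all shards are merged into a map keyed by term, each group is reduced into a single bucket, and a bounded priority queue keeps only the top `size` buckets.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class TermsReduceSketch {

    // Simplified stand-in for a terms bucket: just a term key and a doc count.
    record Bucket(long key, long docCount) {}

    /** Merge per-shard buckets by key, then keep only the top `size` buckets by doc count. */
    static List<Bucket> reduce(List<List<Bucket>> shardBuckets, int size) {
        // 1. Group buckets with the same key across all shards
        //    (the Map.get/Map.put calls in the trace).
        Map<Long, List<Bucket>> byKey = new HashMap<>();
        for (List<Bucket> shard : shardBuckets) {
            for (Bucket b : shard) {
                byKey.computeIfAbsent(b.key(), k -> new ArrayList<>()).add(b);
            }
        }
        // 2. Reduce each group into one bucket (reduceBucket in the trace) and keep the
        //    top `size` of them with a bounded min-heap (insertWithOverflow/pop in the trace).
        PriorityQueue<Bucket> top = new PriorityQueue<>((a, b) -> Long.compare(a.docCount(), b.docCount()));
        for (List<Bucket> group : byKey.values()) {
            long totalDocCount = group.stream().mapToLong(Bucket::docCount).sum();
            top.offer(new Bucket(group.get(0).key(), totalDocCount));
            if (top.size() > size) {
                top.poll(); // evict the current smallest, i.e. "insert with overflow"
            }
        }
        // 3. Drain the heap and sort descending by doc count for the final result.
        List<Bucket> result = new ArrayList<>(top);
        result.sort((a, b) -> Long.compare(b.docCount(), a.docCount()));
        return result;
    }
}

Every bucket passes through the map and the queue, which is why the per-call costs in the trace get multiplied by counts in the hundreds of thousands.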

So there are two major problems that we need to solve:

  1. Cancel the execution of the reduce phase on the coordinating node.
  2. Improve the performance of the reduce itself.

For the first problem, we can add a cancellation checkpoint inside the consumeBucketsAndMaybeBreak method: every time a new bucket is created, we also check whether the current task has been cancelled. If checking on every single bucket is too much overhead, we can check once every several buckets, as in the sketch below.
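
A minimal sketch of that idea follows. The class and method names here are hypothetical and not part of the Elasticsearch API; in Elasticsearch itself the check would have to be wired into the reduce context behind consumeBucketsAndMaybeBreak. The point is only the counting logic: poll the cancellation flag every N buckets so the per-bucket overhead stays negligible.

import java.util.function.BooleanSupplier;

/**
 * Hypothetical helper: checks for cancellation every CHECK_INTERVAL consumed buckets.
 * In Elasticsearch the check would need to be wired into the reduce context that backs
 * consumeBucketsAndMaybeBreak; this sketch only shows the counting logic.
 */
final class CancellableBucketConsumer {
    private static final int CHECK_INTERVAL = 1024; // check every 1024 buckets, not on each one

    private final BooleanSupplier isCancelled; // e.g. backed by the task's cancellation flag
    private long consumed = 0;

    CancellableBucketConsumer(BooleanSupplier isCancelled) {
        this.isCancelled = isCancelled;
    }

    /** Call once per bucket created during reduce. */
    void consumeBucket() {
        consumed++;
        if (consumed % CHECK_INTERVAL == 0 && isCancelled.getAsBoolean()) {
            // In Elasticsearch this would be a TaskCancelledException; any unchecked
            // exception that aborts the reduce serves the purpose of this sketch.
            throw new RuntimeException("task cancelled during reduce after " + consumed + " buckets");
        }
    }
}

With an interval of 1024, the extra cost is one increment and one modulo per bucket plus one flag read every 1024 buckets, which should be negligible next to the per-bucket work shown in the trace.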

Previously, we compared Elasticsearch with Apache Doris, taking a terms bucket aggregation query as an example. If the response contains more than 60 thousand buckets, Elasticsearch runs until it times out, while Apache Doris handles it easily.

A query that builds 507157 buckets on Apache Doris takes only 219ms:

  • Probe Method: HashTable Linear Probing
    - BuildTime: 219.809ms
    - GetResultsTime: 20.840ms
    - HTResize: 96
    - HTResizeTime: 19.852ms
    - HashBuckets: 1.048576M (1048576)
    - HashCollisions: 40
    - HashFailedProbe: 0
    - HashFilledBuckets: 507.157K (507157)
    - HashProbe: 507.157K (507157)
    - HashTravelLength: 1.012417M (1012417)
    - LargestPartitionPercent: 6
    - MaxPartitionLevel: 0
    - NumRepartitions: 0
    - PartitionsCreated: 16
    - PeakMemoryUsage: 82.28 MB
    - RowsProcessed: 507.157K (507157)
    - RowsRepartitioned: 0
    - RowsReturned: 507.157K (507157)
    - RowsReturnedRate: 565.461K /sec
    - SpilledPartitions: 0
