-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Change bulk's retry condition to be based on RestStatus #29329
Conversation
Previously bulk's retry logic was based on Exception type of the failed response, here we change it to be based on RestStatus, in order to support rest hight level's request.
Since this is a community submitted pull request, a Jenkins build has not been kicked off automatically. Can an Elastic organization member please verify the contents of this patch and then kick off a build manually? |
1 similar comment
Since this is a community submitted pull request, a Jenkins build has not been kicked off automatically. Can an Elastic organization member please verify the contents of this patch and then kick off a build manually? |
Pinging @elastic/es-core-infra |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good @PnPie thanks a lot for opening this! I left a few comments but this is already pretty close.
public Retry(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, Scheduler scheduler) { | ||
this.retryOnThrowable = retryOnThrowable; | ||
public Retry(RestStatus retryOnStatus, BackoffPolicy backoffPolicy, Scheduler scheduler) { | ||
this.retryOnStatus = retryOnStatus; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we consider making this hardcoded rather than an argument given that we always pass in the same value for it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, as it's only used here in bulk for this case.
final Throwable cause = bulkItemResponse.getFailure().getCause(); | ||
final Throwable rootCause = ExceptionsHelper.unwrapCause(cause); | ||
if (!rootCause.getClass().equals(retryOnThrowable)) { | ||
final RestStatus status = bulkItemResponse.getFailure().getStatus(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just to double check: we don't need to unwrap here anymore because the status of the root cause is propagated to its ancestor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we get the status
field of BulkItemResponse
's Failure
, it is a seperate field than Exception cause
in Failure
class. So I find it always has the good value (RestStatus.TOO_MANY_REQUESTS
) ? because the exception type was changed only through toXContent/fromXContent
of BulkItemResponse
, but in it the status
was already parsed seperatly. So the status should always be good ? (except after toXContent/fromXContent, the BulkItemResponse
was transfered again using readFrom/writeTo
, which I don't think it's the case ?)
If it's this, I changed bulkItemResponse.getFailure().getStatus();
to bulkItemResponse.status();
, because it's the same.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is correct. I also don't follow why readFrom/writeTo
Would cause issues, the exception type does change but the status stays the same right?
if (rootCause instanceof EsRejectedExecutionException) { | ||
if (rejectedExecutionExpected == false) { | ||
if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) { | ||
if (!rejectedExecutionExpected) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you leave the previous rejectedExecutionExpected == false
please? we prefer this one for readability.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you address this please?
if (rejectedExecutionExpected == false) { | ||
if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) { | ||
if (!rejectedExecutionExpected) { | ||
Throwable rootCause = ExceptionsHelper.unwrapCause(failure.getCause()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we do not even need to unwrap it here anymore? could we just use getCause instead when throwing assertion error below?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that this has not been addressed. Is that on purpose? I see that you have done this on the client version of the test so it should be fine here too.
|
||
static { | ||
System.setProperty("tests.rest.cluster", "localhost:9200"); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this static block should go away. I don't see why it's needed.
assertThat(searchResultCount, lessThan(numberOfAsyncOps)); | ||
} else { | ||
assertThat(searchResultCount, equalTo(numberOfAsyncOps)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the assertions seem more accurate here, thanks! Would you mind making the same change in the original test for the transport client? It should work there too right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes !
assertThat(searchResultCount, equalTo(numberOfAsyncOps)); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you think that we should also have the last check based on the search API and the returned total hits? Or maybe now that we are using multi_get that step is not necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think the multi get request
we prepared when we indexed the documents is doing this, the same thing ? do a search (multi get) for all the indices in the end, to compare ? Seems in rest high level tests we are using rather multi get.
} | ||
|
||
highLevelClient().indices().refresh(new RefreshRequest()); | ||
int searchResultCount = highLevelClient().multiGet(multiGetRequest).getResponses().length; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you rename this variable, this is not about search anymore, rather the result of multi_get
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@javanna Thank you for your detailed reveiw ! I updated it according to the comments.
assertThat(searchResultCount, lessThan(numberOfAsyncOps)); | ||
} else { | ||
assertThat(searchResultCount, equalTo(numberOfAsyncOps)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes !
assertThat(searchResultCount, equalTo(numberOfAsyncOps)); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think the multi get request
we prepared when we indexed the documents is doing this, the same thing ? do a search (multi get) for all the indices in the end, to compare ? Seems in rest high level tests we are using rather multi get.
public Retry(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, Scheduler scheduler) { | ||
this.retryOnThrowable = retryOnThrowable; | ||
public Retry(RestStatus retryOnStatus, BackoffPolicy backoffPolicy, Scheduler scheduler) { | ||
this.retryOnStatus = retryOnStatus; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, as it's only used here in bulk for this case.
final Throwable cause = bulkItemResponse.getFailure().getCause(); | ||
final Throwable rootCause = ExceptionsHelper.unwrapCause(cause); | ||
if (!rootCause.getClass().equals(retryOnThrowable)) { | ||
final RestStatus status = bulkItemResponse.getFailure().getStatus(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we get the status
field of BulkItemResponse
's Failure
, it is a seperate field than Exception cause
in Failure
class. So I find it always has the good value (RestStatus.TOO_MANY_REQUESTS
) ? because the exception type was changed only through toXContent/fromXContent
of BulkItemResponse
, but in it the status
was already parsed seperatly. So the status should always be good ? (except after toXContent/fromXContent, the BulkItemResponse
was transfered again using readFrom/writeTo
, which I don't think it's the case ?)
If it's this, I changed bulkItemResponse.getFailure().getStatus();
to bulkItemResponse.status();
, because it's the same.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hi @PnPie thanks for addressing the comments, I left a couple more minors, but I see some tests are failing:
Tests with failures:
- org.elasticsearch.client.CrudIT.testBulkProcessorIntegration
- org.elasticsearch.client.BulkProcessorRetryIT.testBulkRejectionLoadWithBackoff
- org.elasticsearch.client.BulkProcessorRetryIT.testBulkRejectionLoadWithoutBackoff
- org.elasticsearch.client.BulkProcessorIT.testBulkProcessorConcurrentRequestsReadOnlyIndex
and
Tests with failures:
- org.elasticsearch.action.bulk.RetryTests.testRetryWithListenerBacksOff
- org.elasticsearch.action.bulk.RetryTests.testRetryBacksOff
Could you please have a look at these? I am under the impressions that tests were green before recent changes, but I may be wrong, could you please look into this? Let me know if you need any help
if (rejectedExecutionExpected == false) { | ||
if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) { | ||
if (!rejectedExecutionExpected) { | ||
Throwable rootCause = ExceptionsHelper.unwrapCause(failure.getCause()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that this has not been addressed. Is that on purpose? I see that you have done this on the client version of the test so it should be fine here too.
if (rootCause instanceof EsRejectedExecutionException) { | ||
if (rejectedExecutionExpected == false) { | ||
if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) { | ||
if (!rejectedExecutionExpected) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you address this please?
hi @PnPie would you have a chance to address my last comments? This is a good change, would love to get it in. |
…o rest_highlevel_bulk_retry
Hello @javanna, |
test this please |
retest this please |
retest this please |
@PnPie tests were failing but not sure the failures were related to your changes, I merged master in and triggered a new build. |
Hi @javanna, |
Hello @javanna, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For the record, this requires #30384 otherwise the added tests fail. |
retest this please |
thanks a lot @PnPie ! |
…or-you * elastic/master: (22 commits) Docs: Test examples that recreate lang analyzers (elastic#29535) BulkProcessor to retry based on status code (elastic#29329) Add GET Repository High Level REST API (elastic#30362) add a comment explaining the need for RetryOnReplicaException on missing mappings Add `coordinating_only` node selector (elastic#30313) Stop forking groovyc (elastic#30471) Avoid setting connection request timeout (elastic#30384) Use date format in `date_range` mapping before fallback to default (elastic#29310) Watcher: Increase HttpClient parallel sent requests (elastic#30130) Mute ML upgrade test (elastic#30458) Stop forking javac (elastic#30462) Client: Deprecate many argument performRequest (elastic#30315) Docs: Use task_id in examples of tasks (elastic#30436) Security: Rename IndexLifecycleManager to SecurityIndexManager (elastic#30442) [Docs] Fix typo in cardinality-aggregation.asciidoc (elastic#30434) Avoid NPE in `more_like_this` when field has zero tokens (elastic#30365) Build: Switch to building javadoc with html5 (elastic#30440) Add a quick tour of the project to CONTRIBUTING (elastic#30187) Reindex: Use request flavored methods (elastic#30317) Silence SplitIndexIT.testSplitIndexPrimaryTerm test failure. (elastic#30432) ...
Previously `BulkProcessor` retry logic was based on the exception type of the failed response (`EsRejectedExecutionException`). This commit changes it to be based on the returned status code. This allows us to reproduce the same retry behaviour when the `BulkProcessor` is used from the high-level REST client, which was previously not the case as we cannot rebuild the same exception type when parsing back the response. This change has no effect on the transport client. Closes #28885
* master: Upgrade to Lucene-7.4-snapshot-6705632810 (#30519) add version compatibility from 6.4.0 after backport, see #30319 (#30390) Security: Simplify security index listeners (#30466) Add proper longitude validation in geo_polygon_query (#30497) Remove Discovery.AckListener.onTimeout() (#30514) Build: move generated-resources to build (#30366) Reindex: Fold "with all deps" project into reindex (#30154) Isolate REST client single host tests (#30504) Solve Gradle deprecation warnings around shadowJar (#30483) SAML: Process only signed data (#30420) Remove BWC repository test (#30500) Build: Remove xpack specific run task (#30487) AwaitsFix IntegTestZipClientYamlTestSuiteIT#indices.split tests LLClient: Add setJsonEntity (#30447) Expose CommonStatsFlags directly in IndicesStatsRequest. (#30163) Silence IndexUpgradeIT test failures. (#30430) Bump Gradle heap to 1792m (#30484) [docs] add warning for read-write indices in force merge documentation (#28869) Avoid deadlocks in cache (#30461) Test: remove hardcoded list of unconfigured ciphers (#30367) mute SplitIndexIT due to #30416 Docs: Test examples that recreate lang analyzers (#29535) BulkProcessor to retry based on status code (#29329) Add GET Repository High Level REST API (#30362) add a comment explaining the need for RetryOnReplicaException on missing mappings Add `coordinating_only` node selector (#30313) Stop forking groovyc (#30471) Avoid setting connection request timeout (#30384) Use date format in `date_range` mapping before fallback to default (#29310) Watcher: Increase HttpClient parallel sent requests (#30130) # Conflicts: # x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java
* 6.x: Upgrade to Lucene-7.4-snapshot-6705632810 (#30519) Remove Discovery.AckListener.onTimeout() (#30514) Build: move generated-resources to build (#30366) Reindex: Fold "with all deps" project into reindex (#30154) Isolate REST client single host tests (#30504) Remove BWC repository test (#30500) Build: Remove xpack specific run task (#30487) AwaitsFix IntegTestZipClientYamlTestSuiteIT#indices.split tests LLClient: Add setJsonEntity (#30447) [docs] add warning for read-write indices in force merge documentation (#28869) Avoid deadlocks in cache (#30461) BulkProcessor to retry based on status code (#29329) Avoid setting connection request timeout (#30384) Test: remove hardcoded list of unconfigured ciphers (#30367) Add GET Repository High Level REST API (#30362) mute SplitIndexIT due to #30416 Docs: Test examples that recreate lang analyzers (#29535) add a comment explaining the need for RetryOnReplicaException on missing mappings Pass the task to broadcast actions (#29672) Stop forking groovyc (#30471) Add `coordinating_only` node selector (#30313) Fix accidental error in changelog Use date format in `date_range` mapping before fallback to default (#29310) Watcher: Increase HttpClient parallel sent requests (#30130) [Security][Tests] Azeri(Turkish) locale tripps opensaml dependency
@javanna This may be backported now. |
Previously `BulkProcessor` retry logic was based on the exception type of the failed response (`EsRejectedExecutionException`). This commit changes it to be based on the returned status code. This allows us to reproduce the same retry behaviour when the `BulkProcessor` is used from the high-level REST client, which was previously not the case as we cannot rebuild the same exception type when parsing back the response. This change has no effect on the transport client. Closes #28885
Previously bulk's retry logic was based on the exception type (
EsRejectedExecutionException
) of the failed response, this changes it to be based on RestStatus (RestStatus.TOO_MANY_REQUESTS
), in order to support rest hight level client. (more information can be found #29254).Close #28885