Make BulkProcessor.add asynchronously execute bulk request if needed #50440

Closed
dakrone opened this issue Dec 20, 2019 · 1 comment · Fixed by #91238
Assignees: joegallo
Labels: >enhancement, Team:Data Management (meta label for data/management team)

Comments

dakrone (Member) commented Dec 20, 2019

Currently, invoking the .add method on BulkProcessor can also end up executing the accumulated bulk request on the calling thread. That execution should happen asynchronously, so that add may block while adding to the queue but never blocks on the actual execution of the request.
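A minimal sketch of the behaviour being requested, using a hypothetical simplified processor (names and structure here are illustrative, not the real BulkProcessor API): add blocks only while appending to the buffer, and any flush triggered by crossing the threshold is handed off to an executor instead of running on the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration only; this is not the real BulkProcessor.
class AsyncAddSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final List<String> pending = new ArrayList<>();   // stands in for buffered IndexRequests
    private final int maxActions = 100;                       // flush threshold

    // Blocks only to append to the buffer; the flush runs on the executor, not the caller.
    public synchronized void add(String request) {
        pending.add(request);
        if (pending.size() >= maxActions) {
            List<String> bulk = new ArrayList<>(pending);
            pending.clear();
            executor.submit(() -> execute(bulk));   // asynchronous execution of the bulk request
        }
    }

    private void execute(List<String> bulk) {
        // send the bulk request to Elasticsearch here
    }
}
```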

elasticmachine (Collaborator) commented:

Pinging @elastic/es-core-features (:Core/Features/Java High Level REST Client)

@rjernst added the Team:Data Management label May 4, 2020
@joegallo self-assigned this Feb 5, 2021
fixmebot bot referenced this issue in VectorXz/elasticsearch Apr 22, 2021
fixmebot bot referenced this issue in VectorXz/elasticsearch May 28, 2021
fixmebot bot referenced this issue in VectorXz/elasticsearch Aug 4, 2021
elasticsearchmachine pushed a commit that referenced this issue Jan 9, 2023
We have been seeing deadlocks in ILMHistoryStore in production (#68468).
The deadlock appears to be caused by the fact that BulkProcessor uses
two locks (`BulkProcessor.lock` and `BulkRequestHandler.semaphore`) and
holds onto the latter lock for an indefinite amount of time.
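As a rough illustration (hypothetical code, not the real BulkProcessor/BulkRequestHandler), the hazard is a classic two-lock interleaving: one thread takes the processor lock and then waits indefinitely for a semaphore permit, while the thread that holds the permit needs that same lock before it will give the permit back.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative only; stand-ins for BulkProcessor.lock and BulkRequestHandler.semaphore.
class TwoLockDeadlockSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Semaphore permits = new Semaphore(1);

    // Thread A ("add" path): lock first, then wait for a permit.
    void addPath() {
        lock.lock();
        try {
            permits.acquireUninterruptibly();   // blocks forever if thread B never releases
            try {
                // send the bulk request
            } finally {
                permits.release();
            }
        } finally {
            lock.unlock();
        }
    }

    // Thread B (completion/flush path): already holds a permit, needs the lock to finish.
    void completionPath() {
        permits.acquireUninterruptibly();
        try {
            lock.lock();                        // blocks forever if thread A holds the lock
            try {
                // flush the remaining buffered requests
            } finally {
                lock.unlock();
            }
        } finally {
            permits.release();
        }
    }
}
```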

This PR avoids deadlock by using a new BulkProcessor2 that does not
require that both locks be held at once, and drastically shortens the
amount of time that either lock needs to be held. It does this by adding
a new queue (in the Retry2 class). Note that we have left the original
BulkProcessor in place, and have cloned/modified it into BulkProcessor2.
BulkProcessor2 for now is only used by ILMHistoryStore but will likely
replace BulkProcessor altogether in the near future. 

The flow in the original BulkProcessor is like this:

- ILMHistoryStore adds IndexRequests to BulkProcessor asynchronously.
- BulkProcessor acquires its lock to build up a BulkRequest from these IndexRequests.
- If a call from ILMHistoryStore adds an IndexRequest that pushes the BulkRequest over its configured threshold (size or count), then it calls BulkRequestHandler to send that BulkRequest to the server.
- BulkRequestHandler must acquire a token from its semaphore to do this.
- It calls Client::bulk from the current thread.
- If it fails, it keeps holding the semaphore permit while it retries (possibly multiple times).
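Condensed into code, that old path looks roughly like this (a hypothetical simplification, not the real class): everything happens on the thread that called add, and the semaphore permit is held for the full duration of the send and all retries.

```java
import java.util.concurrent.Semaphore;

// Hypothetical condensation of the original flow; not the real code.
class OriginalFlowSketch {
    private final Object lock = new Object();               // stands in for BulkProcessor.lock
    private final Semaphore concurrent = new Semaphore(1);  // stands in for BulkRequestHandler.semaphore
    private int bufferedActions = 0;
    private final int maxActions = 100;

    void add(Object indexRequest) {
        synchronized (lock) {                                // held for the entire call
            bufferedActions++;
            if (bufferedActions >= maxActions) {
                concurrent.acquireUninterruptibly();         // may wait for an in-flight bulk
                try {
                    sendWithRetries();                       // Client::bulk on the calling thread
                } finally {
                    concurrent.release();                    // permit held across every retry
                }
                bufferedActions = 0;
            }
        }
    }

    private void sendWithRetries() {
        for (int attempt = 0; attempt < 3; attempt++) {
            // call Client::bulk; on failure, back off and try again
        }
    }
}
```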

The flow in the new BulkProcessor2:

- ILMHistoryStore adds IndexRequests to BulkProcessor2 synchronously (this part is now very fast).
- BulkProcessor2 acquires its lock to build up a BulkRequest from these IndexRequests.
- If a call from ILMHistoryStore adds an IndexRequest that pushes the BulkRequest over its configured threshold (size or count), then it calls Retry2::withBackoff to attempt to send that BulkRequest, retrying up to a fixed number of times.
- If the number of bytes already in flight to Elasticsearch is higher than a configured limit, or if Elasticsearch is too busy, the listener is notified with an EsRejectedExecutionException.
- Either way, control returns immediately and the BulkProcessor2 lock is released.

We no longer use a semaphore to throttle how many concurrent requests can be sent to Elasticsearch at once, and there is no longer any blocking. Instead, we throttle the approximate total number of bytes in flight to Elasticsearch, and allow Elasticsearch to throw an EsRejectedExecutionException if it thinks there are too many concurrent requests.
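A small sketch of that style of throttling, assuming a simple atomic byte counter (hypothetical names; the real accounting lives in BulkProcessor2/Retry2): the caller gets control back immediately, and a request that would push the in-flight byte count over the limit is rejected via the listener rather than blocked.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical sketch of byte-based throttling; not the real BulkProcessor2/Retry2 code.
class BytesInFlightThrottleSketch {
    private final AtomicLong bytesInFlight = new AtomicLong();
    private final long maxBytesInFlight = 50L * 1024 * 1024;   // illustrative limit

    // Never blocks: the request is either handed off asynchronously or rejected immediately.
    void send(byte[] bulkPayload, Consumer<Exception> onRejection) {
        long newTotal = bytesInFlight.addAndGet(bulkPayload.length);
        if (newTotal > maxBytesInFlight) {
            bytesInFlight.addAndGet(-bulkPayload.length);
            // the real code notifies the listener with an EsRejectedExecutionException
            onRejection.accept(new IllegalStateException("too many bytes in flight: " + newTotal));
            return;
        }
        sendAsync(bulkPayload, () -> bytesInFlight.addAndGet(-bulkPayload.length));
    }

    private void sendAsync(byte[] payload, Runnable onDone) {
        // hand the payload to the client's asynchronous bulk API; invoke onDone when the response arrives
        onDone.run();   // placeholder so this sketch stays self-contained
    }
}
```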

Closes #50440 Closes #68468