Make BulkProcessor.add asynchronously execute bulk request if needed #50440
Pinging @elastic/es-core-features (:Core/Features/Java High Level REST Client)
This was referenced May 13, 2020.

elasticsearchmachine pushed a commit that referenced this issue on Jan 9, 2023:
We have been seeing deadlocks in ILMHistoryStore in production (#68468). The deadlock appears to be caused by the fact that BulkProcessor uses two locks (`BulkProcessor.lock` and `BulkRequestHandler.semaphore`) and holds the latter for an indefinite amount of time. This PR avoids the deadlock with a new BulkProcessor2 that does not require both locks to be held at once, and drastically shortens the time either lock is held, by adding a new queue (in the Retry2 class). Note that we have left the original BulkProcessor in place and have cloned/modified it into BulkProcessor2. For now BulkProcessor2 is used only by ILMHistoryStore, but it will likely replace BulkProcessor altogether in the near future.

The flow in the original BulkProcessor:

- ILMHistoryStore adds IndexRequests to BulkProcessor asynchronously.
- BulkProcessor acquires its lock to build up a BulkRequest from these IndexRequests.
- If a call from ILMHistoryStore adds an IndexRequest that pushes the BulkRequest over its configured threshold (size or count), it calls BulkRequestHandler to load that BulkRequest to the server.
- BulkRequestHandler must acquire a token from its semaphore to do this.
- It calls Client::bulk from the current thread.
- If the call fails, it keeps the semaphore while it retries (possibly multiple times).

The flow in the new BulkProcessor2:

- ILMHistoryStore adds IndexRequests to BulkProcessor2 synchronously (since this part is very fast now).
- BulkProcessor2 acquires its lock to build up a BulkRequest from these IndexRequests.
- If a call from ILMHistoryStore adds an IndexRequest that pushes the BulkRequest over its configured threshold (size or count), it calls Retry2::withBackoff to attempt to load the bulk a fixed number of times.
- If the number of bytes already in flight to Elasticsearch is higher than a configured limit, or if Elasticsearch is too busy, the listener is notified with an EsRejectedExecutionException.
- Either way, control returns immediately and the BulkProcessor2 lock is released.

We no longer use a semaphore to throttle how many concurrent requests can be sent to Elasticsearch at once, and there is no longer any blocking. Instead we throttle the (approximate) total number of bytes in flight to Elasticsearch, and allow Elasticsearch to throw an EsRejectedExecutionException if it thinks there are too many concurrent requests.

Closes #50440
Closes #68468
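The bytes-in-flight throttle that replaces the old semaphore can be sketched as follows. This is an illustrative simplification, not the real Retry2 API: the class and method names here are invented for the example, and a plain RuntimeException stands in for EsRejectedExecutionException. The key property is the same, though: an over-limit request is rejected immediately rather than blocking the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

public class BytesInFlightThrottle {
    private final AtomicLong inFlightBytes = new AtomicLong();
    private final long maxBytesInFlight;

    BytesInFlightThrottle(long maxBytesInFlight) {
        this.maxBytesInFlight = maxBytesInFlight;
    }

    // Accepts the request only if it fits under the bytes-in-flight limit;
    // otherwise fails fast without blocking the calling thread.
    CompletableFuture<Void> submit(long requestBytes, Runnable send) {
        long after = inFlightBytes.addAndGet(requestBytes);
        if (after > maxBytesInFlight) {
            inFlightBytes.addAndGet(-requestBytes); // roll back the reservation
            CompletableFuture<Void> rejected = new CompletableFuture<>();
            // Stand-in for notifying the listener with EsRejectedExecutionException.
            rejected.completeExceptionally(new RuntimeException("too many bytes in flight"));
            return rejected;
        }
        // Execute off the caller's thread; release the bytes when done.
        return CompletableFuture.runAsync(send)
                .whenComplete((v, e) -> inFlightBytes.addAndGet(-requestBytes));
    }

    public static void main(String[] args) {
        BytesInFlightThrottle throttle = new BytesInFlightThrottle(100);
        throttle.submit(60, () -> {}).join();            // fits, runs, then releases
        CompletableFuture<Void> rejected = throttle.submit(150, () -> {});
        System.out.println(rejected.isCompletedExceptionally()); // over the limit
    }
}
```

Because the reservation is a single atomic add, the throttle never needs to be held across a retry the way `BulkRequestHandler.semaphore` was, which is what removes the deadlock window.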
Currently, when invoking the `.add` method on `BulkProcessor`, there is a chance that `.add` also executes the bulk request on the calling thread. This execution should be done asynchronously, so that the `add` method can block for adding to the queue but is non-blocking with respect to the actual execution.
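A minimal sketch of the requested behavior, using a hypothetical simplified class rather than the real `BulkProcessor` API: `add` only appends to the buffer under the lock, and when the batch threshold trips, the accumulated batch is handed to an executor instead of being executed on the caller's thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncBatcher {
    private final List<String> buffer = new ArrayList<>();
    private final int threshold;
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    final AtomicInteger flushes = new AtomicInteger();

    AsyncBatcher(int threshold) {
        this.threshold = threshold;
    }

    // Blocking only for the append; execution is handed off to the executor.
    synchronized void add(String doc) {
        buffer.add(doc);
        if (buffer.size() >= threshold) {
            List<String> batch = new ArrayList<>(buffer);
            buffer.clear();
            executor.execute(() -> execute(batch)); // returns immediately
        }
    }

    private void execute(List<String> batch) {
        flushes.incrementAndGet(); // stand-in for the actual bulk call
    }

    void close() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        AsyncBatcher batcher = new AsyncBatcher(2);
        for (int i = 0; i < 4; i++) {
            batcher.add("doc" + i); // never blocks on execution
        }
        batcher.close();
        System.out.println(batcher.flushes.get()); // two batches of two
    }
}
```

The important point is that the caller's time inside the synchronized block is bounded by the list append, regardless of how long the bulk execution (or its retries) takes.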