Decouple BulkProcessor from client implementation #23373
Conversation
This is kind of a first take on this issue. I would like some feedback so that I can make any appropriate adjustments. There are a couple of things that are kind of ugly about this right now. Once you no longer pass the client to the BulkProcessor you must pass three more things.

It is implicit that the method reference will execute the action on a thread pool. But then you still need the thread pool to schedule the retry. Maybe this should just take a second method reference to call on the retry? Then the bulk processor would only need to calculate whether the retry should happen and the time value to back off, and it would be up to the retry method to implement the execution. Also, should the Settings be required or optional? We can instantiate a logger without settings, but there is less information logged in that case (we don't have the prefixes).
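For readers skimming the thread, here is a rough sketch of the wiring being described. The exact Builder shape changed over the course of this PR, so treat the constructor arguments (consumer, listener, settings, thread pool) as illustrative rather than final:

import java.util.function.BiConsumer;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

public class DecoupledBulkProcessorSketch {

    public static BulkProcessor fromClient(Client client, BulkProcessor.Listener listener) {
        // The action that executes the bulk request is now just a method reference;
        // the processor no longer depends on the Client type itself.
        BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = client::bulk;

        // Settings are only used for logger prefixes, and a thread pool is still
        // needed to schedule flushes and retry back-off (the open question above).
        return new BulkProcessor.Builder(consumer, listener, Settings.EMPTY, client.threadPool())
                .build();
    }
}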
Looks good though. I would also be interested to see this applied to the REST high level client (especially the thread pool bits). Maybe we can add a test or enhance the current tests that we have.
/**
 * Creates a builder of bulk processor with the client to use and the listener that will be used
 * to be notified on the completion of bulk requests.
 */
public Builder(Client client, Listener listener) {
this.client = client;
public Builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener, Settings settings,
Can we have two static helpers that allow creating the processor by providing either the Client or the RestHighLevelClient? I am thinking of users; there are many existing usages of BulkProcessor out there. It may be ok to change the way it gets created, but as a user I would be surprised to have to pass in a method reference. That should be more of an implementation detail.
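Something along these lines, perhaps (a hypothetical sketch; the helpers are shown as a standalone class only so the example is self-contained, and the four-argument Builder constructor mirrors the diff above rather than any final API):

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;

public final class BulkProcessorBuilders {

    // Transport/node client: it already carries Settings and a ThreadPool.
    public static BulkProcessor.Builder from(Client client, BulkProcessor.Listener listener) {
        return new BulkProcessor.Builder(client::bulk, listener, client.settings(), client.threadPool());
    }

    // REST high level client: it has no ThreadPool of its own, so the caller supplies one.
    public static BulkProcessor.Builder from(RestHighLevelClient client, BulkProcessor.Listener listener,
                                             ThreadPool threadPool) {
        return new BulkProcessor.Builder(client::bulkAsync, listener, Settings.EMPTY, threadPool);
    }
}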
// Objects.requireNonNull(listener, "listener");
//
// return new Builder(consumer, listener, settings, threadPool);
return null;
leftover?
That makes sense. Can we have the same public method that takes a …?

You're right, the threadpool is not needed in the high level client. Maybe have two different creation paths through the builder that make it very clear that providing the …

Maybe instead of having to pass down the threadpool we should abstract that too with a …

I haven't read the PR but that is my instinct for how to deal with retries as well.
 * Sets an optional BiFunction to schedule flush actions and request retries
 */
public Builder setScheduleFunction(BiFunction<TimeValue, Runnable, ScheduledFuture<?>> scheduleFunction) {
this.scheduleFunction = scheduleFunction;
I don't see this member being used anywhere? Maybe I am missing something? I also wonder if we should have a test setting this as well so we can check that it's actually used?
It's not being used right now. I had added this as I was exploring passing a lambda to the builder. I'll provide a more in-depth comment about where this PR is currently a little later today.
I explored this and ran into a problem. I have successfully implemented a version where the user can pass a threadpool instance; if they do not, I instantiate a scheduled threadpool instance. In this version the builder is still using a … As this action is slightly different (it must take the accumulated pieces and execute the request, as opposed to just executing the request), the … Any thoughts? Is the current design sufficient (where you can provide a threadpool, otherwise a scheduled threadpool will be created)? Or is there a different approach I should take?

Another option would be that if they provided a … Essentially, the BiFunction approach does not remove the need for a threadpool unless the "flush" design was changed.
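For context, the adapter being explored here is roughly this shape (a minimal sketch, assuming the BiFunction<TimeValue, Runnable, ScheduledFuture<?>> signature from the diff above; the class and method names are made up):

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;

import org.elasticsearch.common.unit.TimeValue;

class ScheduleFunctions {

    // Adapts a plain ScheduledThreadPoolExecutor to the (delay, command) -> future shape
    // used by this iteration of the builder.
    static BiFunction<TimeValue, Runnable, ScheduledFuture<?>> fromScheduledExecutor(ScheduledThreadPoolExecutor executor) {
        return (delay, command) -> executor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS);
    }
}

Even with this indirection, somebody still has to own and eventually shut down the executor, which is why the thread converges on requiring a ThreadPool below.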
Honestly I think we should just decouple the client from this and stick with ThreadPool. Just require the threadpool to be set and we are good. If somebody has a client they can just call …
I left a few comments, thanks for trying to abstract away the thread pool, I needed to see what it looked like in practice. I agree with Simon that we should go back and rather require a thread pool. We can work later on a few helper methods specifically for that in the high level client.
}, Settings.EMPTY)
.setConcurrentRequests(0)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.GB))
.setBulkActions(nbItems + 1000)
why +1000 here and such a high size value? Do we want to make sure that all docs are sent in one go?
I'm not sure why I set that so high. It is higher than nbItems to ensure that it does not flush until I manually do it. But I will make it 1 instead of 1000.
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
error.set(failure);
don't we want to keep track of the request here too?
The main point of that callback is to set an exception if one occurs. And if the exception is not null then the test fails.
sounds good to me
} else {
BytesReference source = XContentBuilder.builder(xContentType.xContent()).startObject().field("id", i).endObject().bytes();
if (opType == DocWriteRequest.OpType.INDEX) {
IndexRequest indexRequest = new IndexRequest("index", "test", id).source(source, xContentType);
Can we pass the XContentBuilder in to the source method directly? If we do it in all these branches we may not need to call .bytes and save the source to a variable.
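Roughly what the suggestion amounts to (a fragment reusing the xContentType, i and id variables from the test above; it assumes the IndexRequest#source(XContentBuilder) overload):

// build the XContentBuilder once and hand it to source() directly,
// instead of converting it to bytes in every branch
XContentBuilder source = XContentBuilder.builder(xContentType.xContent())
        .startObject()
        .field("id", i)
        .endObject();
IndexRequest indexRequest = new IndexRequest("index", "test", id).source(source);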
assertNull(responseRef.get());
assertNull(requestRef.get());

processor.flush();
why flush and not close? could we even use a try-with-resources?
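Since BulkProcessor implements Closeable, the test could be shaped roughly like this (a sketch using the builder arguments from later iterations of this PR; imports and the listener, threadPool and highLevelClient() setup come from the surrounding test):

try (BulkProcessor processor = new BulkProcessor.Builder(highLevelClient()::bulkAsync, listener, threadPool)
        .setConcurrentRequests(0)
        .build()) {
    processor.add(new IndexRequest("index", "test", "1")
            .source(Collections.singletonMap("field", "value")));
    // no explicit flush(): close(), called implicitly at the end of the block,
    // sends whatever is still buffered
}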
}
}
public static Builder builder(Client client, Listener listener) {
Objects.requireNonNull(client, "client");
Objects.requireNonNull(listener, "listener");

return new Builder(client, listener);
return new Builder(client::bulk, listener, client.settings()).setThreadPool(client.threadPool());
nice, effectively this change will be backwards compatible for users using this method. Not for users calling the public Builder constructor, but I think that is a good reason to make them switch to this static method.
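In other words, existing call sites keep working unchanged through the static helper, for example (illustrative settings):

BulkProcessor processor = BulkProcessor.builder(client, listener)
        .setBulkActions(1000)
        .build();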
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new Flush(), flushInterval.millis(), flushInterval.millis(), TimeUnit.MILLISECONDS);
BiFunction<TimeValue, Runnable, ScheduledFuture<?>> scheduleFn;
if (threadPool == null) {
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, (name != null ? "[" + name + "]" : "") + "bulk_processor");
I think I would consider taking settings out. We try to extract node.name from it as far as I understand, which I don't think is ever going to be set on a transport client. Maybe we can just live without that part of the thread name and remove some complexity. This may very well come from the times where node client was widely used, hence node.name made sense in the past, but it doesn't anymore.
scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
this.schedulerToStop = scheduledExecutor;
scheduleFn = Retry.fromScheduledExecutorService(scheduledExecutor);
I tend to agree with Simon on requiring a ThreadPool. We can have a helper in the high level client to create one. Otherwise the code here gets too complex and I am not sure it is worth it.
not sure what helper we would need... new ThreadPool(settings)?
probably new ThreadPool(Settings.EMPTY) :) I wonder if the helper could even hide the creation of the thread pool, maybe not, but we will see that later. As you pointed out it may be that the helper is not needed at all.
Alright. I can make a threadpool mandatory for the builder.
Thanks for the reviews. I've made most of the changes from @javanna's review. I got distracted a little today setting up my workstation. But I should finish up the PR tomorrow.

@javanna I think my updates addressed the issues you raised.
I had another look, getting closer but I think we can simplify things further. @s1monw mind having a look too just to make sure we are on the same page?
 * The default is to back off exponentially.
 *
 * @see org.elasticsearch.action.bulk.BackoffPolicy#exponentialBackoff()
 * Sets an optional thread pool on which to schedule flush actions and request retries
should it not be required? If so I would move it to the constructor of the Builder and remove the setter for it.
this.client = client;
this.logger = Loggers.getLogger(getClass(), client.settings());
protected final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
protected final BiFunction<TimeValue, Runnable, ScheduledFuture<?>> scheduler;
if we make the threadpool mandatory, then this scheduler becomes just ThreadPool?
I don't see how we can consider bulk processor decoupled if it needs a ThreadPool.
this.listener = listener;
this.logger = Loggers.getLogger(getClass(), client.settings());
this.logger = Loggers.getLogger(getClass(), settings);
I wonder if we can get rid of settings here too. It's done to prefix the log lines with node.name. Is this still important with transport client / REST client? I think this is almost never set on a client.
Retry.withAsyncBackoff is used by AbstractAsyncBulkByScrollAction. I left the Settings at the retry logic since I did not want to change how logging is done for that class. Instead, Settings are not required by the BulkProcessor and it just passes Settings.EMPTY to the retry level. Is that not correct? I was just hesitant to change code that will impact other pieces.
@nik9000 do you think we can get rid of Settings here or shall we not?
ok let's leave this for now. We can address this later. I think it was a good move not to remove settings yet. I will follow-up with Nik on this.
Sorry, didn't catch this ping. I believe this should be fine.
@tbrooks8 no need to do it as part of this PR, we can make this change later.
AbstractRetryHandler(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, Client client, ActionListener<BulkResponse> listener) {
RetryHandler(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy,
             BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, ActionListener<BulkResponse> listener,
             Settings settings, BiFunction<TimeValue, Runnable, ScheduledFuture<?>> scheduleFn) {
Ideally here you would just need to replace Client with ThreadPool, at least if we can get rid of Settings. Most of these changes should not be needed.
I still pass the settings here as explained in this comment #23373 (comment).
left a few more comments, thanks @tbrooks8
 * The default is to back off exponentially.
 *
 * @see org.elasticsearch.action.bulk.BackoffPolicy#exponentialBackoff()
 */
why are these javadocs gone?
BulkProcessor(Client client, BackoffPolicy backoffPolicy, Listener listener, @Nullable String name, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval) {
BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
              int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
              @Nullable ThreadPool threadPool) {
should the ThreadPool still be nullable?
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(client.settings(), (name != null ? "[" + name + "]" : "") + "bulk_processor"));
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new Flush(), flushInterval.millis(), flushInterval.millis(), TimeUnit.MILLISECONDS);
I may have forgotten, but what has changed here compared to the startFlush method? We don't call daemonThreadFactory as we dropped the name and settings requirement for logging right? I wonder if we should still call that and just provide the standard "bulk_processor" prefix.
In the past startFlush could use a ScheduledExecutor that we might have created in the BulkProcessor. Now that we require a ThreadPool we do not need this code anymore, as we can just use the thread pool to schedule.
this sounds weird, as we were requiring a Client before, which comes with a ThreadPool implicitly. That means this change could be potentially made outside of this PR right (not asking you to move it, just trying to better understand)?
Yeah this change is not dependent on my work. The "flush" task probably could have been scheduled using the client's ThreadPool. But for some reason the BulkProcessor was creating its own for scheduling flush tasks.
got it thank you.
@@ -334,6 +325,10 @@ private boolean isOverTheLimit() {
return false;
}

private Runnable cancelTask(ThreadPool.Cancellable cancellable) {
return cancellable::cancel;
can we fold this method into its only caller above? Also did we change the way we cancel the flush scheduler on close? Weren't we using FutureUtils#cancel before?
Yes we can fold it in. And we are not using FutureUtils because we are not cancelling a ScheduledFuture anymore. Repeated tasks scheduled on the ES ThreadPool implement ThreadPool.Cancellable.
got it thanks.
}

// Start periodic flushing task after everything is set up
this.cancelTask = startFlush(flushInterval, threadPool);
I find it hard to read that startFlush returns the task to call when closing the processor. Maybe the whole method can be moved here?
What if I just change that to this:
ThreadPool.Cancellable cancellableFlushTask = startFlushTask(flushInterval, threadPool);
And then the close() method just calls cancel(). No need for a weird conversion to Runnable.
If there is no flushing I will just return a NoOp ThreadPool.Cancellable.
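A minimal sketch of that shape (illustrative names; it assumes ThreadPool.Cancellable exposes cancel() and isCancelled(), and reuses the scheduleWithFixedDelay call from the diff further down, where the choice of executor is still being debated):

private ThreadPool.Cancellable startFlushTask(@Nullable TimeValue flushInterval, ThreadPool threadPool) {
    if (flushInterval == null) {
        // nothing to schedule: hand back a no-op Cancellable so close() can always call cancel()
        return new ThreadPool.Cancellable() {
            @Override
            public void cancel() {
            }

            @Override
            public boolean isCancelled() {
                return true;
            }
        };
    }
    return threadPool.scheduleWithFixedDelay(new Flush(), flushInterval, ThreadPool.Names.SAME);
}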
sounds good thanks
return actionFuture.actionGet();
}

static BiFunction<TimeValue, Runnable, ScheduledFuture<?>> fromThreadPool(ThreadPool threadPool) {
given that we require ThreadPool, I wonder if we still need this method and its caller too. Can't we just require ThreadPool like we used to require the Client before?
It doesn't look like this requires ThreadPool though?
Like, only this method requires it.
I think that you left comments based on a previous version of this PR, not quite sure how that happened.
private final Logger logger;
private final Client client;
private final BiFunction<TimeValue, Runnable, ScheduledFuture<?>> scheduleFn;
do we still need the function? after all this handler will depend on the ThreadPool as that is the only way we'll use it. Should we just have a ThreadPool member instead?
We can do that. I guess the only reason we would not do that is if we wanted Retry to be usable without a ThreadPool. In the current version Retry does not require a ThreadPool, so theoretically this class could be used with just a lambda. Although Retry is currently only used by the BulkProcessor and AbstractAsyncBulkByScrollAction.

I don't really have a preference. Let me know what you think is best.
I would find it more readable at the moment with the hard dependency on ThreadPool, which is what we are going for at the moment, even in client code. We can always change this if we don't want to depend on thread pool later.
Sure. I'll adjust Retry to depend on ThreadPool.
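For reference, the hard dependency ends up looking roughly like this inside the retry handler (a sketch only; the field names and the ThreadPool#schedule(TimeValue, String, Runnable) signature are assumptions based on the code of that era and may differ from the actual change):

private final ThreadPool threadPool;

private void retry(BulkRequest bulkRequestForRetry) {
    assert backoff.hasNext();
    TimeValue next = backoff.next();
    logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
    // schedule the retry directly on the thread pool instead of going through a BiFunction
    scheduledRequestFuture = threadPool.schedule(next, ThreadPool.Names.SAME, () -> this.execute(bulkRequestForRetry));
}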
 */
public void withAsyncBackoff(Client client, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
AsyncRetryHandler r = new AsyncRetryHandler(retryOnThrowable, backoffPolicy, client, listener);
public void withAsyncBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest, ActionListener<BulkResponse> listener, Settings settings) {
shall we rather have ThreadPool as an argument here instead and remove the new on(ThreadPool) method?
We can. Although we do not do that with policy which is also required:

public Retry policy(BackoffPolicy backoffPolicy) {
    this.backoffPolicy = backoffPolicy;
    return this;
}

Do you just want me to turn Retry into a normal "ctor-using" class?

Retry retry = new Retry(Class<? extends Throwable> retryOnThrowable, BackoffPolicy policy, ThreadPool threadPool, Settings settings);
retry.withAsyncBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest, ActionListener<BulkResponse> listener);
I dunno. I don't love this class. I was trying to keep it close to how it used to be, and only replace Client with a BiConsumer and threadpool. I also don't think it's clear from the method name that withSyncBackoff actually sends the request. But I don't want to change too much as part of this PR either. If you feel like it's better as-is I am fine with it too.
Well I definitely do not like the current distinction between withSyncBackoff and withAsyncBackoff. I have a separate PR that I have been working on locally to simplify the Retry class and its relationship with the BulkProcessor. My plan was to submit that PR once this is merged.

I guess it is just a question of what you think for this PR. If you think constructing Retry is out of scope for this, I can consider changes in a follow up.
cool then leave what you have, looking forward to your followup PR
return () -> {};
}

ThreadPool.Cancellable cancellable = threadPool.scheduleWithFixedDelay(new Flush(), flushInterval, ThreadPool.Names.SAME);
I'm not sure threadPool can be nullable if you use it here?
.on(EsRejectedExecutionException.class)
.policy(backoffPolicy)
.withSyncBackoff(client, bulkRequest);
.on(EsRejectedExecutionException.class)
Can you not do the formatting changes in future? Personally I like the 8 spaces, but regardless I think it makes the change harder to review.
> I don't see how we can consider bulk processor decoupled if it needs a ThreadPool.

Requiring a ThreadPool is the way forward that we chose after a few attempts. We will require a ThreadPool in the high level client too. It is simple enough to create. We will see what else can be done to make this better. The bulk processor is decoupled meaning that it doesn't require a Client anymore.
left a couple of questions and thoughts, but LGTM.
validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest);

threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.SECONDS);
you can replace these two lines with a call to ThreadPool.terminate
even better, ESTestCase#terminate
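For example, either of the following (the second form assumes the test extends ESTestCase, which exposes a terminate helper):

ThreadPool.terminate(threadPool, 1, TimeUnit.SECONDS);
// or, inside a test extending ESTestCase:
assertTrue(terminate(threadPool));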
try(BulkProcessor processor = new BulkProcessor.Builder(highLevelClient()::bulkAsync, listener, threadPool)
.setConcurrentRequests(0)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.GB))
.setBulkActions(nbItems + 1)
Why does it matter that the bulk processor flushes all the docs at once on closing? can't we rely on the fact that after close everything is flushed, but it doesn't really matter how many bulk requests are sent? Is it because it makes bulk response validation more complicated as we'd have to accumulate the responses gotten from the listener?
};
}

return threadPool.scheduleWithFixedDelay(new Flush(), flushInterval, ThreadPool.Names.SAME);
is SAME thread pool ok here? This way, can other operations interfere with periodic bulk flushing if enabled?
I think it's fine, the assumption is that flush is pretty fast?
It should be fast as it just calls the Consumer, which will likely dispatch to whatever threading model the client is using. But I do feel "same" is kind of weird as I assume that is the thread that created the BulkProcessor. Should this be generic?
It is possible that the flush could block as a call to semaphore.acquire() is made. And it is possible that user initiated flushes are happening.
I think SAME is fine. We can totally go on the safe side and just use GENERIC too. I don't think it's critical so let's just play safe and use GENERIC?
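So the flush scheduling line shown above would simply become (same call, just a different executor name):

return threadPool.scheduleWithFixedDelay(new Flush(), flushInterval, ThreadPool.Names.GENERIC);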
/**
 * Sets an optional name to identify this bulk processor.
 */
public Builder setName(String name) {
For the record, I think it is ok to remove setName here. The name was only used to have slightly different logging for the flush daemon thread. We can obtain the same now that we use the thread pool in the same way we do everywhere in this class, through node.name in the settings.
++
LGTM thanks for all the iterations
still LGTM thanks @tbrooks8
This commit modifies the BulkProcessor to be decoupled from the client implementation. Instead it just takes a BiConsumer<BulkRequest, ActionListener<BulkResponse>> that executes the BulkRequest.