
Decouple BulkProcessor from client implementation #23373

Merged
merged 24 commits into from
Apr 5, 2017

Conversation

Tim-Brooks
Contributor

This commit modifies the BulkProcessor to be decoupled from the
client implementation. Instead it just takes a
BiConsumer<BulkRequest, ActionListener<BulkResponse>> that executes
the BulkRequest.

@Tim-Brooks
Contributor Author

Tim-Brooks commented Feb 25, 2017

This is kind of a first take on this issue. I would like some feedback so that I can make any appropriate adjustments.

There are a couple things that are kind of ugly about this right now. Once you no longer pass the client to the BulkProcessor you must pass three more things:

  1. The method reference for making the bulk request
  2. The thread pool that the request will be processed on and a retry will be processed on
  3. Settings (for instantiating a logger).

It is implicit that the method reference will execute the action on a thread pool. But then you still need the thread pool to schedule the retry. Maybe this should just take a second method reference to call on the retry? Then the bulk processor only needs to calculate whether the retry should happen and the time value to back off, and it is up to the retry method to implement the execution?

Also should the Settings be required or optional? We can instantiate a logger without settings, but there is less information logged in that case (don't have the prefixes).
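The decoupled contract described above can be sketched with plain JDK types. This is an illustrative analog, not the real API: String stands in for BulkRequest/BulkResponse and the small Callback interface for ActionListener<BulkResponse>.

```java
import java.util.function.BiConsumer;

// Minimal sketch of the decoupled contract: the processor only knows how to
// hand a request to a consumer together with a completion callback. String
// and Callback are hypothetical stand-ins for the Elasticsearch types.
class ProcessorSketch {
    interface Callback {
        void onResponse(String response);
        void onFailure(Throwable t);
    }

    private final BiConsumer<String, Callback> consumer;

    ProcessorSketch(BiConsumer<String, Callback> consumer) {
        this.consumer = consumer;
    }

    // Synchronous helper for the sketch: run the consumer and capture the result.
    String executeSync(String request) {
        final String[] result = new String[1];
        consumer.accept(request, new Callback() {
            public void onResponse(String response) { result[0] = response; }
            public void onFailure(Throwable t) { throw new RuntimeException(t); }
        });
        return result[0];
    }

    public static void main(String[] args) {
        // Any transport works: here the "client" is just an echo lambda.
        ProcessorSketch p = new ProcessorSketch((req, cb) -> cb.onResponse("ack:" + req));
        System.out.println(p.executeSync("bulk-1")); // prints ack:bulk-1
    }
}
```

The point of the shape is that nothing in the processor references a concrete client type; a transport client, a REST client, or a test lambda can all satisfy the BiConsumer.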

@Tim-Brooks Tim-Brooks added the :Core/Infra/Core Core issues without another label label Feb 26, 2017
@javanna javanna added :Core/Infra/Transport API Transport client API and removed :Core/Infra/Core Core issues without another label labels Feb 26, 2017
Member

@javanna javanna left a comment

left a comment, looks good though. I would also be interested to see this applied to the REST high level client (especially the thread pool bits). Maybe we can add a test or enhance the current tests that we have.


/**
* Creates a builder of bulk processor with the client to use and the listener that will be used
* to be notified on the completion of bulk requests.
*/
public Builder(Client client, Listener listener) {
this.client = client;
public Builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener, Settings settings,
Member

can we have two static helpers that allow to create the processor either providing the Client or the RestHighLevelClient ? I am thinking of users, there are many existing usages of BulkProcessor out there. I may be ok to change the way it gets created, but as a user I would be surprised to have to pass in a method reference. That should be more of an implementation detail.

@Tim-Brooks
Contributor Author

Tim-Brooks commented Feb 28, 2017

  1. I modified the PR to keep the same static helper.
  2. I explored using this for the RestHighLevelClient. As the core does not depend on that package, there is not a way to create a static helper that specifically takes that class.
  3. The thread pool also causes some issues. I'm not sure that users of the RestHighLevelClient will have a whole org.elasticsearch.threadpool.ThreadPool instance available to pass. Especially as that particular threadpool is not necessary for using the client. And the retry logic depends on threadPool.getThreadContext().preserveContext(retry) functionality. I'm not sure if preserving this is necessary in context of the RestHighLevelClient?

// Objects.requireNonNull(listener, "listener");
//
// return new Builder(consumer, listener, settings, threadPool);
return null;
Member

leftover?

@javanna
Member

javanna commented Feb 28, 2017

I modified the PR to keep the same static helper. I explored using this for the RestHighLevelClient. As the core does not depend on that package, there is not a way to create a static helper that specifically takes that class.

That makes sense. Can we have the same public method that takes a Client in the processor builder though? The helper to create it given a RestHighLevelClient should rather go to rest-high-level.

The thread pool also causes some issues. I'm not sure that users of the RestHighLevelClient will have a whole org.elasticsearch.threadpool.ThreadPool instance available to pass. Especially as that particular threadpool is not necessary for using the client. And the retry logic depends on threadPool.getThreadContext().preserveContext(retry) functionality. I'm not sure if preserving this is necessary in context of the RestHighLevelClient?

You're right the threadpool is not needed in the high level client. Maybe have two different creation paths through builder that make it very clear that providing the Client also deals with settings and threadpool, while when providing the consumer directly we stop there. Not sure how to represent the threadpool internally in the latter case. I would want to avoid having it set to null, but having a no-op threadpool seems overkill. @s1monw WDYT?

@s1monw
Contributor

s1monw commented Mar 20, 2017

You're right the threadpool is not needed in the high level client. Maybe have two different creation paths through builder that make it very clear that providing the Client also deals with settings and threadpool, while when providing the consumer directly we stop there. Not sure how to represent the threadpool internally in the latter case. I would want to avoid having it set to null, but having a no-op threadpool seems overkill. @s1monw WDYT?

maybe instead of having to pass down the threadpool we should abstract that too with a BiFunction<TimeValue, Runnable, ScheduledFuture<?>> that we call instead of using the threadpool. Yet, the Builder can still accept a threadpool and build its own function unless one is explicitly specified. In that function we can do all the context preserving we need to. Users of the REST client can then just go ahead and implement their own version of the scheduled retry?
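The suggested abstraction can be sketched with plain JDK types. This is a hedged sketch, not the eventual implementation: a long delay in milliseconds stands in for Elasticsearch's TimeValue, and fromExecutor is a hypothetical helper name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;

// The processor only calls the function; where and how the task actually
// runs (node thread pool, REST client's own executor) is the caller's choice.
class ScheduleFnSketch {
    static BiFunction<Long, Runnable, ScheduledFuture<?>> fromExecutor(ScheduledExecutorService executor) {
        return (delayMillis, command) -> executor.schedule(command, delayMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        CountDownLatch retried = new CountDownLatch(1);
        // The "retry" is just a Runnable handed to whatever scheduler was supplied.
        fromExecutor(executor).apply(10L, retried::countDown);
        System.out.println(retried.await(1, TimeUnit.SECONDS)); // prints true
        executor.shutdown();
    }
}
```

A Builder could default to building such a function from a ThreadPool, while REST client users plug in their own scheduler.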

@nik9000
Member

nik9000 commented Mar 20, 2017

instead of having to pass down the threadpool we should abstract that too with a BiFunction<TimeValue, Runnable, ScheduledFuture<?>>

I haven't read the PR but that is my instinct for how to deal with retries as well.

* Sets an optional BiFunction to schedule flush actions and request retries
*/
public Builder setScheduleFunction(BiFunction<TimeValue, Runnable, ScheduledFuture<?>> scheduleFunction) {
this.scheduleFunction = scheduleFunction;
Contributor

I don't see this member being used anywhere? Maybe I am missing something? I also wonder if we should have a test setting this as well so we can check that it's actually used?

Contributor Author

It's not being used right now. I had added this as I was exploring passing a lambda to the builder. I'll provide a more in-depth comment about where this PR is currently a little later today.

@Tim-Brooks
Contributor Author

Tim-Brooks commented Mar 27, 2017

maybe instead of having to pass down the threadpool we should abstract that too with a BiFunction<TimeValue, Runnable, ScheduledFuture<?>> that we call instead of using the threadpool. Yet, the Builder can still accept a threadpool and build its own function unless one is explicitly specified. In that function we can do all the context preserving we need to. Users of the REST client can then just go ahead and implement their own version of the scheduled retry?

I explored this and ran into a problem. I have successfully implemented a version where the user can pass a threadpool instance. If they do not, I instantiate a scheduled thread pool instance. In this version the builder is still using a BiConsumer<BulkRequest, ActionListener<BulkResponse>>. The problem that I encountered with having the user pass a BiFunction<TimeValue, Runnable, ScheduledFuture<?>> that encapsulates the scheduling logic is the "flush" feature. Currently the BulkProcessor allows the user to specify a flush interval. At that interval the processor will take the accumulated bulk request components and execute the request.

As this action is slightly different (it must take the accumulated pieces and execute the request, as opposed to just executing a request) the BiFunction<TimeValue, Runnable, ScheduledFuture<?>> is not quite sufficient.

Any thoughts? Is the current design sufficient (where you can provide a threadpool otherwise a scheduled threadpool will be created)? Or is there a different approach I should take?

@Tim-Brooks
Contributor Author

Tim-Brooks commented Mar 27, 2017

Another option would be that if they provided a BiFunction<TimeValue, Runnable, ScheduledFuture<?>> and did not provide a threadpool, we could create a new scheduled executor to execute the "flush" actions.

Essentially the BiFunction approach does not remove the need for a threadpool unless the "flush" design was changed.

@s1monw
Contributor

s1monw commented Mar 28, 2017

honestly I think we should just decouple the client from this and stick with ThreadPool. Just require the threadpool to be set and we are good. If somebody has a client they can just call builder.setThreadPool(client.getThreadPool()) and we are good; if somebody wants to use the bulk processor they need to create a threadpool, period!

Member

@javanna javanna left a comment

I left a few comments, thanks for trying to abstract away the thread pool, I needed to see what it looked like in practice. I agree with Simon that we should go back and rather require a thread pool. We can work later on a few helper methods specifically for that in the high level client.

}, Settings.EMPTY)
.setConcurrentRequests(0)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.GB))
.setBulkActions(nbItems + 1000)
Member

why +1000 here and such a high size value? Do we want to make sure that all docs are sent in one go?

Contributor Author

I'm not sure why I set that so high. It is higher than nbItems to ensure that it does not flush until I manually do it. But I will make it 1 instead of 1000.


@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
error.set(failure);
Member

don't we want to keep track of the request here too?

Contributor Author

The main point of that callback is to set an exception if one occurs. And if the exception is not null then the test fails.

Contributor

sounds good to me

} else {
BytesReference source = XContentBuilder.builder(xContentType.xContent()).startObject().field("id", i).endObject().bytes();
if (opType == DocWriteRequest.OpType.INDEX) {
IndexRequest indexRequest = new IndexRequest("index", "test", id).source(source, xContentType);
Member

Can we pass XContentBuilder in to the source method directly? If we do it in all these branches we may not need to call .bytes and save the source to a variable.

assertNull(responseRef.get());
assertNull(requestRef.get());

processor.flush();
Member

why flush and not close? Could we even use a try-with-resources?

}
}

public static Builder builder(Client client, Listener listener) {
Objects.requireNonNull(client, "client");
Objects.requireNonNull(listener, "listener");

return new Builder(client, listener);
return new Builder(client::bulk, listener, client.settings()).setThreadPool(client.threadPool());
Member

nice, effectively this change will be backwards compatible for users using this method. Not for users calling the public Builder constructor, but I think that is a good reason to make them switch to this static method.
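The backwards-compatible entry point works by adapting the client with a method reference. A JDK-only analog of that pattern follows; FakeClient and the builder shape are illustrative stand-ins, not the real Elasticsearch API.

```java
import java.util.function.BiConsumer;

// Illustrative analog: the Builder stores only a lambda, while the static
// helper keeps accepting a client object and adapts it via a method reference.
class FakeClient {
    // Stand-in for Client.bulk(BulkRequest, ActionListener<BulkResponse>).
    void bulk(String request, BiConsumer<String, Throwable> listener) {
        listener.accept("ok:" + request, null);
    }
}

class BuilderSketch {
    final BiConsumer<String, BiConsumer<String, Throwable>> consumer;

    BuilderSketch(BiConsumer<String, BiConsumer<String, Throwable>> consumer) {
        this.consumer = consumer;
    }

    // Mirrors BulkProcessor.builder(Client, Listener): old callers compile
    // unchanged, but internally the client is reduced to a method reference.
    static BuilderSketch builder(FakeClient client) {
        return new BuilderSketch(client::bulk);
    }
}
```

Only callers of the public Builder constructor would notice the change, which is why keeping the static helper makes the migration painless.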

this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new Flush(), flushInterval.millis(), flushInterval.millis(), TimeUnit.MILLISECONDS);
BiFunction<TimeValue, Runnable, ScheduledFuture<?>> scheduleFn;
if (threadPool == null) {
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, (name != null ? "[" + name + "]" : "") + "bulk_processor");
Member

I think I would consider taking settings out. We try to extract node.name from it as far as I understand, which I don't think is ever going to be set on a transport client. Maybe we can just live without that part of the thread name and remove some complexity. This may very well come from the times where node client was widely used, hence node.name made sense in the past, but it doesn't anymore.

scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
this.schedulerToStop = scheduledExecutor;
scheduleFn = Retry.fromScheduledExecutorService(scheduledExecutor);
Member

I tend to agree with Simon on requiring a ThreadPool. We can have a helper in the high level client to create one. Otherwise the code here gets too complex and I am not sure it is worth it.

Contributor

not sure what helper we would need...new ThreadPool(settings);?

Member

probably new ThreadPool(Settings.EMPTY) :) I wonder if the helper could hide the creation of the thread pool even, maybe not, but we will see that later. As you pointed out it may be that the helper is not needed at all.

Contributor Author

Alright. I can make a threadpool mandatory for the builder.

@Tim-Brooks
Contributor Author

Tim-Brooks commented Mar 30, 2017

Thanks for the reviews. I've made most of the changes from @javanna's review. I got distracted a little today setting up my workstation. But I should finish up the PR tomorrow.

@Tim-Brooks
Contributor Author

@javanna I think my updates addressed the issues you raised.

Member

@javanna javanna left a comment

I had another look, getting closer but I think we can simplify things further. @s1monw mind having a look too just to make sure we are on the same page?

* The default is to back off exponentially.
*
* @see org.elasticsearch.action.bulk.BackoffPolicy#exponentialBackoff()
* Sets an optional thread pool on which to schedule flush actions and request retries
Member

should it not be required? If so I would move it to the constructor of the Builder and remove the setter for it.

this.client = client;
this.logger = Loggers.getLogger(getClass(), client.settings());
protected final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
protected final BiFunction<TimeValue, Runnable, ScheduledFuture<?>> scheduler;
Member

if we make the threadpool mandatory, then this scheduler becomes just ThreadPool?

Member

I don't see how we can consider bulk processor decoupled if it needs a ThreadPool.

this.listener = listener;
this.logger = Loggers.getLogger(getClass(), client.settings());
this.logger = Loggers.getLogger(getClass(), settings);
Member

I wonder if we can get rid of settings here too. It's done to prefix the log lines with node.name. Is this still important with transport client / REST client? I think this is almost never set on a client.

Contributor Author

@Tim-Brooks Tim-Brooks Mar 31, 2017

Retry.withAsyncBackoff is used by AbstractAsyncBulkByScrollAction. I left the Settings at the retry logic since I did not want to change how logging is done for that class. Instead, Settings are not required by the BulkProcessor and it just passes Settings.EMPTY to the retry level. Is that not correct? I was just hesitant to change code that will impact other pieces.

Member

@nik9000 do you think we can get rid of Settings here or shall we not?

Member

ok let's leave this for now. We can address this later. I think it was a good move not to remove settings yet. I will follow up with Nik on this.

Member

Sorry, didn't catch this ping. I believe this should be fine.

Member

@tbrooks8 no need to do it as part of this PR, we can make this change later.

AbstractRetryHandler(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy, Client client, ActionListener<BulkResponse> listener) {
RetryHandler(Class<? extends Throwable> retryOnThrowable, BackoffPolicy backoffPolicy,
BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, ActionListener<BulkResponse> listener,
Settings settings, BiFunction<TimeValue, Runnable, ScheduledFuture<?>> scheduleFn) {
Member

Ideally here you would just need to replace Client with ThreadPool, at least if we can get rid of Settings. Most of these changes should not be needed.

Contributor Author

I still pass the settings here as explained in this comment #23373 (comment).

Member

@javanna javanna left a comment

left a few more comments, thanks @tbrooks8

* The default is to back off exponentially.
*
* @see org.elasticsearch.action.bulk.BackoffPolicy#exponentialBackoff()
*/
Member

why are these javadocs gone?

BulkProcessor(Client client, BackoffPolicy backoffPolicy, Listener listener, @Nullable String name, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval) {
BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
@Nullable ThreadPool threadPool) {
Member

should the ThreadPool still be nullable?

this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(client.settings(), (name != null ? "[" + name + "]" : "") + "bulk_processor"));
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(new Flush(), flushInterval.millis(), flushInterval.millis(), TimeUnit.MILLISECONDS);
Member

I may have forgotten, but what has changed here compared to the startFlush method? We don't call daemonThreadFactory as we dropped the name and settings requirement for logging right? I wonder if we should still call that and just provide the standard "bulk_processor" prefix.

Contributor Author

In the past startFlush could use a ScheduledExecutor that we might have created in the BulkProcessor. Now that we require a ThreadPool we do not need this code anymore, as we can just use the thread pool to schedule.

Member

this sounds weird, as we were requiring a Client before, which comes with a ThreadPool implicitly. That means this change could be potentially made outside of this PR right (not asking you to move it, just trying to better understand)?

Contributor Author

@Tim-Brooks Tim-Brooks Apr 3, 2017

Yeah this change is not dependent on my work. The "flush" task probably could have been scheduled using the client's ThreadPool. But for some reason the BulkProcessor was creating its own for scheduling flush tasks.

Member

got it thank you.

@@ -334,6 +325,10 @@ private boolean isOverTheLimit() {
return false;
}

private Runnable cancelTask(ThreadPool.Cancellable cancellable) {
return cancellable::cancel;
Member

can we fold this method into its only caller above? Also did we change the way we cancel the flush scheduler on close? Weren't we using FutureUtils#cancel before?

Contributor Author

Yes we can fold it in. And we are not using FutureUtils because we are not cancelling a ScheduledFuture anymore. Repeated tasks scheduled on the ES ThreadPool implement ThreadPool.Cancellable.

Member

got it thanks.

}

// Start period flushing task after everything is setup
this.cancelTask = startFlush(flushInterval, threadPool);
Member

I find it hard to read that startFlush returns the task to call when closing the processor. Maybe the whole method can be moved here?

Contributor Author

What if I just change that to this:

ThreadPool.Cancellable cancellableFlushTask = startFlushTask(flushInterval, threadPool);

And then the close() method just calls cancel(). No need for a weird conversion to Runnable.

If there is no flushing I will just return a NoOp ThreadPool.Cancellable.
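The no-op Cancellable idea can be sketched with JDK types. This is a hypothetical analog: the Cancellable interface and the startFlushTask name stand in for ThreadPool.Cancellable and the real method.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// startFlushTask always returns something close() can cancel, so no null
// checks and no Runnable conversion are needed at the call site.
class FlushTaskSketch {
    interface Cancellable { void cancel(); }

    static Cancellable startFlushTask(Long flushIntervalMillis, ScheduledExecutorService scheduler, Runnable flush) {
        if (flushIntervalMillis == null) {
            return () -> {}; // no flush interval configured: no-op Cancellable
        }
        ScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(
                flush, flushIntervalMillis, flushIntervalMillis, TimeUnit.MILLISECONDS);
        return () -> future.cancel(false);
    }
}
```

close() then unconditionally calls cancel() on whatever came back, which is exactly the simplification being proposed.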

Member

sounds good thanks

return actionFuture.actionGet();
}

static BiFunction<TimeValue, Runnable, ScheduledFuture<?>> fromThreadPool(ThreadPool threadPool) {
Member

given that we require ThreadPool, I wonder if we still need this method and its caller too. Can't we just require ThreadPool like we used to require the Client before?

Member

It doesn't look like this requires ThreadPool though?

Member

Like, only this method requires it.

Member

I think that you left comments based on a previous version of this PR, not quite sure how that happened.

private final Logger logger;
private final Client client;
private final BiFunction<TimeValue, Runnable, ScheduledFuture<?>> scheduleFn;
Member

do we still need the function? after all this handler will depend on the ThreadPool as that is the only way we'll use it. Should we just have a ThreadPool member instead?

Contributor Author

We can do that. I guess the only reason we would not do that is if we wanted Retry to be usable without a ThreadPool. In the current version Retry does not require a ThreadPool. So theoretically, this class could be used with just a lambda. Although Retry is currently only used by the BulkProcessor and AbstractAsyncBulkByScrollAction.

I don't really have a preference. Let me know what you think is best.

Member

I would find it more readable at the moment with the hard dependency on ThreadPool, which is what we are going for at the moment, even in client code. We can always change this if we don't want to depend on thread pool later.

Contributor Author

Sure. I'll adjust Retry to depend on ThreadPool.

*/
public void withAsyncBackoff(Client client, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
AsyncRetryHandler r = new AsyncRetryHandler(retryOnThrowable, backoffPolicy, client, listener);
public void withAsyncBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest, ActionListener<BulkResponse> listener, Settings settings) {
Member

shall we rather have ThreadPool as an argument here instead and remove the new on(ThreadPool) method?

Contributor Author

@Tim-Brooks Tim-Brooks Apr 3, 2017

We can. Although we do not do that with policy which is also required:

public Retry policy(BackoffPolicy backoffPolicy) {
    this.backoffPolicy = backoffPolicy;
    return this;
}

Do you just want me to turn Retry into a normal "ctor-using" class?

Retry retry = new Retry(Class<? extends Throwable> retryOnThrowable, BackoffPolicy policy, ThreadPool threadPool, Settings settings);

retry.withAsyncBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest, ActionListener<BulkResponse> listener);

Member

I dunno. I don't love this class. I was trying to keep it closer to how it used to be, and only replace Client with a biconsumer and threadpool. I also don't think it's clear from the method name that withSyncBackoff actually sends the request. But I don't want to change too much as part of this PR either. If you feel like it's better as-is I am fine with it too.

Contributor Author

Well I definitely do not like the current distinction between withSyncBackoff and withAsyncBackoff. I have a separate PR that I have been working on locally to simplify the Retry class and its relationship with the BulkProcessor. My plan was to submit that PR once this is merged.

I guess it is just a question of what you think for this PR. If you think constructing Retry is out of scope for this, I can consider changes in a follow up.

Member

cool then leave what you have, looking forward to your followup PR

return () -> {};
}

ThreadPool.Cancellable cancellable = threadPool.scheduleWithFixedDelay(new Flush(), flushInterval, ThreadPool.Names.SAME);
Member

I'm not sure threadPool can be nullable if you use it here?

this.client = client;
this.logger = Loggers.getLogger(getClass(), client.settings());
protected final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
protected final BiFunction<TimeValue, Runnable, ScheduledFuture<?>> scheduler;
Member:

I don't see how we can consider bulk processor decoupled if it needs a ThreadPool.

.on(EsRejectedExecutionException.class)
.policy(backoffPolicy)
.withSyncBackoff(client, bulkRequest);
.on(EsRejectedExecutionException.class)
Member:

Can you not do the formatting changes in the future? Personally I like the 8 spaces, but regardless, I think it makes the change harder to review.

@javanna (Member), Apr 4, 2017:

> I don't see how we can consider bulk processor decoupled if it needs a ThreadPool.

Requiring a ThreadPool is the way forward that we chose after a few attempts. We will require a ThreadPool in the high level client too. It is simple enough to create. We will see what else can be done to make this better. The bulk processor is decoupled meaning that it doesn't require a Client anymore.
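To illustrate why a request-executing consumer plus a scheduler is all the processor itself needs, here is a rough plain-JDK analogue (hypothetical names; the real BulkProcessor additionally handles concurrent requests, byte-size limits, and listeners):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical illustration: a bulk processor that never sees a client. It
// only holds the BiConsumer that executes a batch and a scheduler for the
// periodic flush interval.
class BulkSketch {
    private final BiConsumer<List<String>, Runnable> consumer;
    private final int bulkActions;
    private final List<String> pending = new ArrayList<>();

    BulkSketch(BiConsumer<List<String>, Runnable> consumer, int bulkActions,
               ScheduledExecutorService scheduler, long flushMillis) {
        this.consumer = consumer;
        this.bulkActions = bulkActions;
        scheduler.scheduleWithFixedDelay(this::flush, flushMillis, flushMillis, TimeUnit.MILLISECONDS);
    }

    synchronized void add(String doc) {
        pending.add(doc);
        if (pending.size() >= bulkActions) flush(); // size threshold reached
    }

    synchronized void flush() {
        if (pending.isEmpty()) return;
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        consumer.accept(batch, () -> {}); // callback stands in for the ActionListener
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<Integer> batchSizes = new ArrayList<>();
        BulkSketch processor = new BulkSketch((batch, done) -> batchSizes.add(batch.size()),
                                              3, scheduler, 1000);
        for (int i = 0; i < 7; i++) processor.add("doc-" + i);
        processor.flush(); // explicit flush picks up the remainder, like close()
        System.out.println(batchSizes); // prints [3, 3, 1]
        scheduler.shutdown();
    }
}
```

Swapping in a different backend is then just a matter of passing a different BiConsumer; the processor's batching logic is untouched.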

@javanna (Member) left a comment:

left a couple of questions and thoughts, but LGTM.

validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest);

threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.SECONDS);
Member:

you can replace these two lines with a call to ThreadPool.terminate

Member:

even better, ESTestCase#terminate

try(BulkProcessor processor = new BulkProcessor.Builder(highLevelClient()::bulkAsync, listener, threadPool)
.setConcurrentRequests(0)
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.GB))
.setBulkActions(nbItems + 1)
Member:

Why does it matter that the bulk processor flushes all the docs at once on closing? Can't we rely on the fact that after close everything is flushed, no matter how many bulk requests are sent? Is it because it makes bulk response validation more complicated, as we'd have to accumulate the responses gotten from the listener?

};
}

return threadPool.scheduleWithFixedDelay(new Flush(), flushInterval, ThreadPool.Names.SAME);
Member:

is SAME thread pool ok here? This way, can other operations interfere with periodic bulk flushing if enabled?

Contributor:

I think it's fine; the assumption is that flush is pretty fast?

Contributor Author:

It should be fast, as it just calls the Consumer, which will likely dispatch to whatever threading model the client is using. But I do feel "same" is kind of weird, as I assume that is the thread that created the BulkProcessor. Should this be generic?

Contributor Author:

It is possible that the flush could block, as a call to semaphore.acquire() is made. And it is possible that user-initiated flushes are happening.

Contributor:

I think SAME is fine. We can totally go on the safe side and just use GENERIC too. I don't think it's critical, so let's just play it safe and use GENERIC?
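In JDK terms, the SAME-versus-GENERIC question above is whether the flush runs directly on the scheduler thread (where a blocking call like semaphore.acquire() would delay subsequent ticks) or is handed off to a separate pool. A rough plain-JDK sketch of the hand-off (hypothetical names, not the Elasticsearch ThreadPool):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical analogue: the periodic task only dispatches the (possibly
// blocking) flush to a separate executor, so the scheduler thread stays free
// and later ticks are not delayed.
class FlushDispatchSketch {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ExecutorService generic = Executors.newCachedThreadPool(); // stands in for GENERIC
        CountDownLatch done = new CountDownLatch(3);
        scheduler.scheduleWithFixedDelay(
            () -> generic.submit(() -> {          // dispatch instead of blocking in place
                try { Thread.sleep(50); } catch (InterruptedException ignored) { }
                done.countDown();
            }),
            0, 10, TimeUnit.MILLISECONDS);
        System.out.println(done.await(2, TimeUnit.SECONDS)); // prints true
        scheduler.shutdownNow();
        generic.shutdown();
    }
}
```

If the sleep ran on the scheduler thread itself, each tick would be pushed back by the previous flush's duration; dispatching keeps the schedule steady.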

this.listener = listener;
this.logger = Loggers.getLogger(getClass(), client.settings());
this.logger = Loggers.getLogger(getClass(), settings);
Member:

OK, let's leave this for now. We can address this later. I think it was a good move not to remove settings yet. I will follow up with Nik on this.

/**
* Sets an optional name to identify this bulk processor.
*/
public Builder setName(String name) {
Member:

For the record, I think it is OK to remove setName here. The name was only used to have slightly different logging for the flush daemon thread. We can obtain the same now that we use the thread pool the way we do everywhere else in this class, through node.name in the settings.

Contributor:

++

@s1monw (Contributor) left a comment:

LGTM thanks for all the iterations


@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
error.set(failure);
Contributor:

sounds good to me



@javanna (Member) left a comment:

still LGTM thanks @tbrooks8

@Tim-Brooks Tim-Brooks merged commit 5b1fbe5 into elastic:master Apr 5, 2017
@Tim-Brooks Tim-Brooks deleted the bulk_processor_method_reference branch November 14, 2018 14:48