Skip to content

Commit

Permalink
review
Browse the repository at this point in the history
  • Loading branch information
davidkyle committed Oct 14, 2024
1 parent e649ffb commit e9f1723
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -93,9 +94,7 @@ public void onFailure(Exception exception) {
}

latch.await();
if (failures.isEmpty() == false) {
fail(failures.getFirst());
}
assertThat(failures, empty());
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class TransportInternalInferModelAction extends HandledTransportAction<Re
private final XPackLicenseState licenseState;
private final TrainedModelProvider trainedModelProvider;
private final AdaptiveAllocationsScalerService adaptiveAllocationsScalerService;
private final InferenceWaitForAllocation scalingInference;
private final InferenceWaitForAllocation waitForAllocation;
private final ThreadPool threadPool;

TransportInternalInferModelAction(
Expand All @@ -94,7 +94,7 @@ public class TransportInternalInferModelAction extends HandledTransportAction<Re
this.licenseState = licenseState;
this.trainedModelProvider = trainedModelProvider;
this.adaptiveAllocationsScalerService = adaptiveAllocationsScalerService;
this.scalingInference = new InferenceWaitForAllocation(assignmentService, this::inferOnBlockedRequest);
this.waitForAllocation = new InferenceWaitForAllocation(assignmentService, this::inferOnBlockedRequest);
this.threadPool = threadPool;
}

Expand Down Expand Up @@ -280,7 +280,7 @@ private void inferAgainstAllocatedModel(
if (starting) {
message += "; starting deployment of one allocation";
logger.info(message);
scalingInference.waitForAssignment(
waitForAllocation.waitForAssignment(
new InferenceWaitForAllocation.WaitingRequest(request, responseBuilder, parentTaskId, listener)
);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public InferenceWaitForAllocation(
*/
public synchronized void waitForAssignment(WaitingRequest request) {
logger.info("waitForAssignment will wait for condition");
if (pendingRequestCount.get() > MAX_PENDING_REQUEST_COUNT) {
if (pendingRequestCount.incrementAndGet() >= MAX_PENDING_REQUEST_COUNT) {
pendingRequestCount.decrementAndGet();
request.listener.onFailure(
new ElasticsearchStatusException(
"Rejected inference request waiting for an allocation of deployment [{}]. Too many pending requests",
Expand All @@ -96,7 +97,6 @@ public synchronized void waitForAssignment(WaitingRequest request) {
return;
}

pendingRequestCount.incrementAndGet();
var predicate = new DeploymentHasAtLeastOneAllocation(request.deploymentId());

assignmentService.waitForAssignmentCondition(
Expand Down

0 comments on commit e9f1723

Please sign in to comment.