Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Protect window operator by circuit breaker #1006

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public PhysicalPlan visitRename(RenameOperator node, Object context) {
*/
@Override
public PhysicalPlan visitTableScan(TableScanOperator node, Object context) {
return new ResourceMonitorPlan(node, resourceMonitor);
return doProtect(node);
}

@Override
Expand Down Expand Up @@ -111,10 +111,14 @@ public PhysicalPlan visitHead(HeadOperator node, Object context) {
);
}

/**
* Decorate input node with {@link ResourceMonitorPlan} to avoid aggregate
* window function pre-loads too many data into memory in worst case.
*/
@Override
public PhysicalPlan visitWindow(WindowOperator node, Object context) {
return new WindowOperator(
visitInput(node.getInput(), context),
doProtect(visitInput(node.getInput(), context)),
node.getWindowFunction(),
node.getWindowDefinition());
}
Expand All @@ -124,11 +128,10 @@ public PhysicalPlan visitWindow(WindowOperator node, Object context) {
*/
@Override
public PhysicalPlan visitSort(SortOperator node, Object context) {
return new ResourceMonitorPlan(
return doProtect(
new SortOperator(
visitInput(node.getInput(), context),
node.getSortList()),
resourceMonitor);
node.getSortList()));
}

/**
Expand All @@ -155,4 +158,16 @@ PhysicalPlan visitInput(PhysicalPlan node, Object context) {
return node.accept(this, context);
}
}

private PhysicalPlan doProtect(PhysicalPlan node) {
if (isProtected(node)) {
return node;
}
return new ResourceMonitorPlan(node, resourceMonitor);
}

private boolean isProtected(PhysicalPlan node) {
return (node instanceof ResourceMonitorPlan);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
@RequiredArgsConstructor
@EqualsAndHashCode
public class ResourceMonitorPlan extends PhysicalPlan {

/**
* How many method calls to delegate's next() to perform resource check once.
*/
public static final long NUMBER_OF_NEXT_CALL_TO_CHECK = 1000;

/**
* Delegated PhysicalPlan.
*/
Expand All @@ -44,6 +50,13 @@ public class ResourceMonitorPlan extends PhysicalPlan {
@ToString.Exclude
private final ResourceMonitor monitor;

/**
* Count how many calls to delegate's next() already.
*/
@EqualsAndHashCode.Exclude
private long nextCallCount = 0L;


@Override
public <R, C> R accept(PhysicalPlanNodeVisitor<R, C> visitor, C context) {
return delegate.accept(visitor, context);
Expand Down Expand Up @@ -74,6 +87,10 @@ public boolean hasNext() {

@Override
public ExprValue next() {
boolean shouldCheck = (++nextCallCount % NUMBER_OF_NEXT_CALL_TO_CHECK == 0);
if (shouldCheck && !this.monitor.isHealthy()) {
throw new IllegalStateException("resource is not enough to load next row, quit.");
}
return delegate.next();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.AvgAggregator;
import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.NamedAggregator;
import com.amazon.opendistroforelasticsearch.sql.expression.window.WindowDefinition;
import com.amazon.opendistroforelasticsearch.sql.expression.window.aggregation.AggregateWindowFunction;
import com.amazon.opendistroforelasticsearch.sql.expression.window.ranking.RankFunction;
import com.amazon.opendistroforelasticsearch.sql.monitor.ResourceMonitor;
import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlan;
Expand Down Expand Up @@ -213,6 +214,50 @@ public void testProtectSortForWindowOperator() {
windowDefinition)));
}

@Test
public void testProtectWindowOperatorInput() {
NamedExpression avg = named(mock(AggregateWindowFunction.class));
WindowDefinition windowDefinition = mock(WindowDefinition.class);

assertEquals(
window(
resourceMonitor(
values()),
avg,
windowDefinition),
executionProtector.protect(
window(
values(),
avg,
windowDefinition)));
}

@SuppressWarnings("unchecked")
@Test
public void testNotProtectWindowOperatorInputIfAlreadyProtected() {
NamedExpression avg = named(mock(AggregateWindowFunction.class));
Pair<Sort.SortOption, Expression> sortItem =
ImmutablePair.of(DEFAULT_ASC, DSL.ref("age", INTEGER));
WindowDefinition windowDefinition =
new WindowDefinition(emptyList(), ImmutableList.of(sortItem));

assertEquals(
window(
resourceMonitor(
sort(
values(emptyList()),
sortItem)),
avg,
windowDefinition),
executionProtector.protect(
window(
sort(
values(emptyList()),
sortItem),
avg,
windowDefinition)));
}

@Test
public void testWithoutProtection() {
Expression filterExpr = literal(ExprBooleanValue.of(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,26 @@ void openSuccess() {

@Test
void nextSuccess() {
monitorPlan.next();
verify(plan, times(1)).next();
when(resourceMonitor.isHealthy()).thenReturn(true);

for (int i = 1; i <= 1000; i++) {
monitorPlan.next();
}
verify(resourceMonitor, times(1)).isHealthy();
verify(plan, times(1000)).next();
}

@Test
void nextExceedResourceLimit() {
when(resourceMonitor.isHealthy()).thenReturn(false);

for (int i = 1; i < 1000; i++) {
monitorPlan.next();
}

IllegalStateException exception =
assertThrows(IllegalStateException.class, () -> monitorPlan.next());
assertEquals("resource is not enough to load next row, quit.", exception.getMessage());
}

@Test
Expand Down