From 131525728a192510b120cfb626b6929b1cdec705 Mon Sep 17 00:00:00 2001 From: Dai Date: Mon, 25 Jan 2021 15:21:35 -0800 Subject: [PATCH 1/2] Change protector and add UT --- .../ElasticsearchExecutionProtector.java | 25 ++++++++--- .../ElasticsearchExecutionProtectorTest.java | 45 +++++++++++++++++++ 2 files changed, 65 insertions(+), 5 deletions(-) diff --git a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/ElasticsearchExecutionProtector.java b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/ElasticsearchExecutionProtector.java index bec7d3150d..b017839616 100644 --- a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/ElasticsearchExecutionProtector.java +++ b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/ElasticsearchExecutionProtector.java @@ -77,7 +77,7 @@ public PhysicalPlan visitRename(RenameOperator node, Object context) { */ @Override public PhysicalPlan visitTableScan(TableScanOperator node, Object context) { - return new ResourceMonitorPlan(node, resourceMonitor); + return doProtect(node); } @Override @@ -111,10 +111,14 @@ public PhysicalPlan visitHead(HeadOperator node, Object context) { ); } + /** + * Decorate input node with {@link ResourceMonitorPlan} to avoid aggregate + * window function pre-loads too many data into memory in worst case. + */ @Override public PhysicalPlan visitWindow(WindowOperator node, Object context) { return new WindowOperator( - visitInput(node.getInput(), context), + doProtect(visitInput(node.getInput(), context)), node.getWindowFunction(), node.getWindowDefinition()); } @@ -124,11 +128,10 @@ public PhysicalPlan visitWindow(WindowOperator node, Object context) { */ @Override public PhysicalPlan visitSort(SortOperator node, Object context) { - return new ResourceMonitorPlan( + return doProtect( new SortOperator( visitInput(node.getInput(), context), - node.getSortList()), - resourceMonitor); + node.getSortList())); } /** @@ -155,4 +158,16 @@ PhysicalPlan visitInput(PhysicalPlan node, Object context) { return node.accept(this, context); } } + + private PhysicalPlan doProtect(PhysicalPlan node) { + if (isProtected(node)) { + return node; + } + return new ResourceMonitorPlan(node, resourceMonitor); + } + + private boolean isProtected(PhysicalPlan node) { + return (node instanceof ResourceMonitorPlan); + } + } diff --git a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ElasticsearchExecutionProtectorTest.java b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ElasticsearchExecutionProtectorTest.java index 03af6f5641..0a38cc5740 100644 --- a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ElasticsearchExecutionProtectorTest.java +++ b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ElasticsearchExecutionProtectorTest.java @@ -50,6 +50,7 @@ import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.AvgAggregator; import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.NamedAggregator; import com.amazon.opendistroforelasticsearch.sql.expression.window.WindowDefinition; +import com.amazon.opendistroforelasticsearch.sql.expression.window.aggregation.AggregateWindowFunction; import com.amazon.opendistroforelasticsearch.sql.expression.window.ranking.RankFunction; import com.amazon.opendistroforelasticsearch.sql.monitor.ResourceMonitor; import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlan; @@ -213,6 +214,50 @@ public void testProtectSortForWindowOperator() { windowDefinition))); } + @Test + public void testProtectWindowOperatorInput() { + NamedExpression avg = named(mock(AggregateWindowFunction.class)); + WindowDefinition windowDefinition = mock(WindowDefinition.class); + + assertEquals( + window( + resourceMonitor( + values()), + avg, + windowDefinition), + executionProtector.protect( + window( + values(), + avg, + windowDefinition))); + } + + @SuppressWarnings("unchecked") + @Test + public void testNotProtectWindowOperatorInputIfAlreadyProtected() { + NamedExpression avg = named(mock(AggregateWindowFunction.class)); + Pair sortItem = + ImmutablePair.of(DEFAULT_ASC, DSL.ref("age", INTEGER)); + WindowDefinition windowDefinition = + new WindowDefinition(emptyList(), ImmutableList.of(sortItem)); + + assertEquals( + window( + resourceMonitor( + sort( + values(emptyList()), + sortItem)), + avg, + windowDefinition), + executionProtector.protect( + window( + sort( + values(emptyList()), + sortItem), + avg, + windowDefinition))); + } + @Test public void testWithoutProtection() { Expression filterExpr = literal(ExprBooleanValue.of(true)); From 8e2a209847c346bb4657772a07d79bda25965906 Mon Sep 17 00:00:00 2001 From: Dai Date: Mon, 25 Jan 2021 16:04:11 -0800 Subject: [PATCH 2/2] Check resource on every 1000 calls --- .../protector/ResourceMonitorPlan.java | 17 ++++++++++++++ .../executor/ResourceMonitorPlanTest.java | 22 +++++++++++++++++-- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/ResourceMonitorPlan.java b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/ResourceMonitorPlan.java index 0225a46974..112e8dd2c3 100644 --- a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/ResourceMonitorPlan.java +++ b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/protector/ResourceMonitorPlan.java @@ -33,6 +33,12 @@ @RequiredArgsConstructor @EqualsAndHashCode public class ResourceMonitorPlan extends PhysicalPlan { + + /** + * How many method calls to delegate's next() to perform resource check once. + */ + public static final long NUMBER_OF_NEXT_CALL_TO_CHECK = 1000; + /** * Delegated PhysicalPlan. */ @@ -44,6 +50,13 @@ public class ResourceMonitorPlan extends PhysicalPlan { @ToString.Exclude private final ResourceMonitor monitor; + /** + * Count how many calls to delegate's next() already. + */ + @EqualsAndHashCode.Exclude + private long nextCallCount = 0L; + + @Override public R accept(PhysicalPlanNodeVisitor visitor, C context) { return delegate.accept(visitor, context); @@ -74,6 +87,10 @@ public boolean hasNext() { @Override public ExprValue next() { + boolean shouldCheck = (++nextCallCount % NUMBER_OF_NEXT_CALL_TO_CHECK == 0); + if (shouldCheck && !this.monitor.isHealthy()) { + throw new IllegalStateException("resource is not enough to load next row, quit."); + } return delegate.next(); } } diff --git a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ResourceMonitorPlanTest.java b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ResourceMonitorPlanTest.java index de2e0d157b..c1fb77fe40 100644 --- a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ResourceMonitorPlanTest.java +++ b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/executor/ResourceMonitorPlanTest.java @@ -73,8 +73,26 @@ void openSuccess() { @Test void nextSuccess() { - monitorPlan.next(); - verify(plan, times(1)).next(); + when(resourceMonitor.isHealthy()).thenReturn(true); + + for (int i = 1; i <= 1000; i++) { + monitorPlan.next(); + } + verify(resourceMonitor, times(1)).isHealthy(); + verify(plan, times(1000)).next(); + } + + @Test + void nextExceedResourceLimit() { + when(resourceMonitor.isHealthy()).thenReturn(false); + + for (int i = 1; i < 1000; i++) { + monitorPlan.next(); + } + + IllegalStateException exception = + assertThrows(IllegalStateException.class, () -> monitorPlan.next()); + assertEquals("resource is not enough to load next row, quit.", exception.getMessage()); } @Test