From 19c6d41b0c0383dbdccc76601e826cd09e73682b Mon Sep 17 00:00:00 2001 From: Hojjat Jafarpour Date: Tue, 14 Nov 2017 11:59:23 -0800 Subject: [PATCH] KSQL 420 join criteria validation (#447) * Added checks for join criteria. * Applied review feedback. Better error messages for join criteria check. * Added more comments for new methods. --- .../io/confluent/ksql/analyzer/Analyzer.java | 98 +++++++++++++++---- .../ksql/analyzer/QueryAnalyzerTest.java | 27 +++++ .../integration/IntegrationTestHarness.java | 1 - .../ksql/integration/JoinIntTest.java | 2 +- .../ksql/parser/tree/NodeLocation.java | 5 + 5 files changed, 111 insertions(+), 22 deletions(-) diff --git a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java index 4cac62fbab67..d487d001d285 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java @@ -54,6 +54,8 @@ import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.Pair; +import io.confluent.ksql.util.SchemaUtil; + import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; @@ -201,16 +203,7 @@ protected Node visitJoin(final Join node, final AnalysisContext context) { String leftAlias = left.getAlias(); String rightAlias = right.getAlias(); - StructuredDataSourceNode - leftSourceKafkaTopicNode = - new StructuredDataSourceNode(new PlanNodeId("KafkaTopic_Left"), - leftDataSource, - leftDataSource.getSchema()); - StructuredDataSourceNode - rightSourceKafkaTopicNode = - new StructuredDataSourceNode(new PlanNodeId("KafkaTopic_Right"), - rightDataSource, - rightDataSource.getSchema()); + JoinNode.Type joinType; switch (node.getType()) { @@ -233,16 +226,37 @@ protected Node visitJoin(final Join node, final AnalysisContext context) { throw new KsqlException("Join type is not supported: " + node.getType().name()); } + if (!node.getCriteria().isPresent()) { + throw new KsqlException(String.format("%s Join criteria is not set.", + node.getLocation().isPresent()? node.getLocation() + .get().toString(): "")); + } JoinOn joinOn = (JoinOn) (node.getCriteria().get()); ComparisonExpression comparisonExpression = (ComparisonExpression) joinOn.getExpression(); - String leftKeyFieldName = fetchKeyFieldName(comparisonExpression.getLeft()); - String rightKeyFieldName = fetchKeyFieldName(comparisonExpression.getRight()); + Pair leftSide = fetchKeyFieldName(comparisonExpression, leftAlias, + leftDataSource.getSchema()); + Pair rightSide = fetchKeyFieldName(comparisonExpression, rightAlias, + rightDataSource.getSchema()); + + String leftKeyFieldName = leftSide.getRight(); + String rightKeyFieldName = rightSide.getRight(); if (comparisonExpression.getType() != ComparisonExpression.Type.EQUAL) { - throw new KsqlException("Join criteria is not supported."); + throw new KsqlException("Only equality join criteria is supported."); } + StructuredDataSourceNode + leftSourceKafkaTopicNode = + new StructuredDataSourceNode(new PlanNodeId("KafkaTopic_Left"), + leftDataSource, + leftDataSource.getSchema()); + StructuredDataSourceNode + rightSourceKafkaTopicNode = + new StructuredDataSourceNode(new PlanNodeId("KafkaTopic_Right"), + rightDataSource, + rightDataSource.getSchema()); + JoinNode joinNode = new JoinNode(new PlanNodeId("Join"), joinType, leftSourceKafkaTopicNode, rightSourceKafkaTopicNode, leftKeyFieldName, rightKeyFieldName, leftAlias, @@ -251,20 +265,64 @@ protected Node visitJoin(final Join node, final AnalysisContext context) { return null; } - private String fetchKeyFieldName(Expression expression) { + /** + * From the join criteria expression fetch the key field corresponding to the given source + * alias. + * + * @param comparisonExpression + * @param sourceAlias + * @param sourceSchema + * @return + */ + private Pair fetchKeyFieldName(ComparisonExpression comparisonExpression, String sourceAlias, Schema + sourceSchema) { + Pair keyInfo = fetchKeyFieldNameFromExpr(comparisonExpression.getLeft(), + sourceAlias, sourceSchema); + if (keyInfo == null) { + keyInfo = fetchKeyFieldNameFromExpr(comparisonExpression.getRight(), sourceAlias, sourceSchema); + } + if (keyInfo == null) { + throw new KsqlException(String.format("%s : Invalid join criteria %s. Key for %s is not set" + + " correctly." + + " ", comparisonExpression + .getLocation().isPresent()? comparisonExpression + .getLocation().get().toString(): "", comparisonExpression, sourceAlias)); + } + return keyInfo; + } + + /** + * Given an expression and the source alias detects if the expression type is DereferenceExpression + * or QualifiedNameReference and if the variable prefix matches the source Alias. + * + * @param expression + * @param sourceAlias + * @param sourceSchema + * @return + */ + private Pair fetchKeyFieldNameFromExpr(Expression expression, String sourceAlias, + Schema sourceSchema) { if (expression instanceof DereferenceExpression) { DereferenceExpression - leftDereferenceExpression = + dereferenceExpression = (DereferenceExpression) expression; - return leftDereferenceExpression.getFieldName(); + String sourceAliasVal = dereferenceExpression.getBase().toString(); + if (sourceAliasVal.equalsIgnoreCase(sourceAlias)) { + String keyFieldName = dereferenceExpression.getFieldName(); + if (SchemaUtil.getFieldByName(sourceSchema, keyFieldName).isPresent()) { + return new Pair<>(sourceAliasVal, keyFieldName); + } + } } else if (expression instanceof QualifiedNameReference) { QualifiedNameReference - leftQualifiedNameReference = + qualifiedNameReference = (QualifiedNameReference) expression; - return leftQualifiedNameReference.getName().getSuffix(); - } else { - throw new KsqlException("Join criteria is not supported. Expression:" + expression); + String keyFieldName = qualifiedNameReference.getName().getSuffix(); + if (SchemaUtil.getFieldByName(sourceSchema, keyFieldName).isPresent()) { + return new Pair<>(sourceAlias, keyFieldName); + } } + return null; } private StructuredDataSource timestampColumn(AliasedRelation aliasedRelation, diff --git a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerTest.java b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerTest.java index e9011d2f7bc1..7a0302daa488 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/analyzer/QueryAnalyzerTest.java @@ -45,6 +45,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class QueryAnalyzerTest { @@ -134,4 +135,30 @@ public void shouldProcessHavingExpression() { new QualifiedNameReference(QualifiedName.of("KSQL_AGG_VARIABLE_1")), new LongLiteral("10")))); } + + @Test + public void shouldFailWithIncorrectJoinCriteria() { + final List statements = ksqlParser.buildAst( + "select * from test1 join test2 on test1.col1 = test2.coll;", + metaStore); + final Query query = (Query) statements.get(0); + try { + queryAnalyzer.analyize(query); + } catch (KsqlException ex) { + assertThat(ex.getMessage().trim(), equalTo("Line: 1, Col: 46 : Invalid join criteria (TEST1.COL1 = TEST2.COLL). Key for TEST2 is not set correctly.")); + } + } + + @Test + public void shouldPassJoinWithAnyCriteriaOrder() { + final List statements = ksqlParser.buildAst( + "select * from test1 left join test2 on test2.col2 = test1.col1;", + metaStore); + final Query query = (Query) statements.get(0); + final Analysis analysis = queryAnalyzer.analyize(query); + assertTrue(analysis.getJoin().isLeftJoin()); + assertThat(analysis.getJoin().getLeftKeyFieldName(), equalTo("COL1")); + assertThat(analysis.getJoin().getRightKeyFieldName(), equalTo("COL2")); + } + } \ No newline at end of file diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java index 1fd9ab0e812f..c5274167cbb9 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java @@ -70,7 +70,6 @@ public void createTopic(String topicName, int numPartitions, short replicatonFac * Topic topicName will be automatically created if it doesn't exist. * @param topicName * @param recordsToPublish - * @param schema * @param timestamp * @return * @throws InterruptedException diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/JoinIntTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/JoinIntTest.java index c13d49a055fe..00e0081ba6c8 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/JoinIntTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/JoinIntTest.java @@ -74,7 +74,7 @@ public void shouldLeftJoinOrderAndItems() throws Exception { final String queryString = String.format( "CREATE STREAM %s AS SELECT ORDERID, ITEMID, ORDERUNITS, DESCRIPTION FROM orders LEFT JOIN items " + - " on orders.ITEMID = item.ID WHERE orders.ITEMID = 'ITEM_1' ;", + " on orders.ITEMID = items.ID WHERE orders.ITEMID = 'ITEM_1' ;", testStreamName ); diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/NodeLocation.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/NodeLocation.java index 5d842e091ede..adc7248a4732 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/NodeLocation.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/NodeLocation.java @@ -33,4 +33,9 @@ public int getLineNumber() { public int getColumnNumber() { return charPositionInLine + 1; } + + @Override + public String toString() { + return String.format(" Line: %d, Col: %d", line, charPositionInLine + 1); + } }