Skip to content

Commit

Permalink
KSQL 420 join criteria validation (#447)
Browse files Browse the repository at this point in the history
* Added checks for join criteria.

* Applied review feedback.
Better error messages for join criteria check.

* Added more comments for new methods.
  • Loading branch information
hjafarpour authored Nov 14, 2017
1 parent b37f144 commit 19c6d41
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 22 deletions.
98 changes: 78 additions & 20 deletions ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.SchemaUtil;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;

Expand Down Expand Up @@ -201,16 +203,7 @@ protected Node visitJoin(final Join node, final AnalysisContext context) {

String leftAlias = left.getAlias();
String rightAlias = right.getAlias();
StructuredDataSourceNode
leftSourceKafkaTopicNode =
new StructuredDataSourceNode(new PlanNodeId("KafkaTopic_Left"),
leftDataSource,
leftDataSource.getSchema());
StructuredDataSourceNode
rightSourceKafkaTopicNode =
new StructuredDataSourceNode(new PlanNodeId("KafkaTopic_Right"),
rightDataSource,
rightDataSource.getSchema());


JoinNode.Type joinType;
switch (node.getType()) {
Expand All @@ -233,16 +226,37 @@ protected Node visitJoin(final Join node, final AnalysisContext context) {
throw new KsqlException("Join type is not supported: " + node.getType().name());
}

if (!node.getCriteria().isPresent()) {
throw new KsqlException(String.format("%s Join criteria is not set.",
node.getLocation().isPresent()? node.getLocation()
.get().toString(): ""));
}
JoinOn joinOn = (JoinOn) (node.getCriteria().get());
ComparisonExpression comparisonExpression = (ComparisonExpression) joinOn.getExpression();

String leftKeyFieldName = fetchKeyFieldName(comparisonExpression.getLeft());
String rightKeyFieldName = fetchKeyFieldName(comparisonExpression.getRight());
Pair<String, String> leftSide = fetchKeyFieldName(comparisonExpression, leftAlias,
leftDataSource.getSchema());
Pair<String, String> rightSide = fetchKeyFieldName(comparisonExpression, rightAlias,
rightDataSource.getSchema());

String leftKeyFieldName = leftSide.getRight();
String rightKeyFieldName = rightSide.getRight();

if (comparisonExpression.getType() != ComparisonExpression.Type.EQUAL) {
throw new KsqlException("Join criteria is not supported.");
throw new KsqlException("Only equality join criteria is supported.");
}

StructuredDataSourceNode
leftSourceKafkaTopicNode =
new StructuredDataSourceNode(new PlanNodeId("KafkaTopic_Left"),
leftDataSource,
leftDataSource.getSchema());
StructuredDataSourceNode
rightSourceKafkaTopicNode =
new StructuredDataSourceNode(new PlanNodeId("KafkaTopic_Right"),
rightDataSource,
rightDataSource.getSchema());

JoinNode joinNode =
new JoinNode(new PlanNodeId("Join"), joinType, leftSourceKafkaTopicNode,
rightSourceKafkaTopicNode, leftKeyFieldName, rightKeyFieldName, leftAlias,
Expand All @@ -251,20 +265,64 @@ protected Node visitJoin(final Join node, final AnalysisContext context) {
return null;
}

private String fetchKeyFieldName(Expression expression) {
/**
 * Finds, within the join comparison, the key field that belongs to the given source.
 *
 * <p>The left operand of the comparison is tried first and the right operand second, so the
 * two sources may appear on either side of the comparison.
 *
 * @param comparisonExpression the join comparison whose operands are inspected
 * @param sourceAlias alias of the source whose key field is wanted
 * @param sourceSchema schema of that source, passed through to the per-operand lookup
 * @return the non-null result of {@code fetchKeyFieldNameFromExpr} for the first operand
 *         that matches
 * @throws KsqlException if neither operand yields a key for the given source
 */
private Pair<String, String> fetchKeyFieldName(ComparisonExpression comparisonExpression,
                                               String sourceAlias,
                                               Schema sourceSchema) {
  final Pair<String, String> fromLeftOperand =
      fetchKeyFieldNameFromExpr(comparisonExpression.getLeft(), sourceAlias, sourceSchema);
  if (fromLeftOperand != null) {
    return fromLeftOperand;
  }

  final Pair<String, String> fromRightOperand =
      fetchKeyFieldNameFromExpr(comparisonExpression.getRight(), sourceAlias, sourceSchema);
  if (fromRightOperand != null) {
    return fromRightOperand;
  }

  // Neither operand refers to this source: report where the criteria came from, if known.
  final String messageTemplate = "%s : Invalid join criteria %s. Key for %s is not set"
                                 + " correctly."
                                 + " ";
  final String locationText = comparisonExpression.getLocation().isPresent()
      ? comparisonExpression.getLocation().get().toString()
      : "";
  throw new KsqlException(
      String.format(messageTemplate, locationText, comparisonExpression, sourceAlias));
}

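/**
* Checks whether the given expression refers to a field of the given source. A
* DereferenceExpression matches when its base equals the source alias (ignoring case) and its
* field name exists in the source schema. A QualifiedNameReference matches when the suffix of
* its name exists in the source schema.
*
* @param expression one operand of the join comparison
* @param sourceAlias alias of the source the key is expected to belong to
* @param sourceSchema schema of that source, used to confirm that the field exists
* @return a pair of (source alias, key field name), or null if the expression does not match
*/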
private Pair<String, String> fetchKeyFieldNameFromExpr(Expression expression, String sourceAlias,
Schema sourceSchema) {
if (expression instanceof DereferenceExpression) {
DereferenceExpression
leftDereferenceExpression =
dereferenceExpression =
(DereferenceExpression) expression;
return leftDereferenceExpression.getFieldName();
String sourceAliasVal = dereferenceExpression.getBase().toString();
if (sourceAliasVal.equalsIgnoreCase(sourceAlias)) {
String keyFieldName = dereferenceExpression.getFieldName();
if (SchemaUtil.getFieldByName(sourceSchema, keyFieldName).isPresent()) {
return new Pair<>(sourceAliasVal, keyFieldName);
}
}
} else if (expression instanceof QualifiedNameReference) {
QualifiedNameReference
leftQualifiedNameReference =
qualifiedNameReference =
(QualifiedNameReference) expression;
return leftQualifiedNameReference.getName().getSuffix();
} else {
throw new KsqlException("Join criteria is not supported. Expression:" + expression);
String keyFieldName = qualifiedNameReference.getName().getSuffix();
if (SchemaUtil.getFieldByName(sourceSchema, keyFieldName).isPresent()) {
return new Pair<>(sourceAlias, keyFieldName);
}
}
return null;
}

private StructuredDataSource timestampColumn(AliasedRelation aliasedRelation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class QueryAnalyzerTest {
Expand Down Expand Up @@ -134,4 +135,30 @@ public void shouldProcessHavingExpression() {
new QualifiedNameReference(QualifiedName.of("KSQL_AGG_VARIABLE_1")),
new LongLiteral("10"))));
}

/**
 * A join whose criteria names a column that does not exist in the right-hand source
 * ({@code test2.coll}) must be rejected with a descriptive {@link KsqlException}.
 */
@Test
public void shouldFailWithIncorrectJoinCriteria() {
  final List<Statement> statements = ksqlParser.buildAst(
      "select * from test1 join test2 on test1.col1 = test2.coll;",
      metaStore);
  final Query query = (Query) statements.get(0);
  try {
    queryAnalyzer.analyize(query);
    // Without this the test would pass silently if the analyzer accepted the bad criteria.
    // fail() throws AssertionError, which the KsqlException catch below does not intercept.
    fail("Expected a KsqlException for the invalid join criteria.");
  } catch (KsqlException ex) {
    assertThat(ex.getMessage().trim(), equalTo("Line: 1, Col: 46 : Invalid join criteria (TEST1.COL1 = TEST2.COLL). Key for TEST2 is not set correctly."));
  }
}

/**
 * The ON clause names the right-hand source first ({@code test2.col2 = test1.col1});
 * the analyzer is still expected to report COL1 as the left key and COL2 as the right key.
 */
@Test
public void shouldPassJoinWithAnyCriteriaOrder() {
  final String joinSql = "select * from test1 left join test2 on test2.col2 = test1.col1;";
  final List<Statement> parsed = ksqlParser.buildAst(joinSql, metaStore);
  final Query joinQuery = (Query) parsed.get(0);

  final Analysis result = queryAnalyzer.analyize(joinQuery);

  assertTrue(result.getJoin().isLeftJoin());
  assertThat(result.getJoin().getLeftKeyFieldName(), equalTo("COL1"));
  assertThat(result.getJoin().getRightKeyFieldName(), equalTo("COL2"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public void createTopic(String topicName, int numPartitions, short replicatonFac
* Topic topicName will be automatically created if it doesn't exist.
* @param topicName
* @param recordsToPublish
* @param schema
* @param timestamp
* @return
* @throws InterruptedException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void shouldLeftJoinOrderAndItems() throws Exception {

final String queryString = String.format(
"CREATE STREAM %s AS SELECT ORDERID, ITEMID, ORDERUNITS, DESCRIPTION FROM orders LEFT JOIN items " +
" on orders.ITEMID = item.ID WHERE orders.ITEMID = 'ITEM_1' ;",
" on orders.ITEMID = items.ID WHERE orders.ITEMID = 'ITEM_1' ;",
testStreamName
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,9 @@ public int getLineNumber() {
/**
 * Returns the column number, computed as {@code charPositionInLine + 1}.
 * NOTE(review): presumably this converts a zero-based character offset to a one-based
 * column; confirm against where charPositionInLine is assigned.
 */
public int getColumnNumber() {
  final int columnNumber = charPositionInLine + 1;
  return columnNumber;
}

/**
 * Renders this location as {@code " Line: <line>, Col: <charPositionInLine + 1>"}.
 * The leading space is part of the produced text.
 */
@Override
public String toString() {
  final int column = charPositionInLine + 1;
  final String rendered = String.format(" Line: %d, Col: %d", line, column);
  return rendered;
}
}

0 comments on commit 19c6d41

Please sign in to comment.