KSQL-386: KSQL functions non-static #370

Merged
Changes from 12 commits
@@ -23,6 +23,8 @@
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;

import org.apache.kafka.clients.admin.AdminClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -42,7 +44,7 @@ public class StandaloneExecutor {

public StandaloneExecutor(Map streamProperties) {
KsqlConfig ksqlConfig = new KsqlConfig(streamProperties);
ksqlEngine = new KsqlEngine(ksqlConfig, new KafkaTopicClientImpl(ksqlConfig.getKsqlAdminClientConfigProps()));
ksqlEngine = new KsqlEngine(ksqlConfig, new KafkaTopicClientImpl(AdminClient.create(ksqlConfig.getKsqlAdminClientConfigProps())));
}

public void executeStatements(String queries) throws Exception {
@@ -48,6 +48,10 @@ public class KsqlConfig extends AbstractConfig {
public static final String DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION =
"ksql.sink.window.change.log.additional.retention.default";

public static final String STREAM_INTERNAL_CHANGELOG_TOPIC_SUFFIX = "-changelog";
Contributor: Do these need to be public? Looks like they could be package-protected and don't need to be known by a user?
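
A minimal sketch of that suggestion, assuming the only readers of these suffixes live in the same package as KsqlConfig:

```java
// Inside KsqlConfig: dropping "public" makes the constants package-private,
// so they stay usable by KSQL's own classes in this package without
// becoming part of the user-facing configuration API.
static final String STREAM_INTERNAL_CHANGELOG_TOPIC_SUFFIX = "-changelog";
static final String STREAM_INTERNAL_REPARTITION_TOPIC_SUFFIX = "-repartition";
```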


public static final String STREAM_INTERNAL_REPARTITION_TOPIC_SUFFIX = "-repartition";

public static final String FAIL_ON_DESERIALIZATION_ERROR_CONFIG = "fail.on.deserialization.error";

public static final String
30 changes: 25 additions & 5 deletions ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java
@@ -17,8 +17,11 @@
package io.confluent.ksql;

import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.util.KafkaTopicClient;
import io.confluent.ksql.util.KafkaTopicClientImpl;
import io.confluent.ksql.util.KsqlConfig;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,9 +41,19 @@ public class KsqlContext {
private final KsqlEngine ksqlEngine;
private static final String APPLICATION_ID_OPTION_DEFAULT = "ksql_standalone_cli";
private static final String KAFKA_BOOTSTRAP_SERVER_OPTION_DEFAULT = "localhost:9092";
private final KafkaTopicClientImpl topicClient;
private final AdminClient adminClient;
private final KafkaTopicClient topicClient;


public static KsqlContext create() {
return new KsqlContext();
Contributor: Delegation, i.e., create(Collections.emptyMap()).

Contributor (author): Good point, made the changes.
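
A minimal sketch of the delegation (a hypothetical final shape, not the committed code). Note that Collections.emptyMap() is immutable, while the constructor here puts default properties into the map it receives, so the Map-taking overload should copy its argument:

```java
public static KsqlContext create() {
  // Delegate instead of duplicating construction logic.
  return create(Collections.emptyMap());
}

public static KsqlContext create(Map<String, Object> streamsProperties) {
  // Defensive copy: the constructor inserts default properties, and the
  // caller's map (e.g. Collections.emptyMap()) may be immutable.
  return new KsqlContext(new HashMap<>(streamsProperties));
}
```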

}

public KsqlContext() {
public static KsqlContext create(Map<String, Object> streamsProperties) {
return new KsqlContext(streamsProperties);
}

private KsqlContext() {
Contributor: Can remove this.

Contributor (author): Done!

this(null);
}

@@ -50,7 +63,7 @@ public KsqlContext() {
*
* @param streamsProperties
*/
public KsqlContext(Map<String, Object> streamsProperties) {
private KsqlContext(Map<String, Object> streamsProperties) {
Contributor: Put the logic in here into the create(Map) method and have it call the new constructor below.

Contributor (author): Done!

if (streamsProperties == null) {
streamsProperties = new HashMap<>();
}
@@ -61,11 +74,17 @@ public KsqlContext(Map<String, Object> streamsProperties) {
streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVER_OPTION_DEFAULT);
}
KsqlConfig ksqlConfig = new KsqlConfig(streamsProperties);

topicClient = new KafkaTopicClientImpl(ksqlConfig.getKsqlAdminClientConfigProps());
adminClient = AdminClient.create(ksqlConfig.getKsqlAdminClientConfigProps());
Contributor: As it stands we can't test this class without starting up Kafka etc. I suggest we change it so that we can unit-test the class. We could add one or two static factory methods, i.e., KsqlContext.create() and KsqlContext.create(Map<String, Object> properties), and then have a package-private constructor KsqlContext(AdminClient, KafkaTopicClient, KsqlEngine). We can then have unit tests of this class.

Contributor (author): Good point! I'll make the changes.
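
A sketch of the kind of unit test this enables (assumes Mockito and JUnit on the test classpath; the test class sits in the same package so it can reach the non-public constructor):

```java
package io.confluent.ksql;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import io.confluent.ksql.util.KafkaTopicClient;
import org.apache.kafka.clients.admin.AdminClient;
import org.junit.Test;

public class KsqlContextTest {

  @Test
  public void shouldCloseEngineTopicClientAndAdminClient() throws Exception {
    // All collaborators are mocks, so no Kafka cluster is needed.
    KsqlEngine engine = mock(KsqlEngine.class);
    AdminClient adminClient = mock(AdminClient.class);
    KafkaTopicClient topicClient = mock(KafkaTopicClient.class);

    KsqlContext context = new KsqlContext(engine, adminClient, topicClient);
    context.close();

    verify(engine).close();
    verify(topicClient).close();
    verify(adminClient).close();
  }
}
```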

topicClient = new KafkaTopicClientImpl(adminClient);
ksqlEngine = new KsqlEngine(ksqlConfig, topicClient);
}

protected KsqlContext(KsqlEngine ksqlEngine, AdminClient adminClient, KafkaTopicClient
topicClient) {
this.ksqlEngine = ksqlEngine;
this.adminClient = adminClient;
this.topicClient = topicClient;
}

public MetaStore getMetaStore() {
return ksqlEngine.getMetaStore();
@@ -98,5 +117,6 @@ public void sql(String sql) throws Exception {
public void close() throws IOException {
ksqlEngine.close();
topicClient.close();
adminClient.close();
}
}
23 changes: 13 additions & 10 deletions ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java
@@ -18,6 +18,7 @@

import io.confluent.ksql.ddl.DdlConfig;
import io.confluent.ksql.ddl.commands.*;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.parser.exception.ParseFailedException;
import io.confluent.ksql.metastore.*;
import io.confluent.ksql.parser.KsqlParser;
@@ -57,7 +58,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class KsqlEngine implements Closeable {

@@ -78,6 +78,8 @@ public class KsqlEngine {
private final Map<Long, PersistentQueryMetadata> persistentQueries;
private final Set<QueryMetadata> liveQueries;

public final FunctionRegistry functionRegistry;

public KsqlEngine(final KsqlConfig ksqlConfig, final KafkaTopicClient topicClient) {
Objects.requireNonNull(ksqlConfig, "Streams properties map cannot be null as it may be mutated later on");

@@ -90,6 +92,7 @@ public KsqlEngine(final KsqlConfig ksqlConfig, final KafkaTopicClien

this.persistentQueries = new HashMap<>();
this.liveQueries = new HashSet<>();
this.functionRegistry = new FunctionRegistry();
}

/**
@@ -128,9 +131,7 @@ public List<QueryMetadata> buildMultipleQueries(
public List<QueryMetadata> planQueries(final boolean createNewAppId,
final List<Pair<String, Statement>> statementList,
final Map<String, Object> overriddenProperties,
final MetaStore tempMetaStore)
throws Exception {

final MetaStore tempMetaStore) throws Exception {
// Logical plan creation from the ASTs
List<Pair<String, PlanNode>> logicalPlans = queryEngine.buildLogicalPlans(tempMetaStore, statementList);

Expand Down Expand Up @@ -345,6 +346,10 @@ public MetaStore getMetaStore() {
return metaStore;
}

public FunctionRegistry getFunctionRegistry() {
return functionRegistry;
}

public KafkaTopicClient getTopicClient() {
return topicClient;
}
@@ -360,8 +365,7 @@ public boolean terminateQuery(final long queryId, final boolean closeStreams) {
}
liveQueries.remove(queryMetadata);
if (closeStreams) {
queryMetadata.getKafkaStreams().close(100L, TimeUnit.MILLISECONDS);
queryMetadata.getKafkaStreams().cleanUp();
queryMetadata.close();
}
return true;
}
@@ -388,8 +392,7 @@ public KsqlConfig getKsqlConfig() {
@Override
public void close() throws IOException {
for (QueryMetadata queryMetadata : liveQueries) {
queryMetadata.getKafkaStreams().close(100L, TimeUnit.MILLISECONDS);
queryMetadata.getKafkaStreams().cleanUp();
queryMetadata.close();
}
topicClient.close();
}
@@ -403,8 +406,8 @@ public boolean terminateAllQueries() {
for (QueryMetadata queryMetadata : liveQueries) {
if (queryMetadata instanceof PersistentQueryMetadata) {
PersistentQueryMetadata persistentQueryMetadata = (PersistentQueryMetadata) queryMetadata;
persistentQueryMetadata.getKafkaStreams().close(100L, TimeUnit.MILLISECONDS);
persistentQueryMetadata.getKafkaStreams().cleanUp();
persistentQueryMetadata.close();

}
}
} catch (Exception e) {
30 changes: 21 additions & 9 deletions ksql-engine/src/main/java/io/confluent/ksql/QueryEngine.java
@@ -126,8 +126,9 @@ private PlanNode buildQueryLogicalPlan(final Query query, final MetaStore tempMe

AggregateAnalysis aggregateAnalysis = new AggregateAnalysis();
AggregateAnalyzer aggregateAnalyzer = new
AggregateAnalyzer(aggregateAnalysis, analysis);
AggregateExpressionRewriter aggregateExpressionRewriter = new AggregateExpressionRewriter();
AggregateAnalyzer(aggregateAnalysis, analysis, ksqlEngine.getFunctionRegistry());
AggregateExpressionRewriter aggregateExpressionRewriter =
new AggregateExpressionRewriter(ksqlEngine.getFunctionRegistry());
for (Expression expression: analysis.getSelectExpressions()) {
aggregateAnalyzer
.process(expression, new AnalysisContext(null));
@@ -160,7 +161,7 @@ private PlanNode buildQueryLogicalPlan(final Query query, final MetaStore tempMe


// Build a logical plan
PlanNode logicalPlan = new LogicalPlanner(analysis, aggregateAnalysis).buildPlan();
PlanNode logicalPlan = new LogicalPlanner(analysis, aggregateAnalysis, ksqlEngine.getFunctionRegistry()).buildPlan();
if (logicalPlan instanceof KsqlStructuredDataOutputNode) {
KsqlStructuredDataOutputNode ksqlStructuredDataOutputNode =
(KsqlStructuredDataOutputNode) logicalPlan;
@@ -217,7 +218,9 @@ public void buildQueryPhysicalPlan(final List<QueryMetadata> physicalPlans,
KsqlConfig ksqlConfigClone = ksqlEngine.getKsqlConfig().clone();

// Build a physical plan, in this case a Kafka Streams DSL
PhysicalPlanBuilder physicalPlanBuilder = new PhysicalPlanBuilder(builder, ksqlConfigClone, ksqlEngine.getTopicClient(), new MetastoreUtil());
PhysicalPlanBuilder physicalPlanBuilder = new PhysicalPlanBuilder(builder, ksqlConfigClone,
ksqlEngine.getTopicClient(),
new MetastoreUtil(), ksqlEngine.getFunctionRegistry());
SchemaKStream schemaKStream = physicalPlanBuilder.buildPhysicalPlan(logicalPlan);

OutputNode outputNode = physicalPlanBuilder.getPlanSink();
@@ -291,7 +294,8 @@ private QueryMetadata buildPlanForBareQuery(boolean addUniqueTimeSuffix,
schemaKStream.getExecutionPlan(""),
schemaKStream.getQueue(),
(sourceSchemaKstream instanceof SchemaKTable) ?
DataSource.DataSourceType.KTABLE : DataSource.DataSourceType.KSTREAM
DataSource.DataSourceType.KTABLE : DataSource.DataSourceType.KSTREAM,
Contributor: Maybe add a DataSourceType to SchemaKStream etc., then you can get rid of the instanceof and call schemaKStream.dataSourceType()?

Contributor (author): Let's discuss this a bit more.
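
A sketch of that refactor (a hypothetical getDataSourceType() method, not part of this PR), with stand-in class skeletons to keep it self-contained:

```java
class DataSource {
  enum DataSourceType { KSTREAM, KTABLE }
}

class SchemaKStream {
  // Each class reports its own type, so call sites need no instanceof checks.
  DataSource.DataSourceType getDataSourceType() {
    return DataSource.DataSourceType.KSTREAM;
  }
}

class SchemaKTable extends SchemaKStream {
  @Override
  DataSource.DataSourceType getDataSourceType() {
    return DataSource.DataSourceType.KTABLE;
  }
}
```

Call sites like the one above would then shrink from the instanceof ternary to schemaKStream.getDataSourceType().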

applicationId
);
}

@@ -308,9 +312,15 @@ private QueryMetadata buildPlanForBareQuery(boolean addUniqueTimeSuffix,
* @param persistanceQueryPrefix
*/
private QueryMetadata buildPlanForStructuredOutputNode(boolean addUniqueTimeSuffix,
Pair<String, PlanNode> statementPlanPair, Map<String, Object> overriddenStreamsProperties,
boolean updateMetastore, StreamsBuilder builder, KsqlConfig ksqlConfigClone, SchemaKStream schemaKStream,
KsqlStructuredDataOutputNode outputNode, String serviceId, String persistanceQueryPrefix) {
Pair<String, PlanNode> statementPlanPair,
Map<String, Object> overriddenStreamsProperties,
boolean updateMetastore,
StreamsBuilder builder,
KsqlConfig ksqlConfigClone,
SchemaKStream schemaKStream,
KsqlStructuredDataOutputNode outputNode,
String serviceId,
String persistanceQueryPrefix) {

long queryId = getNextQueryId();

@@ -353,7 +363,9 @@ private QueryMetadata buildPlanForStructuredOutputNode(boolean addUniqueTimeSuff
streams, outputNode, schemaKStream
.getExecutionPlan(""), queryId,
(schemaKStream instanceof SchemaKTable) ? DataSource
.DataSourceType.KTABLE : DataSource.DataSourceType.KSTREAM);
.DataSourceType.KTABLE : DataSource.DataSourceType
Contributor: Ditto here.

.KSTREAM,
applicationId);
}


@@ -16,7 +16,7 @@

package io.confluent.ksql.analyzer;

import io.confluent.ksql.function.KsqlFunctions;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.parser.tree.DereferenceExpression;
import io.confluent.ksql.parser.tree.Expression;
import io.confluent.ksql.parser.tree.FunctionCall;
@@ -30,6 +30,7 @@ public class AggregateAnalyzer extends DefaultTraversalVisitor<Node, AnalysisCon

private AggregateAnalysis aggregateAnalysis;
private Analysis analysis;
private FunctionRegistry functionRegistry;

private boolean hasAggregateFunction = false;

@@ -42,15 +43,17 @@ public void setHasAggregateFunction(boolean hasAggregateFunction) {
}

public AggregateAnalyzer(AggregateAnalysis aggregateAnalysis,
Analysis analysis) {
Analysis analysis,
FunctionRegistry functionRegistry) {
this.aggregateAnalysis = aggregateAnalysis;
this.analysis = analysis;
this.functionRegistry = functionRegistry;
}

@Override
protected Node visitFunctionCall(final FunctionCall node, final AnalysisContext context) {
String functionName = node.getName().getSuffix();
if (KsqlFunctions.isAnAggregateFunction(functionName)) {
if (functionRegistry.isAnAggregateFunction(functionName)) {
if (node.getArguments().isEmpty()) {
Expression argExpression;
if (analysis.getJoin() != null) {
@@ -17,7 +17,7 @@
package io.confluent.ksql.codegen;

import io.confluent.ksql.function.KsqlFunction;
import io.confluent.ksql.function.KsqlFunctions;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.function.udf.Kudf;
import io.confluent.ksql.parser.tree.AstVisitor;
import io.confluent.ksql.parser.tree.Expression;
@@ -36,17 +36,24 @@

public class CodeGenRunner {

public Map<String, Class> getParameterInfo(final Expression expression, final Schema schema) {
Visitor visitor = new Visitor(schema);
final Schema schema;
final FunctionRegistry functionRegistry;

public CodeGenRunner(Schema schema, FunctionRegistry functionRegistry) {
this.functionRegistry = functionRegistry;
this.schema = schema;
}

public Map<String, Class> getParameterInfo(final Expression expression) {
Visitor visitor = new Visitor(schema, functionRegistry);
visitor.process(expression, null);
return visitor.parameterMap;
}

public ExpressionMetadata buildCodeGenFromParseTree(
final Expression expression,
final Schema schema) throws Exception {
CodeGenRunner codeGenRunner = new CodeGenRunner();
Map<String, Class> parameterMap = codeGenRunner.getParameterInfo(expression, schema);
final Expression expression) throws Exception {
CodeGenRunner codeGenRunner = new CodeGenRunner(schema, functionRegistry);
Map<String, Class> parameterMap = codeGenRunner.getParameterInfo(expression);

String[] parameterNames = new String[parameterMap.size()];
Class[] parameterTypes = new Class[parameterMap.size()];
@@ -66,15 +73,16 @@ public ExpressionMetadata buildCodeGenFromParseTree(
index++;
}

String javaCode = new SqlToJavaVisitor().process(expression, schema);
String javaCode = new SqlToJavaVisitor(schema, functionRegistry).process(expression);

IExpressionEvaluator ee = CompilerFactoryFactory.getDefaultCompilerFactory().newExpressionEvaluator();

// The expression will have two "int" parameters: "a" and "b".
ee.setParameters(parameterNames, parameterTypes);

// And the expression (i.e. "result") type is also "int".
ExpressionTypeManager expressionTypeManager = new ExpressionTypeManager(schema);
ExpressionTypeManager expressionTypeManager = new ExpressionTypeManager(schema,
functionRegistry);
Schema expressionType = expressionTypeManager.getExpressionType(expression);

ee.setExpressionType(SchemaUtil.getJavaType(expressionType));
@@ -89,10 +97,12 @@ private class Visitor extends AstVisitor<Object, Object> {

final Schema schema;
final Map<String, Class> parameterMap;
final FunctionRegistry functionRegistry;

Visitor(Schema schema) {
Visitor(Schema schema, FunctionRegistry functionRegistry) {
this.schema = schema;
this.parameterMap = new HashMap<>();
this.functionRegistry = functionRegistry;
}

protected Object visitLikePredicate(LikePredicate node, Object context) {
Expand All @@ -102,7 +112,7 @@ protected Object visitLikePredicate(LikePredicate node, Object context) {

protected Object visitFunctionCall(FunctionCall node, Object context) {
String functionName = node.getName().getSuffix();
KsqlFunction ksqlFunction = KsqlFunctions.getFunction(functionName);
KsqlFunction ksqlFunction = functionRegistry.getFunction(functionName);
parameterMap.put(node.getName().getSuffix(),
ksqlFunction.getKudfClass());
for (Expression argExpr : node.getArguments()) {