Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ksql 386 ksql functions non static #370

Merged

Conversation

hjafarpour
Copy link
Contributor

This PR refactors the function meta data handling by using a Function registry.

@@ -20,6 +20,6 @@
<modelVersion>4.0.0</modelVersion>
<groupId>io.confluent</groupId>
<artifactId>build-tools</artifactId>
<version>4.0.0-SNAPSHOT</version>
<version>4.1.0-SNAPSHOT</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why 4.1.0-SNAPSHOT? Shouldn't this branch remain on 4.0?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 4.0.1 using kafka 11.0.1 where the adminclient is fixed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be 4.0.0. I think it was messed up when I synced while Alex was preparing the branches. Will fix it.

@@ -23,7 +23,7 @@
<parent>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksql-parent</artifactId>
<version>4.0.0-SNAPSHOT</version>
<version>4.1.0-SNAPSHOT</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, will fix it :)

@@ -48,6 +48,10 @@
public static final String DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION =
"ksql.sink.window.change.log.additional.retention.default";

public static final String STREAM_INTERNAL_CHANGELOG_TOPIC_SUFFIX = "-changelog";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do these need to be public? Looks like they could be package protected and don't need to be known by a user?

@@ -61,8 +64,8 @@ public KsqlContext() {
streamsProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVER_OPTION_DEFAULT);
}
KsqlConfig ksqlConfig = new KsqlConfig(streamsProperties);

topicClient = new KafkaTopicClientImpl(ksqlConfig.getKsqlAdminClientConfigProps());
adminClient = AdminClient.create(ksqlConfig.getKsqlAdminClientConfigProps());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As it stands we can't test this class without starting up kafka etc. I suggest we change it so that we can unit test the class. We could add one or two static factory methods, i.e,
KsqlContext.create() and KsqlContext.create(Map<String, Object> properties) we than have a package private constructor KsqlContext(AdminClient, KafkaTopicClient, KsqlEngine)
We can then have unit tests of this class

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! I'll make the changes.

final MetaStore tempMetaStore)
throws Exception {

public List<QueryMetadata> planQueries(final boolean createNewAppId, final List<Pair<String, Statement>> statementList, final Map<String, Object> overriddenProperties, final MetaStore tempMetaStore) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with so many params and a long line length it would be good to have 1 param per line. Makes the diffs easier to read

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

@@ -120,7 +123,7 @@ private Predicate getWindowedKeyPredicate() throws Exception {
CodeGenRunner codeGenRunner = new CodeGenRunner();
ExpressionMetadata
expressionEvaluator =
codeGenRunner.buildCodeGenFromParseTree(filterExpression, schema);
codeGenRunner.buildCodeGenFromParseTree(filterExpression, schema, ksqlFunctionRegistry);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pass KSqlFunctionRegistry into constructor of CodeGenRunner ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea! Done :)

import java.util.List;
import java.util.Set;

public class CleanUpUtil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see where this class is used and there are no tests for it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will use it in the clean up task.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code shouldn't be added without tests


public KafkaTopicClientImpl(Map<String, Object> adminClientConfig) {
this.adminClientConfig = adminClientConfig;
public KafkaTopicClientImpl(AdminClient adminClient) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: final

} catch (InterruptedException | ExecutionException e) {
throw new KafkaResponseGetFailedException("Failed to describe kafka topics", e);
}
}

public void deleteTopics(List<String> topicsToDelete) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tests for this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add test when I finish the clean up task which this method is for.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

hasDeleteErrors = true;
}
}
if (hasDeleteErrors) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather than having a flag it would be better to collect all the failures into a list and then throw an exception with all the topics that failed. Also, we shouldn't be throwing RuntimeException - at the very least it should be KsqlException, but perhaps a DeleteTopicsFailedException or something.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, Done!

@@ -165,7 +172,7 @@ private static String formatQualifiedName(QualifiedName name) {
Boolean unmangleNames) {
StringBuilder builder = new StringBuilder("(");
String name = node.getName().getSuffix();
KsqlFunction ksqlFunction = KsqlFunctions.getFunction(name);
KsqlFunction ksqlFunction = ksqlFunctionRegistry.getFunction(name);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to remove the ksql prefixing everywhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yer, consider it done :)

@@ -46,12 +46,16 @@
import java.util.List;
import java.util.Map;

public class KsqlFunctions {
public class KsqlFunctionRegistry {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can rename this to FunctionRegistry - we have an an explosion of KSQLEverything


public class KafkaTopicClientImpl implements KafkaTopicClient {
private static final Logger log = LoggerFactory.getLogger(KafkaTopicClient.class);
private final Map<String, Object> adminClientConfig;
private final AdminClient adminClient;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 4.0.1 using kafka 11.0.1 where the adminclient is fixed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we use 4.0.0 and it has Kafka 1.0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the adminclient should be fixed there. I also reimported your changes to use singleton adminclinent

@@ -20,6 +20,6 @@
<modelVersion>4.0.0</modelVersion>
<groupId>io.confluent</groupId>
<artifactId>build-tools</artifactId>
<version>4.0.0-SNAPSHOT</version>
<version>4.1.0-SNAPSHOT</version>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 4.0.1 using kafka 11.0.1 where the adminclient is fixed?

@@ -69,12 +72,12 @@ private PlanNode buildLogicalPlan(String queryStr) {
analyzer.process(statements.get(0), new AnalysisContext(null));
AggregateAnalysis aggregateAnalysis = new AggregateAnalysis();
AggregateAnalyzer aggregateAnalyzer = new AggregateAnalyzer(aggregateAnalysis,
analysis);
analysis, ksqlFunctionRegistry);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would make me happy to rename this to functionRegistry

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely :) It's done!

Copy link
Contributor

@dguy dguy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @hjafarpour, still no tests for CleanupUtil and KafkaTopicClient.deleteTopics

@@ -50,7 +63,7 @@ public KsqlContext() {
*
* @param streamsProperties
*/
public KsqlContext(Map<String, Object> streamsProperties) {
private KsqlContext(Map<String, Object> streamsProperties) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put the logic in here into the create(Map) method and have it call the new constructor below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

return new KsqlContext(streamsProperties);
}

private KsqlContext() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can remove this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!



public static KsqlContext create() {
return new KsqlContext();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delegation, i.e, create(Collections.emptyMap())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, made the changes.

@dguy
Copy link
Contributor

dguy commented Oct 17, 2017

@hjafarpour this PR doesn't look right. You have commits that are unrelated, i.e., those from @aayars

@hjafarpour
Copy link
Contributor Author

hjafarpour commented Oct 17, 2017

@dguy the commits by alex came from a merge I had to do with master.
Also the tests for CleanUp and delete topic will be part of the next PR that I am working on.

@dguy
Copy link
Contributor

dguy commented Oct 17, 2017

@hjafarpour why would you need to merge master into 4.0.x? It should be the other way around

Copy link
Contributor

@dguy dguy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@hjafarpour hjafarpour merged commit de6772e into confluentinc:4.0.x Oct 17, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants