MINOR: fix checkstyle errors and fail build if there are checkstyle violations #298

Merged · 1 commit · Sep 15, 2017
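The pom.xml change that actually makes the build fail on checkstyle violations is not visible in this capture. Purely as an illustration, and not necessarily how this PR wired it up, a typical maven-checkstyle-plugin setup that fails the build looks roughly like the sketch below; the checkstyle.xml location is an assumption, while the suppressions path matches the file edited in this diff.

<!-- Hypothetical sketch: run checkstyle:check during the build and fail on violations -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-checkstyle-plugin</artifactId>
  <configuration>
    <!-- assumed location of the project's Checkstyle rules -->
    <configLocation>checkstyle/checkstyle.xml</configLocation>
    <suppressionsLocation>checkstyle/suppressions.xml</suppressionsLocation>
    <failOnViolation>true</failOnViolation>
  </configuration>
  <executions>
    <execution>
      <goals>
        <goal>check</goal>
      </goals>
    </execution>
  </executions>
</plugin>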
7 changes: 5 additions & 2 deletions checkstyle/suppressions.xml
@@ -7,7 +7,10 @@
<suppressions>

<!-- Checkstyle 6.19 doesn't like properly-formatted annotations -->
<suppress files="(KsqlEntity|Cli|Ksql).java" checks="IndentationCheck"/>
<suppress files="(KsqlEntity|Cli|Ksql|AbstractCliCommands).java" checks="IndentationCheck"/>

<!-- can't work out how to resolve the issue with the do {..} while -->
<suppress files="CommandStore.java" checks="RightCurly"/>

<!-- Suppress some checks on all files -->
<suppress files="." checks="ExplicitInitializationCheck"/>
@@ -34,4 +37,4 @@
<suppress files="." checks="EmptyLineSeparator"/>
<suppress files="." checks="NeedBraces"/>

-</suppressions>
+</suppressions>
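Each <suppress> entry above matches files by regular expression (the files attribute) and disables the named checks (the checks attribute) for those files. The entries only take effect if the project's main Checkstyle configuration loads this file through a SuppressionFilter, either directly or via a property the build tool sets; a minimal sketch, assuming the main config lives at checkstyle/checkstyle.xml:

<!-- Sketch only: SuppressionFilter and its "file" property are standard Checkstyle; the surrounding config is assumed -->
<module name="Checker">
  <module name="SuppressionFilter">
    <property name="file" value="checkstyle/suppressions.xml"/>
  </module>
  <!-- TreeWalker and the individual checks go here -->
</module>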
@@ -214,7 +214,7 @@ public void execute(String line) {
for (CliSpecificCommand cliSpecificCommand : cliSpecificCommands.values()) {
cliSpecificCommand.printHelp();
writer().println();
-}
+}
writer().println();
writer().println("Keyboard shortcuts:");
writer().println();
@@ -53,14 +53,13 @@ public String expandHistory(History history, String line) {
}

public JLineReader(Terminal terminal) {
-Expander expander = new NoOpExpander();
-
// The combination of parser/expander here allow for multiple-line commands connected by '\\'
DefaultParser parser = new DefaultParser();
parser.setEofOnEscapedNewLine(true);
parser.setQuoteChars(new char[0]);
parser.setEscapeChars(new char[] {'\\'});

+final Expander expander = new NoOpExpander();
// TODO: specify a completer to use here via a call to LineReaderBuilder.completer()
this.lineReader = LineReaderBuilder.builder()
.appName("KSQL")
@@ -406,7 +406,7 @@ public boolean terminateAllQueries() {
for (QueryMetadata queryMetadata: liveQueries) {
if (queryMetadata instanceof PersistentQueryMetadata) {
PersistentQueryMetadata persistentQueryMetadata = (PersistentQueryMetadata) queryMetadata;
-persistentQueryMetadata.getKafkaStreams().close(100l, TimeUnit.MILLISECONDS);
+persistentQueryMetadata.getKafkaStreams().close(100L, TimeUnit.MILLISECONDS);
persistentQueryMetadata.getKafkaStreams().cleanUp();
}
}
8 changes: 4 additions & 4 deletions ksql-core/src/main/java/io/confluent/ksql/QueryEngine.java
@@ -236,12 +236,12 @@ public void buildQueryPhysicalPlan(final List<QueryMetadata> physicalPlans,
}
String serviceId = ksqlEngine.getKsqlConfig()
.get(KsqlConfig.KSQL_SERVICE_ID_CONFIG).toString();
-String persistance_query_prefix = ksqlEngine.getKsqlConfig()
+String persistanceQueryPrefix = ksqlEngine.getKsqlConfig()
.get(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG).toString();
-String transient_query_prefix = ksqlEngine.getKsqlConfig()
+String transientQueryPrefix = ksqlEngine.getKsqlConfig()
.get(KsqlConfig.KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG).toString();
if (isBareQuery) {
-String applicationId = getBareQueryApplicationId(serviceId, transient_query_prefix);
+String applicationId = getBareQueryApplicationId(serviceId, transientQueryPrefix);
if (addUniqueTimeSuffix) {
applicationId = addTimeSuffix(applicationId);
}
@@ -267,7 +267,7 @@ public void buildQueryPhysicalPlan(final List<QueryMetadata> physicalPlans,
} else if (outputNode instanceof KsqlStructuredDataOutputNode) {
long queryId = getNextQueryId();

-String applicationId = serviceId + persistance_query_prefix +
+String applicationId = serviceId + persistanceQueryPrefix +
queryId;
if (addUniqueTimeSuffix) {
applicationId = addTimeSuffix(applicationId);
@@ -53,12 +53,12 @@ public abstract class AbstractCreateStreamCommand implements DDLCommand {
public AbstractCreateStreamCommand(final AbstractStreamCreateStatement statement,
Map<String, Object> overriddenProperties,
KafkaTopicClient kafkaTopicClient) {
-// TODO: get rid of toUpperCase in following code
-Map<String, Expression> properties = statement.getProperties();
this.sourceName = statement.getName().getSuffix();
this.topicName = this.sourceName;
this.kafkaTopicClient = kafkaTopicClient;

+// TODO: get rid of toUpperCase in following code
+Map<String, Expression> properties = statement.getProperties();
validateWithClause(properties.keySet());

if (properties.containsKey(DdlConfig.TOPIC_NAME_PROPERTY) &&
@@ -591,10 +591,10 @@ protected Void visitCreateTableAsSelect(CreateTableAsSelect node, Integer indent
if (!node.getProperties().isEmpty()) {
builder.append(" WITH (");
Joiner.on(", ").appendTo(builder,
-transform(node.getProperties().entrySet(),
-entry -> entry.getKey() + " = "
-+ ExpressionFormatterQueryRewrite
-.formatExpression(entry.getValue())));
+transform(node.getProperties().entrySet(), entry -> entry.getKey()
++ " = "
++ ExpressionFormatterQueryRewrite
+.formatExpression(entry.getValue())));
builder.append(")");
}

@@ -450,25 +450,16 @@ public GenericRow apply(GenericRow aggValue, GenericRow newValue) {
builder
.stream(autoOffsetReset, Serdes.String(), genericRowSerde,
ksqlTable.getKsqlTopic().getKafkaTopicName())
-.map(new KeyValueMapper<String, GenericRow, KeyValue<String,
-GenericRow>>() {
-@Override
-public KeyValue<String, GenericRow> apply(String key, GenericRow row) {
-if (row != null) {
-row.getColumns().add(0, key);
+.map((KeyValueMapper<String, GenericRow, KeyValue<String, GenericRow>>) (key, row) -> {
+if (row != null) {
+row.getColumns().add(0, key);

}
-return new KeyValue<>(key, row);
-}
+return new KeyValue<>(key, row);
});
kstream = addTimestampColumn(kstream);
ktable = kstream.groupByKey(Serdes.String(), genericRowSerdeAfterRead)
-.reduce(new Reducer<GenericRow>() {
-@Override
-public GenericRow apply(GenericRow aggValue, GenericRow newValue) {
-return newValue;
-}
-}, ksqlTable.getStateStoreName());
+.reduce((Reducer<GenericRow>) (aggValue, newValue) -> newValue, ksqlTable.getStateStoreName());
}

return new SchemaKTable(sourceNode.getSchema(), ktable,
@@ -482,16 +473,12 @@ public GenericRow apply(GenericRow aggValue, GenericRow newValue) {
builder
.stream(Serdes.String(), genericRowSerde,
ksqlStream.getKsqlTopic().getKafkaTopicName())
-.map(new KeyValueMapper<String, GenericRow, KeyValue<String,
-GenericRow>>() {
-@Override
-public KeyValue<String, GenericRow> apply(String key, GenericRow row) {
-if (row != null) {
-row.getColumns().add(0, key);
+.map((KeyValueMapper<String, GenericRow, KeyValue<String, GenericRow>>) (key, row) -> {
+if (row != null) {
+row.getColumns().add(0, key);

}
-return new KeyValue<>(key, row);
-}
+return new KeyValue<>(key, row);
});
kstream = addTimestampColumn(kstream);
return new SchemaKStream(sourceNode.getSchema(), kstream,
11 changes: 5 additions & 6 deletions ksql-core/src/main/java/io/confluent/ksql/util/KsqlConfig.java
@@ -115,23 +115,22 @@ public KsqlConfig(Map<?, ?> props) {
ksqlConfigProps.put(KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG, KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT);

if (props.containsKey(DEFAULT_SINK_NUMBER_OF_PARTITIONS)) {
-ksqlConfigProps.put(SINK_NUMBER_OF_PARTITIONS, Integer.parseInt(props.get
-(DEFAULT_SINK_NUMBER_OF_PARTITIONS).toString()));
+ksqlConfigProps.put(SINK_NUMBER_OF_PARTITIONS,
+Integer.parseInt(props.get(DEFAULT_SINK_NUMBER_OF_PARTITIONS).toString()));
} else {
ksqlConfigProps.put(SINK_NUMBER_OF_PARTITIONS, defaultSinkNumberOfPartitions);
}

if (props.containsKey(DEFAULT_SINK_NUMBER_OF_REPLICATIONS)) {
-ksqlConfigProps.put(SINK_NUMBER_OF_REPLICATIONS, Short.parseShort(props.get
-(DEFAULT_SINK_NUMBER_OF_REPLICATIONS).toString()));
+ksqlConfigProps.put(SINK_NUMBER_OF_REPLICATIONS,
+Short.parseShort(props.get(DEFAULT_SINK_NUMBER_OF_REPLICATIONS).toString()));
} else {
ksqlConfigProps.put(SINK_NUMBER_OF_REPLICATIONS, defaultSinkNumberOfReplications);
}

if (props.containsKey(DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION)) {
ksqlConfigProps.put(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION,
-Long.parseLong(props.get
-(DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION).toString()));
+Long.parseLong(props.get(DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION).toString()));
} else {
ksqlConfigProps.put(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION,
defaultSinkWindowChangeLogAdditionalRetention);
@@ -134,7 +134,7 @@ private void handleSessionSiblingField(GenericRecord randomAvroMessage, List<Obj
genericRowValues.add(randomAvroMessage.get(field.name()));
}
} catch (Exception err) {
-genericRowValues.add(randomAvroMessage.get(field.name()));
+genericRowValues.add(randomAvroMessage.get(field.name()));
}
}

@@ -226,14 +226,12 @@ private String handleSessionisationOfValue(SessionManager sessionManager, String
}

if (value != null) {
// System.out.println("1-New Session:" + value + " Sessions: " + sessionManager.getActiveSessionCount());
sessionManager.newSession(value);
} else {
value = sessionManager.recycleOldestExpired();
if (value == null) {
-new RuntimeException("Ran out of tokens to rejuice - increase session-duration (300s), reduce-number of sessions(5), number of tokens in the avro template");
+throw new RuntimeException("Ran out of tokens to rejuice - increase session-duration (300s), reduce-number of sessions(5), number of tokens in the avro template");
}
// System.out.println("2-New [Recycle] Session:" + value + " Tokens:" + allTokens.size());
sessionManager.newSession(value);
return value;
}