From 1cb530742a2cdb513695016a37c02ccb46315d8c Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Fri, 15 Sep 2017 11:01:10 +0100 Subject: [PATCH] fix checkstyle errors --- checkstyle/suppressions.xml | 7 +- .../confluent/ksql/cli/console/Console.java | 2 +- .../ksql/cli/console/JLineReader.java | 3 +- .../java/io/confluent/ksql/KsqlEngine.java | 2 +- .../java/io/confluent/ksql/QueryEngine.java | 8 +- .../commands/AbstractCreateStreamCommand.java | 4 +- .../rewrite/SqlFormatterQueryRewrite.java | 8 +- .../ksql/physical/PhysicalPlanBuilder.java | 31 +-- .../io/confluent/ksql/util/KsqlConfig.java | 11 +- .../ksql/datagen/DataGenProducer.java | 6 +- .../ksql/datagen/SessionManager.java | 225 +++++++++--------- .../ksql/rest/server/KsqlRestApplication.java | 4 +- .../rest/server/computation/CommandStore.java | 2 +- pom.xml | 4 +- 14 files changed, 156 insertions(+), 161 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 1d2463df3aaf..cb6c5f33cd96 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -7,7 +7,10 @@ - + + + + @@ -34,4 +37,4 @@ - \ No newline at end of file + diff --git a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java index f523179d1987..6905bd555804 100644 --- a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java +++ b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/Console.java @@ -214,7 +214,7 @@ public void execute(String line) { for (CliSpecificCommand cliSpecificCommand : cliSpecificCommands.values()) { cliSpecificCommand.printHelp(); writer().println(); - } + } writer().println(); writer().println("Keyboard shortcuts:"); writer().println(); diff --git a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/JLineReader.java b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/JLineReader.java index 716c71b242d6..e187a5e88ca5 100644 --- a/ksql-cli/src/main/java/io/confluent/ksql/cli/console/JLineReader.java +++ b/ksql-cli/src/main/java/io/confluent/ksql/cli/console/JLineReader.java @@ -53,14 +53,13 @@ public String expandHistory(History history, String line) { } public JLineReader(Terminal terminal) { - Expander expander = new NoOpExpander(); - // The combination of parser/expander here allow for multiple-line commands connected by '\\' DefaultParser parser = new DefaultParser(); parser.setEofOnEscapedNewLine(true); parser.setQuoteChars(new char[0]); parser.setEscapeChars(new char[] {'\\'}); + final Expander expander = new NoOpExpander(); // TODO: specify a completer to use here via a call to LineReaderBuilder.completer() this.lineReader = LineReaderBuilder.builder() .appName("KSQL") diff --git a/ksql-core/src/main/java/io/confluent/ksql/KsqlEngine.java b/ksql-core/src/main/java/io/confluent/ksql/KsqlEngine.java index 4deff716ece9..50bf2b16536a 100644 --- a/ksql-core/src/main/java/io/confluent/ksql/KsqlEngine.java +++ b/ksql-core/src/main/java/io/confluent/ksql/KsqlEngine.java @@ -406,7 +406,7 @@ public boolean terminateAllQueries() { for (QueryMetadata queryMetadata: liveQueries) { if (queryMetadata instanceof PersistentQueryMetadata) { PersistentQueryMetadata persistentQueryMetadata = (PersistentQueryMetadata) queryMetadata; - persistentQueryMetadata.getKafkaStreams().close(100l, TimeUnit.MILLISECONDS); + persistentQueryMetadata.getKafkaStreams().close(100L, TimeUnit.MILLISECONDS); persistentQueryMetadata.getKafkaStreams().cleanUp(); } } diff --git a/ksql-core/src/main/java/io/confluent/ksql/QueryEngine.java b/ksql-core/src/main/java/io/confluent/ksql/QueryEngine.java index a7f7c79ad6ed..11e3516da983 100644 --- a/ksql-core/src/main/java/io/confluent/ksql/QueryEngine.java +++ b/ksql-core/src/main/java/io/confluent/ksql/QueryEngine.java @@ -236,12 +236,12 @@ public void buildQueryPhysicalPlan(final List physicalPlans, } String serviceId = ksqlEngine.getKsqlConfig() .get(KsqlConfig.KSQL_SERVICE_ID_CONFIG).toString(); - String persistance_query_prefix = ksqlEngine.getKsqlConfig() + String persistanceQueryPrefix = ksqlEngine.getKsqlConfig() .get(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG).toString(); - String transient_query_prefix = ksqlEngine.getKsqlConfig() + String transientQueryPrefix = ksqlEngine.getKsqlConfig() .get(KsqlConfig.KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG).toString(); if (isBareQuery) { - String applicationId = getBareQueryApplicationId(serviceId, transient_query_prefix); + String applicationId = getBareQueryApplicationId(serviceId, transientQueryPrefix); if (addUniqueTimeSuffix) { applicationId = addTimeSuffix(applicationId); } @@ -267,7 +267,7 @@ public void buildQueryPhysicalPlan(final List physicalPlans, } else if (outputNode instanceof KsqlStructuredDataOutputNode) { long queryId = getNextQueryId(); - String applicationId = serviceId + persistance_query_prefix + + String applicationId = serviceId + persistanceQueryPrefix + queryId; if (addUniqueTimeSuffix) { applicationId = addTimeSuffix(applicationId); diff --git a/ksql-core/src/main/java/io/confluent/ksql/ddl/commands/AbstractCreateStreamCommand.java b/ksql-core/src/main/java/io/confluent/ksql/ddl/commands/AbstractCreateStreamCommand.java index 376532106041..c0e258b70c9f 100644 --- a/ksql-core/src/main/java/io/confluent/ksql/ddl/commands/AbstractCreateStreamCommand.java +++ b/ksql-core/src/main/java/io/confluent/ksql/ddl/commands/AbstractCreateStreamCommand.java @@ -53,12 +53,12 @@ public abstract class AbstractCreateStreamCommand implements DDLCommand { public AbstractCreateStreamCommand(final AbstractStreamCreateStatement statement, Map overriddenProperties, KafkaTopicClient kafkaTopicClient) { - // TODO: get rid of toUpperCase in following code - Map properties = statement.getProperties(); this.sourceName = statement.getName().getSuffix(); this.topicName = this.sourceName; this.kafkaTopicClient = kafkaTopicClient; + // TODO: get rid of toUpperCase in following code + Map properties = statement.getProperties(); validateWithClause(properties.keySet()); if (properties.containsKey(DdlConfig.TOPIC_NAME_PROPERTY) && diff --git a/ksql-core/src/main/java/io/confluent/ksql/parser/rewrite/SqlFormatterQueryRewrite.java b/ksql-core/src/main/java/io/confluent/ksql/parser/rewrite/SqlFormatterQueryRewrite.java index 185122547019..8181bbf2d367 100644 --- a/ksql-core/src/main/java/io/confluent/ksql/parser/rewrite/SqlFormatterQueryRewrite.java +++ b/ksql-core/src/main/java/io/confluent/ksql/parser/rewrite/SqlFormatterQueryRewrite.java @@ -591,10 +591,10 @@ protected Void visitCreateTableAsSelect(CreateTableAsSelect node, Integer indent if (!node.getProperties().isEmpty()) { builder.append(" WITH ("); Joiner.on(", ").appendTo(builder, - transform(node.getProperties().entrySet(), - entry -> entry.getKey() + " = " - + ExpressionFormatterQueryRewrite - .formatExpression(entry.getValue()))); + transform(node.getProperties().entrySet(), entry -> entry.getKey() + + " = " + + ExpressionFormatterQueryRewrite + .formatExpression(entry.getValue()))); builder.append(")"); } diff --git a/ksql-core/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java b/ksql-core/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java index ff216190a333..71dec2aa2a42 100644 --- a/ksql-core/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java +++ b/ksql-core/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java @@ -450,25 +450,16 @@ public GenericRow apply(GenericRow aggValue, GenericRow newValue) { builder .stream(autoOffsetReset, Serdes.String(), genericRowSerde, ksqlTable.getKsqlTopic().getKafkaTopicName()) - .map(new KeyValueMapper>() { - @Override - public KeyValue apply(String key, GenericRow row) { - if (row != null) { - row.getColumns().add(0, key); + .map((KeyValueMapper>) (key, row) -> { + if (row != null) { + row.getColumns().add(0, key); - } - return new KeyValue<>(key, row); } + return new KeyValue<>(key, row); }); kstream = addTimestampColumn(kstream); ktable = kstream.groupByKey(Serdes.String(), genericRowSerdeAfterRead) - .reduce(new Reducer() { - @Override - public GenericRow apply(GenericRow aggValue, GenericRow newValue) { - return newValue; - } - }, ksqlTable.getStateStoreName()); + .reduce((Reducer) (aggValue, newValue) -> newValue, ksqlTable.getStateStoreName()); } return new SchemaKTable(sourceNode.getSchema(), ktable, @@ -482,16 +473,12 @@ public GenericRow apply(GenericRow aggValue, GenericRow newValue) { builder .stream(Serdes.String(), genericRowSerde, ksqlStream.getKsqlTopic().getKafkaTopicName()) - .map(new KeyValueMapper>() { - @Override - public KeyValue apply(String key, GenericRow row) { - if (row != null) { - row.getColumns().add(0, key); + .map((KeyValueMapper>) (key, row) -> { + if (row != null) { + row.getColumns().add(0, key); - } - return new KeyValue<>(key, row); } + return new KeyValue<>(key, row); }); kstream = addTimestampColumn(kstream); return new SchemaKStream(sourceNode.getSchema(), kstream, diff --git a/ksql-core/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-core/src/main/java/io/confluent/ksql/util/KsqlConfig.java index d67d592ad289..82be170b0c93 100644 --- a/ksql-core/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-core/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -115,23 +115,22 @@ public KsqlConfig(Map props) { ksqlConfigProps.put(KSQL_TABLE_STATESTORE_NAME_SUFFIX_CONFIG, KSQL_TABLE_STATESTORE_NAME_SUFFIX_DEFAULT); if (props.containsKey(DEFAULT_SINK_NUMBER_OF_PARTITIONS)) { - ksqlConfigProps.put(SINK_NUMBER_OF_PARTITIONS, Integer.parseInt(props.get - (DEFAULT_SINK_NUMBER_OF_PARTITIONS).toString())); + ksqlConfigProps.put(SINK_NUMBER_OF_PARTITIONS, + Integer.parseInt(props.get(DEFAULT_SINK_NUMBER_OF_PARTITIONS).toString())); } else { ksqlConfigProps.put(SINK_NUMBER_OF_PARTITIONS, defaultSinkNumberOfPartitions); } if (props.containsKey(DEFAULT_SINK_NUMBER_OF_REPLICATIONS)) { - ksqlConfigProps.put(SINK_NUMBER_OF_REPLICATIONS, Short.parseShort(props.get - (DEFAULT_SINK_NUMBER_OF_REPLICATIONS).toString())); + ksqlConfigProps.put(SINK_NUMBER_OF_REPLICATIONS, + Short.parseShort(props.get(DEFAULT_SINK_NUMBER_OF_REPLICATIONS).toString())); } else { ksqlConfigProps.put(SINK_NUMBER_OF_REPLICATIONS, defaultSinkNumberOfReplications); } if (props.containsKey(DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION)) { ksqlConfigProps.put(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION, - Long.parseLong(props.get - (DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION).toString())); + Long.parseLong(props.get(DEFAULT_SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION).toString())); } else { ksqlConfigProps.put(SINK_WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION, defaultSinkWindowChangeLogAdditionalRetention); diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java index 0329cb7fac8f..a8dc63cd40b0 100644 --- a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java +++ b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java @@ -134,7 +134,7 @@ private void handleSessionSiblingField(GenericRecord randomAvroMessage, List oldest = null; + for (Map.Entry entry : expiredSessions.entrySet()) { + if (oldest == null || (entry.getValue().created < oldest.getValue().created)) { + oldest = entry; + } } - public boolean isActiveAndExpire(String sessionId) { - boolean b = isActive(sessionId); - if (b) { - SessionObject sessionObject = activeSessions.get(sessionId); - if (sessionObject.isExpired()){ - System.out.println("***Expired:" + sessionId); - SessionObject removed = activeSessions.remove(sessionId); - expiredSessions.put(sessionId, removed); - return false; - } - - } - return b; + if (oldest != null) { + expiredSessions.remove(oldest.getKey()); + return oldest.getKey(); + } + return null; + } + + public String getRandomActiveToken() { + int randomIndex = (int) (Math.random() * activeSessions.size()); + return new ArrayList(activeSessions.keySet()).get(randomIndex); + } + + public String getActiveSessionThatHasExpired() { + String expiredToken = null; + for (String s : activeSessions.keySet()) { + if (activeSessions.get(s).isExpired()) { + expiredToken = s; + } + } + if (expiredToken != null) { + expiredSessions.put(expiredToken, activeSessions.remove(expiredToken)); } + return expiredToken; + } - public void newSession(String sessionToken) { - if (activeSessions.containsKey(sessionToken)) throw new RuntimeException("Session" + sessionToken + " already exists"); - activeSessions.putIfAbsent(sessionToken, new SessionObject(maxSessionDurationSeconds)); + public String getToken(String s) { + if (activeSessions.containsKey(s)) { + return s; } - public boolean isExpiredSession(String sessionId) { - return expiredSessions.containsKey(sessionId); + // MaxedOut = then reuse active key + if (activeSessions.size() == maxSessions) { + int randomIndex = (int) (Math.random() * activeSessions.size()); + return new ArrayList(activeSessions.keySet()).get(randomIndex); } - public String recycleOldestExpired() { - Map.Entry oldest = null; - for (Map.Entry entry : expiredSessions.entrySet()) { - if (oldest == null || (entry.getValue().created < oldest.getValue().created)) { - oldest = entry; - } - } - if (oldest != null) { - expiredSessions.remove(oldest.getKey()); - return oldest.getKey(); - } - return null; - } - public String getRandomActiveToken(){ - int randomIndex = (int) (Math.random() * activeSessions.size()); - return new ArrayList(activeSessions.keySet()).get(randomIndex); + // we have a new sessionId, =- if it is expired then we will allow reuse + if (expiredSessions.containsKey(s)) { + expiredSessions.remove(s); + return s; } - public String getActiveSessionThatHasExpired(){ - String expiredToken = null; - for (String s : activeSessions.keySet()) { - if (activeSessions.get(s).isExpired()) { - expiredToken = s; - } - } - if (expiredToken != null) { - expiredSessions.put(expiredToken, activeSessions.remove(expiredToken)); - } - - return expiredToken; - } - - public String getToken(String s) { - if (activeSessions.containsKey(s)) return s; - // MaxedOut = then reuse active key - if (activeSessions.size() == maxSessions) { - int randomIndex = (int) (Math.random() * activeSessions.size()); - return new ArrayList(activeSessions.keySet()).get(randomIndex); - } + return s; + } - // we have a new sessionId, =- if it is expired then we will allow reuse - if (expiredSessions.containsKey(s)) { - expiredSessions.remove(s); - return s; - } + public int getActiveSessionCount() { + return activeSessions.size(); + } - return s; + public String randomActiveSession() { + if (activeSessions.size() == 0) { + return null; } + return activeSessions.keySet().iterator().next(); + } - public int getActiveSessionCount() { - return activeSessions.size(); - } + public int getMaxSessions() { + return maxSessions; + } - public String randomActiveSession() { - if (activeSessions.size() == 0) return null; - return activeSessions.keySet().iterator().next(); + public static class SessionObject { + + public SessionObject(int duration) { + this.sessionDurationSecs = duration; } - public int getMaxSessions() { - return maxSessions; + long created = System.currentTimeMillis(); + private long sessionDurationSecs = 300; + + public boolean isExpired() { + return (System.currentTimeMillis() - created) / 1000 > sessionDurationSecs; } - public static class SessionObject { - public SessionObject(int duration) { - this.sessionDurationSecs = duration; - } - long created = System.currentTimeMillis(); - private long sessionDurationSecs = 300; - - public boolean isExpired() { - return (System.currentTimeMillis() - created)/1000 > sessionDurationSecs; - } - - @Override - public String toString() { - return "Session:" + new Date(created).toString(); - } + @Override + public String toString() { + return "Session:" + new Date(created).toString(); } + } - Map expiredSessions = new HashMap(); - Map activeSessions = new HashMap(); + Map expiredSessions = new HashMap(); + Map activeSessions = new HashMap(); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 5ce9c8a9e8e7..7f8523a19288 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -213,8 +213,8 @@ public static KsqlRestApplication buildApplication(KsqlRestConfig restConfig, bo try { short replicationFactor = 1; if(restConfig.getOriginals().containsKey(KsqlConfig.DEFAULT_SINK_NUMBER_OF_REPLICATIONS)) { - replicationFactor = Short.parseShort(restConfig.getOriginals().get - (KsqlConfig.DEFAULT_SINK_NUMBER_OF_REPLICATIONS).toString()); + replicationFactor = Short.parseShort(restConfig.getOriginals() + .get(KsqlConfig.DEFAULT_SINK_NUMBER_OF_REPLICATIONS).toString()); } client.createTopic(commandTopic, 1, replicationFactor); } catch (KafkaTopicException e) { diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java index 26ea5bd02624..b982eea80204 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java @@ -156,7 +156,7 @@ private List> getAllPriorCommandRecords() { for (ConsumerRecord record : records) { result.add(record); TopicPartition recordTopicPartition = - new TopicPartition(record.topic(), record.partition()); + new TopicPartition(record.topic(), record.partition()); Long currentOffset = currentOffsets.get(recordTopicPartition); if (currentOffset == null || currentOffset < record.offset()) { currentOffsets.put(recordTopicPartition, record.offset()); diff --git a/pom.xml b/pom.xml index 943fe0356201..cf72f674a9e3 100644 --- a/pom.xml +++ b/pom.xml @@ -205,8 +205,8 @@ org.apache.maven.plugins maven-checkstyle-plugin - false - false + true + true