Skip to content

Commit

Permalink
updates for line length style check
Browse files Browse the repository at this point in the history
  • Loading branch information
norwood committed Jan 19, 2018
1 parent 967d138 commit 14994e5
Show file tree
Hide file tree
Showing 79 changed files with 4,179 additions and 2,311 deletions.
3 changes: 1 addition & 2 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@
<!-- These checks may be included in the future -->
<suppress files="." checks="OverloadMethodsDeclarationOrder"/>
<suppress files="." checks="NonEmptyAtclauseDescription"/>
<suppress files="." checks="LineLengthCheck"/>
<suppress files="." checks="MethodLengthCheck"/>
<suppress files="." checks="EmptyLineSeparator"/>
<suppress files="." checks="NeedBraces"/>

</suppressions>
</suppressions>
129 changes: 84 additions & 45 deletions ksql-cli/src/main/java/io/confluent/ksql/cli/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,6 @@

package io.confluent.ksql.cli;

import io.confluent.ksql.KsqlEngine;
import io.confluent.ksql.cli.console.CliSpecificCommand;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.ErrorMessageEntity;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.util.CliUtils;
import io.confluent.ksql.ddl.DdlConfig;
import io.confluent.ksql.parser.AstBuilder;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.SqlBaseParser;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.PropertiesList;
import io.confluent.ksql.cli.console.Console;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.CommonUtils;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Version;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand All @@ -55,7 +34,12 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Scanner;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand All @@ -64,6 +48,28 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import io.confluent.ksql.KsqlEngine;
import io.confluent.ksql.cli.console.CliSpecificCommand;
import io.confluent.ksql.cli.console.Console;
import io.confluent.ksql.ddl.DdlConfig;
import io.confluent.ksql.parser.AstBuilder;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.SqlBaseParser;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.ErrorMessageEntity;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.PropertiesList;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.util.CliUtils;
import io.confluent.ksql.util.CommonUtils;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Version;

public class Cli implements Closeable, AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(Cli.class);
Expand All @@ -88,10 +94,10 @@ public Cli(
Objects.requireNonNull(restClient, "Must provide the CLI with a REST client");
Objects.requireNonNull(terminal, "Must provide the CLI with a terminal");

this.streamedQueryRowLimit = streamedQueryRowLimit;
this.streamedQueryRowLimit = streamedQueryRowLimit;
this.streamedQueryTimeoutMs = streamedQueryTimeoutMs;
this.restClient = restClient;
this.terminal = terminal;
this.terminal = terminal;

this.queryStreamExecutorService = Executors.newSingleThreadExecutor();
}
Expand Down Expand Up @@ -151,7 +157,8 @@ private void displayWelcomeMessage() {
int logoWidth = 33;
String copyrightMessage = "Copyright 2017 Confluent Inc.";
String helpReminderMessage = "Having trouble? "
+ "Type 'help' (case-insensitive) for a rundown of how things work!";
+ "Type 'help' (case-insensitive) for a rundown of how things "
+ "work!";
// Don't want to display the logo if it'll just end up getting wrapped and looking hideous
if (terminal.getWidth() >= logoWidth) {
// Want to center the logo, but in the case of something like a fullscreen terminal, just
Expand All @@ -161,7 +168,10 @@ private void displayWelcomeMessage() {
// Math.min(terminal.getWidth(), helpReminderMessage.length())
int paddedLogoWidth = Math.min(terminal.getWidth(), helpReminderMessage.length());
int paddingWidth = (paddedLogoWidth - logoWidth) / 2;
String leftPadding = new String(new byte[paddingWidth], StandardCharsets.UTF_8).replaceAll(".", " ");
String leftPadding = new String(
new byte[paddingWidth],
StandardCharsets.UTF_8
).replaceAll(".", " ");
terminal.writer().printf("%s======================================%n", leftPadding);
terminal.writer().printf("%s= _ __ _____ ____ _ =%n", leftPadding);
terminal.writer().printf("%s= | |/ // ____|/ __ \\| | =%n", leftPadding);
Expand Down Expand Up @@ -234,7 +244,8 @@ public void handleLine(String line) throws Exception {
}

String[] commandArgs = trimmedLine.split("\\s+", 2);
CliSpecificCommand cliSpecificCommand = terminal.getCliSpecificCommands().get(commandArgs[0].toLowerCase());
CliSpecificCommand cliSpecificCommand =
terminal.getCliSpecificCommands().get(commandArgs[0].toLowerCase());
if (cliSpecificCommand != null) {
cliSpecificCommand.execute(commandArgs.length > 1 ? commandArgs[1] : "");
} else {
Expand All @@ -245,9 +256,10 @@ public void handleLine(String line) throws Exception {
/**
* Attempt to read a logical line of input from the user. Can span multiple physical lines, as
* long as all but the last end with '\\'.
*
* @return The parsed, logical line.
* @throws EndOfFileException If there is no more input available from the user.
* @throws IOException If any other I/O error occurs.
* @throws IOException If any other I/O error occurs.
*/
private String readLine() throws IOException {
while (true) {
Expand Down Expand Up @@ -279,7 +291,11 @@ private void handleStatements(String line)
String statementText = KsqlEngine.getStatementString(statementContext);
if (statementContext.statement() instanceof SqlBaseParser.QuerystatementContext
|| statementContext.statement() instanceof SqlBaseParser.PrintTopicContext) {
consecutiveStatements = printOrDisplayQueryResults(consecutiveStatements, statementContext, statementText);
consecutiveStatements = printOrDisplayQueryResults(
consecutiveStatements,
statementContext,
statementText
);

} else if (statementContext.statement() instanceof SqlBaseParser.ListPropertiesContext) {
listProperties(statementText);
Expand All @@ -304,7 +320,11 @@ private void handleStatements(String line)
}
}

private void registerTopic(StringBuilder consecutiveStatements, SqlBaseParser.SingleStatementContext statementContext, String statementText) {
private void registerTopic(
StringBuilder consecutiveStatements,
SqlBaseParser.SingleStatementContext statementContext,
String statementText
) {
CliUtils cliUtils = new CliUtils();
Optional<String> avroSchema = cliUtils.getAvroSchemaIfAvroTopic(
(SqlBaseParser.RegisterTopicContext) statementContext.statement());
Expand All @@ -314,24 +334,36 @@ private void registerTopic(StringBuilder consecutiveStatements, SqlBaseParser.Si
consecutiveStatements.append(statementText);
}

private void runScript(SqlBaseParser.SingleStatementContext statementContext, String statementText) throws IOException {
private void runScript(
SqlBaseParser.SingleStatementContext statementContext,
String statementText
) throws IOException {
SqlBaseParser.RunScriptContext runScriptContext =
(SqlBaseParser.RunScriptContext) statementContext.statement();
String schemaFilePath = AstBuilder.unquote(runScriptContext.STRING().getText(), "'");
String fileContent;
try {
fileContent = new String(Files.readAllBytes(Paths.get(schemaFilePath)), StandardCharsets.UTF_8);
fileContent = new String(
Files.readAllBytes(Paths.get(schemaFilePath)),
StandardCharsets.UTF_8
);
} catch (IOException e) {
throw new KsqlException(" Could not read statements from file: " + schemaFilePath + ". "
+ "Details: " + e.getMessage(), e);
throw new KsqlException(
" Could not read statements from file: " + schemaFilePath + ". " + "Details: "
+ e.getMessage(),
e
);
}
setProperty(DdlConfig.SCHEMA_FILE_CONTENT_PROPERTY, fileContent);
printKsqlResponse(
restClient.makeKsqlRequest(statementText)
);
}

private StringBuilder unsetProperty(StringBuilder consecutiveStatements, SqlBaseParser.SingleStatementContext statementContext) throws IOException {
private StringBuilder unsetProperty(
StringBuilder consecutiveStatements,
SqlBaseParser.SingleStatementContext statementContext
) throws IOException {
if (consecutiveStatements.length() != 0) {
printKsqlResponse(
restClient.makeKsqlRequest(consecutiveStatements.toString())
Expand All @@ -345,7 +377,11 @@ private StringBuilder unsetProperty(StringBuilder consecutiveStatements, SqlBase
return consecutiveStatements;
}

private StringBuilder printOrDisplayQueryResults(StringBuilder consecutiveStatements, SqlBaseParser.SingleStatementContext statementContext, String statementText) throws IOException, InterruptedException, ExecutionException {
private StringBuilder printOrDisplayQueryResults(
StringBuilder consecutiveStatements,
SqlBaseParser.SingleStatementContext statementContext,
String statementText
) throws IOException, InterruptedException, ExecutionException {
if (consecutiveStatements.length() != 0) {
printKsqlResponse(
restClient.makeKsqlRequest(consecutiveStatements.toString())
Expand Down Expand Up @@ -373,7 +409,7 @@ private void listProperties(String statementText) throws IOException {
PropertiesList propertiesList = (PropertiesList) ksqlEntityList.get(0);
propertiesList.getProperties().putAll(restClient.getLocalProperties());
terminal.printKsqlEntityList(
Collections.singletonList(propertiesList)
Collections.singletonList(propertiesList)
);
}

Expand All @@ -388,7 +424,10 @@ private void printKsqlResponse(RestResponse<KsqlEntityList> response) throws IOE
LOGGER.error(errorMsg.getErrorMessage().getMessage());
noErrorFromServer = false;
} else if (entity instanceof CommandStatusEntity &&
(((CommandStatusEntity) entity).getCommandStatus().getStatus() == CommandStatus.Status.ERROR)) {
(
((CommandStatusEntity) entity).getCommandStatus().getStatus()
== CommandStatus.Status.ERROR
)) {
String fullMessage = ((CommandStatusEntity) entity).getCommandStatus().getMessage();
terminal.printError(fullMessage.split("\n")[0], fullMessage);
noErrorFromServer = false;
Expand Down Expand Up @@ -468,7 +507,10 @@ private void handlePrintedTopic(String printTopic)
restClient.makePrintTopicRequest(printTopic);

if (topicResponse.isSuccessful()) {
try (Scanner topicStreamScanner = new Scanner(topicResponse.getResponse(), StandardCharsets.UTF_8.name())) {
try (Scanner topicStreamScanner = new Scanner(
topicResponse.getResponse(),
StandardCharsets.UTF_8.name()
)) {
Future<?> topicPrintFuture = queryStreamExecutorService.submit(() -> {
while (topicStreamScanner.hasNextLine()) {
String line = topicStreamScanner.nextLine();
Expand Down Expand Up @@ -512,8 +554,7 @@ private void setProperty(String property, String value) {
parsedProperty = property;
} else if (property.startsWith(StreamsConfig.CONSUMER_PREFIX)) {
parsedProperty = property.substring(StreamsConfig.CONSUMER_PREFIX.length());
ConfigDef.ConfigKey configKey =
CONSUMER_CONFIG_DEF.configKeys().get(parsedProperty);
ConfigDef.ConfigKey configKey = CONSUMER_CONFIG_DEF.configKeys().get(parsedProperty);
if (configKey == null) {
throw new IllegalArgumentException(String.format(
"Invalid consumer property: '%s'",
Expand All @@ -523,8 +564,7 @@ private void setProperty(String property, String value) {
type = configKey.type;
} else if (property.startsWith(StreamsConfig.PRODUCER_PREFIX)) {
parsedProperty = property.substring(StreamsConfig.PRODUCER_PREFIX.length());
ConfigDef.ConfigKey configKey =
PRODUCER_CONFIG_DEF.configKeys().get(parsedProperty);
ConfigDef.ConfigKey configKey = PRODUCER_CONFIG_DEF.configKeys().get(parsedProperty);
if (configKey == null) {
throw new IllegalArgumentException(String.format(
"Invalid producer property: '%s'",
Expand Down Expand Up @@ -595,5 +635,4 @@ private static ConfigDef getConfigDef(Class<? extends AbstractConfig> classs) {
return null;
}
}

}
}
36 changes: 23 additions & 13 deletions ksql-cli/src/main/java/io/confluent/ksql/cli/RemoteCli.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@

package io.confluent.ksql.cli;

import io.confluent.ksql.rest.client.KsqlRestClient;
import java.io.PrintWriter;

import javax.ws.rs.ProcessingException;

import io.confluent.ksql.cli.console.CliSpecificCommand;
import io.confluent.ksql.cli.console.Console;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.client.exception.KsqlRestClientException;
import io.confluent.ksql.util.CommonUtils;

import javax.ws.rs.ProcessingException;

import java.io.PrintWriter;

public class RemoteCli extends Cli {

public RemoteCli(
Expand All @@ -44,20 +44,28 @@ public RemoteCli(

validateClient(terminal.writer(), restClient);

terminal.registerCliSpecificCommand(new RemoteCliSpecificCommand(restClient, terminal.writer()));
terminal.registerCliSpecificCommand(new RemoteCliSpecificCommand(
restClient,
terminal.writer()
));
}

// Visible for testing
public boolean hasUserCredentials() {
return restClient.hasUserCredentials();
}

private static void validateClient(final PrintWriter writer,
final KsqlRestClient restClient) {
private static void validateClient(
final PrintWriter writer,
final KsqlRestClient restClient
) {
try {
RestResponse restResponse = restClient.makeRootRequest();
if (restResponse.isErroneous()) {
writer.format("Couldn't connect to the KSQL server: %s\n\n", restResponse.getErrorMessage().getMessage());
writer.format(
"Couldn't connect to the KSQL server: %s\n\n",
restResponse.getErrorMessage().getMessage()
);
}
} catch (IllegalArgumentException exception) {
writer.println("Server URL must begin with protocol (e.g., http:// or https://)");
Expand All @@ -76,12 +84,14 @@ private static void validateClient(final PrintWriter writer,
}

static class RemoteCliSpecificCommand implements CliSpecificCommand {

private final KsqlRestClient restClient;
private final PrintWriter writer;

RemoteCliSpecificCommand(final KsqlRestClient restClient,
final PrintWriter writer) {

RemoteCliSpecificCommand(
final KsqlRestClient restClient,
final PrintWriter writer
) {
this.writer = writer;
this.restClient = restClient;
}
Expand All @@ -96,7 +106,7 @@ public void printHelp() {
writer.println("\tserver: Show the current server");
writer.println("\tserver <server>: Change the current server to <server>");
writer.println("\t example: "
+ "\"server http://my.awesome.server.com:9098\""
+ "\"server http://my.awesome.server.com:9098\""
);
}

Expand Down
Loading

0 comments on commit 14994e5

Please sign in to comment.