Skip to content

Commit

Permalink
[KYUUBI apache#4806][FLINK] Improve logs
Browse files Browse the repository at this point in the history
  • Loading branch information
link3280 committed Jul 22, 2023
1 parent fd78f32 commit e8026b8
Showing 1 changed file with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public EmbeddedExecutorFactory(
synchronized (bootstrapLock) {
// submittedJobIds would be always 1, because we create a new list to avoid concurrent access
// issues
LOGGER.debug("Bootstrapping EmbeddedExecutorFactory.");
EmbeddedExecutorFactory.submittedJobIds =
new ConcurrentLinkedQueue<>(checkNotNull(submittedJobIds));
EmbeddedExecutorFactory.bootstrapJobIds = submittedJobIds;
Expand All @@ -105,7 +106,7 @@ public String getName() {
@Override
public boolean isCompatibleWith(final Configuration configuration) {
// override Flink's implementation to allow usage in Kyuubi
LOGGER.debug("matching execution target: {}", configuration.get(DeploymentOptions.TARGET));
LOGGER.debug("Matching execution target: {}", configuration.get(DeploymentOptions.TARGET));
return configuration.get(DeploymentOptions.TARGET).equalsIgnoreCase("yarn-application")
&& configuration.toMap().getOrDefault("yarn.tags", "").toLowerCase().contains("kyuubi");
}
Expand All @@ -119,6 +120,7 @@ public PipelineExecutor getExecutor(final Configuration configuration) {
int retry = 0;
while (bootstrapJobIds == null && retry < BOOTSTRAP_WAIT_RETRIES) {
try {
LOGGER.debug("Waiting for bootstrap to complete. Wait retries: {}.", retry);
bootstrapLock.wait(BOOTSTRAP_WAIT_INTERVAL);
retry++;
} catch (InterruptedException e) {
Expand All @@ -129,14 +131,14 @@ public PipelineExecutor getExecutor(final Configuration configuration) {
throw new RuntimeException(
"Bootstrap of Flink SQL engine timed out after "
+ BOOTSTRAP_WAIT_INTERVAL * BOOTSTRAP_WAIT_RETRIES
+ ". Please check the engine log for more details.");
+ " ms. Please check the engine log for more details.");
}
}
if (bootstrapJobIds.size() > 0) {
LOGGER.info("Submitting new Kyuubi job. Job already submitted: {}.", submittedJobIds.size());
LOGGER.info("Submitting new Kyuubi job. Job submitted: {}.", submittedJobIds.size());
executorJobIDs = submittedJobIds;
} else {
LOGGER.info("Bootstrapping Flink SQL engine.");
LOGGER.info("Bootstrapping Flink SQL engine with the initial SQL.");
executorJobIDs = bootstrapJobIds;
}
return new EmbeddedExecutor(
Expand Down

0 comments on commit e8026b8

Please sign in to comment.