Skip to content

Commit

Permalink
[KYUUBI apache#5080][FLINK] Fix EmbeddedExecutorFactory not thread-sa…
Browse files Browse the repository at this point in the history
…fe during bootstrap

### _Why are the changes needed?_
As titled.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

Closes apache#5082 from link3280/KYUUBI-5080.

Closes apache#5080

e8026b8 [Paul Lin] [KYUUBI apache#4806][FLINK] Improve logs
fd78f32 [Paul Lin] [KYUUBI apache#4806][FLINK] Fix gateway NPE
a0a7c44 [Cheng Pan] Update externals/kyuubi-flink-sql-engine/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java
50830d4 [Paul Lin] [KYUUBI apache#5080][FLINK] Fix EmbeddedExecutorFactory not thread-safe during bootstrap

Lead-authored-by: Paul Lin <paullin3280@gmail.com>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
  • Loading branch information
2 people authored and zhaohehuhu committed Jul 24, 2023
1 parent 07216c0 commit 89fdc7f
Showing 1 changed file with 39 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ public class EmbeddedExecutorFactory implements PipelineExecutorFactory {

private static ScheduledExecutor retryExecutor;

private static final Object bootstrapLock = new Object();

private static final long BOOTSTRAP_WAIT_INTERVAL = 10_000L;

private static final int BOOTSTRAP_WAIT_RETRIES = 3;

private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedExecutorFactory.class);

public EmbeddedExecutorFactory() {
Expand Down Expand Up @@ -79,13 +85,17 @@ public EmbeddedExecutorFactory(
checkState(EmbeddedExecutorFactory.submittedJobIds == null);
checkState(EmbeddedExecutorFactory.dispatcherGateway == null);
checkState(EmbeddedExecutorFactory.retryExecutor == null);
// submittedJobIds would be always 1, because we create a new list to avoid concurrent access
// issues
EmbeddedExecutorFactory.submittedJobIds =
new ConcurrentLinkedQueue<>(checkNotNull(submittedJobIds));
EmbeddedExecutorFactory.bootstrapJobIds = submittedJobIds;
EmbeddedExecutorFactory.dispatcherGateway = checkNotNull(dispatcherGateway);
EmbeddedExecutorFactory.retryExecutor = checkNotNull(retryExecutor);
synchronized (bootstrapLock) {
// submittedJobIds would be always 1, because we create a new list to avoid concurrent access
// issues
LOGGER.debug("Bootstrapping EmbeddedExecutorFactory.");
EmbeddedExecutorFactory.submittedJobIds =
new ConcurrentLinkedQueue<>(checkNotNull(submittedJobIds));
EmbeddedExecutorFactory.bootstrapJobIds = submittedJobIds;
EmbeddedExecutorFactory.dispatcherGateway = checkNotNull(dispatcherGateway);
EmbeddedExecutorFactory.retryExecutor = checkNotNull(retryExecutor);
bootstrapLock.notifyAll();
}
}

@Override
Expand All @@ -96,7 +106,7 @@ public String getName() {
@Override
public boolean isCompatibleWith(final Configuration configuration) {
// override Flink's implementation to allow usage in Kyuubi
LOGGER.debug("matching execution target: {}", configuration.get(DeploymentOptions.TARGET));
LOGGER.debug("Matching execution target: {}", configuration.get(DeploymentOptions.TARGET));
return configuration.get(DeploymentOptions.TARGET).equalsIgnoreCase("yarn-application")
&& configuration.toMap().getOrDefault("yarn.tags", "").toLowerCase().contains("kyuubi");
}
Expand All @@ -105,11 +115,30 @@ public boolean isCompatibleWith(final Configuration configuration) {
public PipelineExecutor getExecutor(final Configuration configuration) {
checkNotNull(configuration);
Collection<JobID> executorJobIDs;
synchronized (bootstrapLock) {
// wait in a loop to avoid spurious wakeups
int retry = 0;
while (bootstrapJobIds == null && retry < BOOTSTRAP_WAIT_RETRIES) {
try {
LOGGER.debug("Waiting for bootstrap to complete. Wait retries: {}.", retry);
bootstrapLock.wait(BOOTSTRAP_WAIT_INTERVAL);
retry++;
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while waiting for bootstrap.", e);
}
}
if (bootstrapJobIds == null) {
throw new RuntimeException(
"Bootstrap of Flink SQL engine timed out after "
+ BOOTSTRAP_WAIT_INTERVAL * BOOTSTRAP_WAIT_RETRIES
+ " ms. Please check the engine log for more details.");
}
}
if (bootstrapJobIds.size() > 0) {
LOGGER.info("Submitting new Kyuubi job. Job already submitted: {}.", submittedJobIds.size());
LOGGER.info("Submitting new Kyuubi job. Job submitted: {}.", submittedJobIds.size());
executorJobIDs = submittedJobIds;
} else {
LOGGER.info("Bootstrapping Flink SQL engine.");
LOGGER.info("Bootstrapping Flink SQL engine with the initial SQL.");
executorJobIDs = bootstrapJobIds;
}
return new EmbeddedExecutor(
Expand Down

0 comments on commit 89fdc7f

Please sign in to comment.