Skip to content

Commit

Permalink
Do not create new Executor everytime createRunner (#32272)
Browse files Browse the repository at this point in the history
* Do not create new Executor everytime createRunner

* reset executorService after shutdown

* Switch to use newScheduledThreadPool; guard ses with AtomicReference

* Partially revert changes on flink and samza runner
  • Loading branch information
Abacn committed Aug 21, 2024
1 parent 512b52a commit ed4c03e
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.joda.time.Duration;
Expand Down Expand Up @@ -126,7 +127,11 @@ public void initializeState(StateInitializationContext context) throws Exception
// this will implicitly be keyed like the StateInternalsFactory
TimerInternalsFactory<byte[]> timerInternalsFactory = key -> timerInternals;

executorService = Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory());
if (this.executorService == null) {
this.executorService =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("flink-sdf-executor-%d").build());
}

((ProcessFn) doFn).setStateInternalsFactory(stateInternalsFactory);
((ProcessFn) doFn).setTimerInternalsFactory(timerInternalsFactory);
Expand Down Expand Up @@ -191,10 +196,12 @@ public void close() throws Exception {
"The scheduled executor service did not properly terminate. Shutting "
+ "it down now.");
executorService.shutdownNow();
executorService = null;
}
} catch (InterruptedException e) {
LOG.debug("Could not properly await the termination of the scheduled executor service.", e);
executorService.shutdownNow();
executorService = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.KeyedWorkItem;
Expand Down Expand Up @@ -56,6 +58,7 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.joda.time.Duration;
import org.joda.time.Instant;

Expand Down Expand Up @@ -115,7 +118,10 @@ private static class ProcessFnExtractor implements UserParDoFnFactory.DoFnExtrac
private static class SplittableDoFnRunnerFactory<
InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>
implements DoFnRunnerFactory<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> {
private final AtomicReference<ScheduledExecutorService> ses = new AtomicReference<>();

@Override
@SuppressWarnings("nullness") // nullable atomic reference guaranteed nonnull when get
public DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> createRunner(
DoFn<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> fn,
PipelineOptions options,
Expand All @@ -131,6 +137,13 @@ public DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> crea
OutputManager outputManager,
DoFnSchemaInformation doFnSchemaInformation,
Map<String, PCollectionView<?>> sideInputMapping) {
if (this.ses.get() == null) {
this.ses.compareAndSet(
null,
Executors.newScheduledThreadPool(
Runtime.getRuntime().availableProcessors(),
new ThreadFactoryBuilder().setNameFormat("df-sdf-executor-%d").build()));
}
ProcessFn<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> processFn =
(ProcessFn<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>) fn;
processFn.setStateInternalsFactory(key -> (StateInternals) stepContext.stateInternals());
Expand Down Expand Up @@ -162,7 +175,7 @@ public <T> void outputWindowedValue(
}
},
sideInputReader,
Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
ses.get(),
// Commit at least once every 10 seconds or 10k records. This keeps the watermark
// advancing smoothly, and ensures that not too much work will have to be reprocessed
// in the event of a crash.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.KeyedWorkItem;
Expand Down Expand Up @@ -49,9 +50,11 @@
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.samza.config.Config;
import org.apache.samza.context.Context;
import org.apache.samza.operators.Scheduler;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand Down Expand Up @@ -81,6 +84,7 @@ public class SplittableParDoProcessKeyedElementsOp<
private transient SamzaTimerInternalsFactory<byte[]> timerInternalsFactory;
private transient DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> fnRunner;
private transient SamzaPipelineOptions pipelineOptions;
private transient @MonotonicNonNull ScheduledExecutorService ses = null;

public SplittableParDoProcessKeyedElementsOp(
TupleTag<OutputT> mainOutputTag,
Expand Down Expand Up @@ -137,6 +141,12 @@ public void open(
isBounded,
pipelineOptions);

if (this.ses == null) {
this.ses =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("samza-sdf-executor-%d").build());
}

final KeyedInternals<byte[]> keyedInternals =
new KeyedInternals<>(stateInternalsFactory, timerInternalsFactory);

Expand Down Expand Up @@ -172,7 +182,7 @@ public <AdditionalOutputT> void outputWindowedValue(
}
},
NullSideInputReader.empty(),
Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
ses,
10000,
Duration.standardSeconds(10),
() -> {
Expand Down

0 comments on commit ed4c03e

Please sign in to comment.