Enable async processing for SDF on Spark runner #23852 #24837

Merged
13 commits merged on Feb 2, 2023
@@ -123,7 +123,8 @@ public PCollectionTuple expand(PCollection<KV<byte[], KV<InputT, RestrictionT>>>
}
}

static class NaiveProcessFn<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>
public static class NaiveProcessFn<
InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>
extends DoFn<KV<InputT, RestrictionT>, OutputT> {
private final DoFn<InputT, OutputT> fn;
private final Map<String, PCollectionView<?>> sideInputMapping;
runners/spark/spark_runner.gradle (29 additions & 12 deletions)
@@ -245,22 +245,22 @@ hadoopVersions.each { kv ->
}
}

def validatesRunnerBatch = tasks.register("validatesRunnerBatch", Test) {
def applyBatchValidatesRunnerSetup = { Test it ->
group = "Verification"
// Disable gradle cache
outputs.upToDateWhen { false }
systemProperties sparkTestProperties(["--enableSparkMetricSinks":"false"])
jvmArgs += sparkTestJvmArgs()
jvmArgs '-Xmx3g'

classpath = configurations.validatesRunner
testClassesDirs = files(
project(":sdks:java:core").sourceSets.test.output.classesDirs,
project(":runners:core-java").sourceSets.test.output.classesDirs,
it.outputs.upToDateWhen { false }
it.jvmArgs += sparkTestJvmArgs()
it.jvmArgs '-Xmx3g'

it.classpath = project.configurations.validatesRunner
it.testClassesDirs = project.files(
project(":sdks:java:core").sourceSets.test.output.classesDirs,
project(":runners:core-java").sourceSets.test.output.classesDirs,
)

maxParallelForks 4
useJUnit {
it.maxParallelForks 4

it.useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
// Should be run only in a properly configured SDK harness environment
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
@@ -288,6 +288,22 @@ def validatesRunnerBatch = tasks.register("validatesRunnerBatch", Test) {
}
}

def validatesRunnerBatch = tasks.register("validatesRunnerBatch", Test) {
applyBatchValidatesRunnerSetup(it)
systemProperties sparkTestProperties(["--enableSparkMetricSinks":"false"])
}

def validatesRunnerBatchWithBoundedSDFExperiment = tasks.register("validatesRunnerBatchWithBoundedSDFExperiment", Test) {
applyBatchValidatesRunnerSetup(it)
systemProperties sparkTestProperties(["--enableSparkMetricSinks":"false", "--experiments":"use_bounded_output_for_sdf"])

// experiment concerns only SDF implementation, therefore avoid needlessly rerunning other tests
it.filter {
includeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest'
}
}


def validatesRunnerStreaming = tasks.register("validatesRunnerStreaming", Test) {
group = "Verification"
// Disable gradle cache
@@ -413,6 +429,7 @@ tasks.register("validatesRunner") {
group = "Verification"
description "Validates Spark runner"
dependsOn validatesRunnerBatch
dependsOn validatesRunnerBatchWithBoundedSDFExperiment
dependsOn validatesRunnerStreaming
// It should be uncommented once all "validatesStructuredStreamingRunnerBatch" tests will pass.
// Otherwise, it breaks Spark runner ValidatesRunner tests.
@@ -45,10 +45,7 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.LinkedListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.util.AccumulatorV2;
import scala.Tuple2;
@@ -80,6 +77,7 @@ public class MultiDoFnFunction<InputT, OutputT>
private final boolean stateful;
private final DoFnSchemaInformation doFnSchemaInformation;
private final Map<String, PCollectionView<?>> sideInputMapping;
private final boolean useBoundedConcurrentOutput;

/**
* @param metricsAccum The Spark {@link AccumulatorV2} that backs the Beam metrics.
@@ -92,6 +90,7 @@ public class MultiDoFnFunction<InputT, OutputT>
* @param sideInputs Side inputs used in this {@link DoFn}.
* @param windowingStrategy Input {@link WindowingStrategy}.
* @param stateful Stateful {@link DoFn}.
* @param useBoundedConcurrentOutput Whether bounded output should be used for processing.
*/
public MultiDoFnFunction(
MetricsContainerStepMapAccumulator metricsAccum,
@@ -106,7 +105,8 @@ public MultiDoFnFunction(
WindowingStrategy<?, ?> windowingStrategy,
boolean stateful,
DoFnSchemaInformation doFnSchemaInformation,
Map<String, PCollectionView<?>> sideInputMapping) {
Map<String, PCollectionView<?>> sideInputMapping,
boolean useBoundedConcurrentOutput) {
this.metricsAccum = metricsAccum;
this.stepName = stepName;
this.doFn = SerializableUtils.clone(doFn);
@@ -120,17 +120,29 @@ public MultiDoFnFunction(
this.stateful = stateful;
this.doFnSchemaInformation = doFnSchemaInformation;
this.sideInputMapping = sideInputMapping;
this.useBoundedConcurrentOutput = useBoundedConcurrentOutput;
}

@Override
public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> call(Iterator<WindowedValue<InputT>> iter)
throws Exception {
if (!wasSetupCalled && iter.hasNext()) {
DoFnInvokers.tryInvokeSetupFor(doFn, options.get());
wasSetupCalled = true;

if (iter.hasNext()) {
if (!wasSetupCalled) {
DoFnInvokers.tryInvokeSetupFor(doFn, options.get());
wasSetupCalled = true;
}
} else {
// empty bundle
return Collections.emptyIterator();
}

DoFnOutputManager outputManager = new DoFnOutputManager();
SparkInputDataProcessor<InputT, OutputT, Tuple2<TupleTag<?>, WindowedValue<?>>> processor;
if (useBoundedConcurrentOutput) {
processor = SparkInputDataProcessor.createBounded();
} else {
processor = SparkInputDataProcessor.createUnbounded();
}

final InMemoryTimerInternals timerInternals;
final StepContext context;
@@ -159,15 +171,15 @@ public TimerInternals timerInternals() {
};
} else {
timerInternals = null;
context = new SparkProcessContext.NoOpStepContext();
context = new SparkNoOpStepContext();
}

final DoFnRunner<InputT, OutputT> doFnRunner =
DoFnRunners.simpleRunner(
options.get(),
doFn,
CachedSideInputReader.of(new SparkSideInputReader(sideInputs)),
outputManager,
processor.getOutputManager(),
mainOutputTag,
additionalOutputTags,
context,
Expand All @@ -180,14 +192,15 @@ public TimerInternals timerInternals() {
DoFnRunnerWithMetrics<InputT, OutputT> doFnRunnerWithMetrics =
new DoFnRunnerWithMetrics<>(stepName, doFnRunner, metricsAccum);

return new SparkProcessContext<>(
SparkProcessContext<Object, InputT, OutputT> ctx =
new SparkProcessContext<>(
Member:

Just throwing it out here as an option to consider ... considering there are already a lot of moving pieces involved and the context has just become a container without functionality, everything in there except for the runner could already be passed to the processor when initializing it (same for the input iterator as well). The only thing that then has to be passed to process is the runner itself.

Contributor Author:

It actually cannot be put into the constructor, because the processor is responsible for providing the correct instance of the output manager, which is needed for doFnRunner construction. Therefore I chose to wrap these into a container and pass it to the processor as a context object.

Member:

> because the processor is responsible for providing the correct instance of the output manager, which is needed for doFnRunner construction

That's more or less what I meant: you can put everything into the constructor of the processor except the runner itself. E.g. process could then look like this:

processor.process(iter, doFnRunnerWithMetrics)

or, if you pass the input iterator into the constructor as well:

processor.process(doFnRunnerWithMetrics)

Member:

@JozoVilcek Please feel free to discard or ignore! This code here is quickly hacked together and just meant to demonstrate what I had in mind above. SparkInputDataProcessor was replaced by SparkOutputManager; no SparkProcessContext and output iterator are needed anymore.
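A minimal sketch of the constructor-based wiring proposed in this thread. All names below (SketchInputDataProcessor, OutputReceiver, Runner) are hypothetical placeholders and do not correspond to the actual SparkInputDataProcessor or DoFnRunner APIs; the point is only the ordering: the processor owns the output receiver and the input iterator, the runner is built against that receiver, and the runner alone is passed to process.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class SketchInputDataProcessor<InT, OutT> {

  // Stand-in for the output manager that the processor owns and exposes.
  interface OutputReceiver<T> {
    void output(T value);
  }

  // Stand-in for the DoFn runner, constructed elsewhere against the output receiver.
  interface Runner<T> {
    void processElement(T element);
  }

  private final Iterator<InT> input;                    // input iterator supplied at construction
  private final List<OutT> buffer = new ArrayList<>();  // backs the output receiver
  private final OutputReceiver<OutT> outputReceiver = buffer::add;

  SketchInputDataProcessor(Iterator<InT> input) {
    this.input = input;
  }

  // Exposed so the runner can be built against the processor's own output receiver.
  OutputReceiver<OutT> getOutputReceiver() {
    return outputReceiver;
  }

  // The runner is the only collaborator that still crosses this boundary.
  Iterator<OutT> process(Runner<InT> runner) {
    while (input.hasNext()) {
      runner.processElement(input.next());
    }
    return buffer.iterator();
  }

  public static void main(String[] args) {
    SketchInputDataProcessor<Integer, String> processor =
        new SketchInputDataProcessor<>(List.of(1, 2, 3).iterator());
    // Build the runner against the processor's receiver, then hand only the runner back in.
    OutputReceiver<String> receiver = processor.getOutputReceiver();
    processor.process(element -> receiver.output("value-" + element))
        .forEachRemaining(System.out::println);
  }
}

The merged change instead keeps the SparkProcessContext container and the processor.createOutputIterator(iter, ctx) call visible in the surrounding diff; the sketch only illustrates the alternative discussed here.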

stepName,
doFn,
doFnRunnerWithMetrics,
outputManager,
key,
stateful ? new TimerDataIterator(timerInternals) : Collections.emptyIterator())
.processPartition(iter)
.iterator();
stateful ? new TimerDataIterator(timerInternals) : Collections.emptyIterator());

return processor.createOutputIterator(iter, ctx);
}

private static class TimerDataIterator implements Iterator<TimerInternals.TimerData> {
@@ -238,29 +251,16 @@ public void remove() {
}
}

private class DoFnOutputManager
implements SparkProcessContext.SparkOutputManager<Tuple2<TupleTag<?>, WindowedValue<?>>> {

private final Multimap<TupleTag<?>, WindowedValue<?>> outputs = LinkedListMultimap.create();

@Override
public void clear() {
outputs.clear();
}
private static class SparkNoOpStepContext implements StepContext {

@Override
public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> iterator() {
Iterator<Map.Entry<TupleTag<?>, WindowedValue<?>>> entryIter = outputs.entries().iterator();
return Iterators.transform(entryIter, this.entryToTupleFn());
}

private <K, V> Function<Map.Entry<K, V>, Tuple2<K, V>> entryToTupleFn() {
return en -> new Tuple2<>(en.getKey(), en.getValue());
public StateInternals stateInternals() {
throw new UnsupportedOperationException("stateInternals not supported");
}

@Override
public synchronized <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
outputs.put(tag, output);
public TimerInternals timerInternals() {
throw new UnsupportedOperationException("timerInternals not supported");
}
}
}