[Spark Runner] Add new experiment that provides concurrent bounded output for SDFs (resolves #23852) (#24837)

Co-authored-by: Moritz Mack <mmack@talend.com>
Co-authored-by: Jan Lukavský <je.ik@seznam.cz>
3 people committed Feb 2, 2023
1 parent d7264d4 commit 01aa470
Showing 9 changed files with 754 additions and 178 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
@@ -69,6 +69,8 @@
present in 2.43.0 (up to 1.8.0_342, 11.0.16, 17.0.2 for respective Java versions). This is accompanied
by an explicit re-enabling of TLSv1 and TLSv1.1 for Java 8 and Java 11.
* Add UDF metrics support for Samza portable mode.
* Option for SparkRunner to avoid the need for SDF output to fit in memory ([#23852](https://github.com/apache/beam/issues/23852)).
This helps e.g. with ParquetIO reads. Turn the feature on by adding experiment `use_bounded_concurrent_output_for_sdf`.

## Breaking Changes

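The CHANGES.md entry above turns the feature on via the `use_bounded_concurrent_output_for_sdf` experiment. As a minimal, illustrative sketch (assuming the usual Beam PipelineOptionsFactory / ExperimentalOptions setup; the class name and pipeline contents are made up for the example), it could be enabled from Java like this:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class EnableBoundedSdfOutputExample {
  public static void main(String[] args) {
    // Equivalent command-line flags:
    //   --runner=SparkRunner --experiments=use_bounded_concurrent_output_for_sdf
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

    // Enable the experiment so SDF output on the Spark runner does not have to fit in memory.
    ExperimentalOptions.addExperiment(
        options.as(ExperimentalOptions.class), "use_bounded_concurrent_output_for_sdf");

    Pipeline pipeline = Pipeline.create(options);
    // ... add transforms (e.g. a large ParquetIO read) and run.
    pipeline.run().waitUntilFinish();
  }
}
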
@@ -123,7 +123,8 @@ public PCollectionTuple expand(PCollection<KV<byte[], KV<InputT, RestrictionT>>>
}
}

static class NaiveProcessFn<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>
public static class NaiveProcessFn<
InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>
extends DoFn<KV<InputT, RestrictionT>, OutputT> {
private final DoFn<InputT, OutputT> fn;
private final Map<String, PCollectionView<?>> sideInputMapping;
41 changes: 29 additions & 12 deletions runners/spark/spark_runner.gradle
@@ -245,22 +245,22 @@ hadoopVersions.each { kv ->
}
}

def validatesRunnerBatch = tasks.register("validatesRunnerBatch", Test) {
def applyBatchValidatesRunnerSetup = { Test it ->
group = "Verification"
// Disable gradle cache
outputs.upToDateWhen { false }
systemProperties sparkTestProperties(["--enableSparkMetricSinks":"false"])
jvmArgs += sparkTestJvmArgs()
jvmArgs '-Xmx3g'

classpath = configurations.validatesRunner
testClassesDirs = files(
project(":sdks:java:core").sourceSets.test.output.classesDirs,
project(":runners:core-java").sourceSets.test.output.classesDirs,
it.outputs.upToDateWhen { false }
it.jvmArgs += sparkTestJvmArgs()
it.jvmArgs '-Xmx3g'

it.classpath = project.configurations.validatesRunner
it.testClassesDirs = project.files(
project(":sdks:java:core").sourceSets.test.output.classesDirs,
project(":runners:core-java").sourceSets.test.output.classesDirs,
)

maxParallelForks 4
useJUnit {
it.maxParallelForks 4

it.useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
// Should be run only in a properly configured SDK harness environment
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
@@ -288,6 +288,22 @@ def validatesRunnerBatch = tasks.register("validatesRunnerBatch", Test) {
}
}

def validatesRunnerBatch = tasks.register("validatesRunnerBatch", Test) {
applyBatchValidatesRunnerSetup(it)
systemProperties sparkTestProperties(["--enableSparkMetricSinks":"false"])
}

def validatesRunnerBatchWithBoundedSDFExperiment = tasks.register("validatesRunnerBatchWithBoundedSDFExperiment", Test) {
applyBatchValidatesRunnerSetup(it)
systemProperties sparkTestProperties(["--enableSparkMetricSinks":"false", "--experiments":"use_bounded_concurrent_output_for_sdf"])

// experiment concerns only SDF implementation, therefore avoid needlessly rerunning other tests
it.filter {
includeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest'
}
}


def validatesRunnerStreaming = tasks.register("validatesRunnerStreaming", Test) {
group = "Verification"
// Disable gradle cache
@@ -413,6 +429,7 @@ tasks.register("validatesRunner") {
group = "Verification"
description "Validates Spark runner"
dependsOn validatesRunnerBatch
dependsOn validatesRunnerBatchWithBoundedSDFExperiment
dependsOn validatesRunnerStreaming
// It should be uncommented once all "validatesStructuredStreamingRunnerBatch" tests will pass.
// Otherwise, it breaks Spark runner ValidatesRunner tests.
@@ -45,10 +45,7 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.LinkedListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.util.AccumulatorV2;
import scala.Tuple2;
@@ -80,6 +77,7 @@ public class MultiDoFnFunction<InputT, OutputT>
private final boolean stateful;
private final DoFnSchemaInformation doFnSchemaInformation;
private final Map<String, PCollectionView<?>> sideInputMapping;
private final boolean useBoundedConcurrentOutput;

/**
* @param metricsAccum The Spark {@link AccumulatorV2} that backs the Beam metrics.
@@ -92,6 +90,7 @@
* @param sideInputs Side inputs used in this {@link DoFn}.
* @param windowingStrategy Input {@link WindowingStrategy}.
* @param stateful Stateful {@link DoFn}.
* @param useBoundedConcurrentOutput If it should use bounded output for processing.
*/
public MultiDoFnFunction(
MetricsContainerStepMapAccumulator metricsAccum,
@@ -106,7 +105,8 @@ public MultiDoFnFunction(
WindowingStrategy<?, ?> windowingStrategy,
boolean stateful,
DoFnSchemaInformation doFnSchemaInformation,
Map<String, PCollectionView<?>> sideInputMapping) {
Map<String, PCollectionView<?>> sideInputMapping,
boolean useBoundedConcurrentOutput) {
this.metricsAccum = metricsAccum;
this.stepName = stepName;
this.doFn = SerializableUtils.clone(doFn);
@@ -120,17 +120,29 @@ public MultiDoFnFunction(
this.stateful = stateful;
this.doFnSchemaInformation = doFnSchemaInformation;
this.sideInputMapping = sideInputMapping;
this.useBoundedConcurrentOutput = useBoundedConcurrentOutput;
}

@Override
public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> call(Iterator<WindowedValue<InputT>> iter)
throws Exception {
if (!wasSetupCalled && iter.hasNext()) {
DoFnInvokers.tryInvokeSetupFor(doFn, options.get());
wasSetupCalled = true;

if (iter.hasNext()) {
if (!wasSetupCalled) {
DoFnInvokers.tryInvokeSetupFor(doFn, options.get());
wasSetupCalled = true;
}
} else {
// empty bundle
return Collections.emptyIterator();
}

DoFnOutputManager outputManager = new DoFnOutputManager();
SparkInputDataProcessor<InputT, OutputT, Tuple2<TupleTag<?>, WindowedValue<?>>> processor;
if (useBoundedConcurrentOutput) {
processor = SparkInputDataProcessor.createBounded();
} else {
processor = SparkInputDataProcessor.createUnbounded();
}

final InMemoryTimerInternals timerInternals;
final StepContext context;
@@ -159,15 +171,15 @@ public TimerInternals timerInternals() {
};
} else {
timerInternals = null;
context = new SparkProcessContext.NoOpStepContext();
context = new SparkNoOpStepContext();
}

final DoFnRunner<InputT, OutputT> doFnRunner =
DoFnRunners.simpleRunner(
options.get(),
doFn,
CachedSideInputReader.of(new SparkSideInputReader(sideInputs)),
outputManager,
processor.getOutputManager(),
mainOutputTag,
additionalOutputTags,
context,
@@ -180,14 +192,15 @@ public TimerInternals timerInternals() {
DoFnRunnerWithMetrics<InputT, OutputT> doFnRunnerWithMetrics =
new DoFnRunnerWithMetrics<>(stepName, doFnRunner, metricsAccum);

return new SparkProcessContext<>(
SparkProcessContext<Object, InputT, OutputT> ctx =
new SparkProcessContext<>(
stepName,
doFn,
doFnRunnerWithMetrics,
outputManager,
key,
stateful ? new TimerDataIterator(timerInternals) : Collections.emptyIterator())
.processPartition(iter)
.iterator();
stateful ? new TimerDataIterator(timerInternals) : Collections.emptyIterator());

return processor.createOutputIterator(iter, ctx);
}

private static class TimerDataIterator implements Iterator<TimerInternals.TimerData> {
@@ -238,29 +251,16 @@ public void remove() {
}
}

private class DoFnOutputManager
implements SparkProcessContext.SparkOutputManager<Tuple2<TupleTag<?>, WindowedValue<?>>> {

private final Multimap<TupleTag<?>, WindowedValue<?>> outputs = LinkedListMultimap.create();

@Override
public void clear() {
outputs.clear();
}
private static class SparkNoOpStepContext implements StepContext {

@Override
public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> iterator() {
Iterator<Map.Entry<TupleTag<?>, WindowedValue<?>>> entryIter = outputs.entries().iterator();
return Iterators.transform(entryIter, this.entryToTupleFn());
}

private <K, V> Function<Map.Entry<K, V>, Tuple2<K, V>> entryToTupleFn() {
return en -> new Tuple2<>(en.getKey(), en.getValue());
public StateInternals stateInternals() {
throw new UnsupportedOperationException("stateInternals not supported");
}

@Override
public synchronized <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
outputs.put(tag, output);
public TimerInternals timerInternals() {
throw new UnsupportedOperationException("timerInternals not supported");
}
}
}
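
For intuition only: the "concurrent bounded output" this commit introduces amounts to a producer/consumer handoff through a bounded buffer, so the DoFn can keep emitting while Spark consumes and the full output never has to be buffered at once. The sketch below is a generic illustration of that pattern under that assumption, not the actual SparkInputDataProcessor implementation; every name in it is made up for the example.

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/** Generic sketch of bounded concurrent output: a producer thread fills a bounded
 *  queue while the consumer drains it, so memory use stays capped at the queue size. */
public final class BoundedHandoffSketch {

  private static final Object DONE = new Object(); // end-of-output sentinel

  /** Runs the producer on its own thread and returns an iterator over what it emits. */
  static <T> Iterator<T> concurrentBoundedOutput(Consumer<Consumer<T>> producer, int capacity) {
    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(capacity);
    Thread worker =
        new Thread(
            () -> {
              try {
                // put() blocks when the queue is full, which is what bounds memory.
                producer.accept(
                    value -> {
                      try {
                        queue.put(value);
                      } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                      }
                    });
              } finally {
                try {
                  queue.put(DONE);
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                }
              }
            });
    worker.setDaemon(true);
    worker.start();

    return new Iterator<T>() {
      private Object buffered;

      @Override
      public boolean hasNext() {
        if (buffered == null) {
          try {
            buffered = queue.take();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
          }
        }
        return buffered != DONE;
      }

      @SuppressWarnings("unchecked")
      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        T value = (T) buffered;
        buffered = null;
        return value;
      }
    };
  }

  public static void main(String[] args) {
    // Emit far more elements than the queue can hold; only ~64 are in memory at once.
    Iterator<Long> out =
        concurrentBoundedOutput(emit -> {
          for (long i = 0; i < 1_000_000L; i++) {
            emit.accept(i);
          }
        }, 64);
    long count = 0;
    while (out.hasNext()) {
      out.next();
      count++;
    }
    System.out.println("consumed " + count + " elements");
  }
}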
