Skip to content

Commit

Permalink
[Dataflow Streaming] Move throwExceptionOnLargeOutput out of OperationalLimits (#32407)
Browse files Browse the repository at this point in the history
  • Loading branch information
arunpandianp committed Sep 9, 2024
1 parent 2a7755b commit 7a88e6f
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,11 @@ public class OperationalLimits {
public final long maxOutputKeyBytes;
// Maximum size of a single output element's serialized value.
public final long maxOutputValueBytes;
// Whether to throw an exception when processing output that violates any of the given limits.
public final boolean throwExceptionOnLargeOutput;

OperationalLimits(
long maxWorkItemCommitBytes,
long maxOutputKeyBytes,
long maxOutputValueBytes,
boolean throwExceptionOnLargeOutput) {
OperationalLimits(long maxWorkItemCommitBytes, long maxOutputKeyBytes, long maxOutputValueBytes) {
this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
this.maxOutputKeyBytes = maxOutputKeyBytes;
this.maxOutputValueBytes = maxOutputValueBytes;
this.throwExceptionOnLargeOutput = throwExceptionOnLargeOutput;
}

@AutoBuilder(ofClass = OperationalLimits.class)
Expand All @@ -49,16 +42,13 @@ public interface Builder {

Builder setMaxOutputValueBytes(long bytes);

Builder setThrowExceptionOnLargeOutput(boolean shouldThrow);

OperationalLimits build();
}

public static Builder builder() {
return new AutoBuilder_OperationalLimits_Builder()
.setMaxWorkItemCommitBytes(Long.MAX_VALUE)
.setMaxOutputKeyBytes(Long.MAX_VALUE)
.setMaxOutputValueBytes(Long.MAX_VALUE)
.setThrowExceptionOnLargeOutput(false);
.setMaxOutputValueBytes(Long.MAX_VALUE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,6 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
config ->
onPipelineConfig(
config,
options,
dispatcherClient::consumeWindmillDispatcherEndpoints,
operationalLimits::set));
computationStateCache = computationStateCacheFactory.apply(configFetcher);
Expand Down Expand Up @@ -515,7 +514,6 @@ static StreamingDataflowWorker forTesting(
config ->
onPipelineConfig(
config,
options,
windmillServer::setWindmillServiceEndpoints,
operationalLimits::set))
: new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig);
Expand Down Expand Up @@ -598,7 +596,6 @@ static StreamingDataflowWorker forTesting(

private static void onPipelineConfig(
StreamingEnginePipelineConfig config,
DataflowWorkerHarnessOptions options,
Consumer<ImmutableSet<HostAndPort>> consumeWindmillServiceEndpoints,
Consumer<OperationalLimits> operationalLimits) {

Expand All @@ -607,8 +604,6 @@ private static void onPipelineConfig(
.setMaxWorkItemCommitBytes(config.maxWorkItemCommitBytes())
.setMaxOutputKeyBytes(config.maxOutputKeyBytes())
.setMaxOutputValueBytes(config.maxOutputValueBytes())
.setThrowExceptionOnLargeOutput(
DataflowRunner.hasExperiment(options, "throw_exceptions_on_large_output"))
.build());

if (!config.windmillServiceEndpoints().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
private final ImmutableMap<String, String> stateNameMap;
private final WindmillStateCache.ForComputation stateCache;
private final ReaderCache readerCache;
private final boolean throwExceptionOnLargeOutput;
private volatile long backlogBytes;

/**
Expand Down Expand Up @@ -152,7 +153,8 @@ public StreamingModeExecutionContext(
MetricsContainerRegistry<StreamingStepMetricsContainer> metricsContainerRegistry,
DataflowExecutionStateTracker executionStateTracker,
StreamingModeExecutionStateRegistry executionStateRegistry,
long sinkByteLimit) {
long sinkByteLimit,
boolean throwExceptionOnLargeOutput) {
super(
counterFactory,
metricsContainerRegistry,
Expand All @@ -165,6 +167,7 @@ public StreamingModeExecutionContext(
this.stateNameMap = ImmutableMap.copyOf(stateNameMap);
this.stateCache = stateCache;
this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN;
this.throwExceptionOnLargeOutput = throwExceptionOnLargeOutput;
}

@VisibleForTesting
Expand All @@ -181,7 +184,7 @@ public long getMaxOutputValueBytes() {
}

public boolean throwExceptionsForLargeOutput() {
return operationalLimits.throwExceptionOnLargeOutput;
return throwExceptionOnLargeOutput;
}

public boolean workIsFailed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ final class ComputationWorkExecutorFactory {
private static final Logger LOG = LoggerFactory.getLogger(ComputationWorkExecutorFactory.class);
private static final String DISABLE_SINK_BYTE_LIMIT_EXPERIMENT =
"disable_limiting_bundle_sink_bytes";
// Whether to throw an exception when processing output that violates any of the operational
// limits.
private static final String THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT =
"throw_exceptions_on_large_output";

private final DataflowWorkerHarnessOptions options;
private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory;
Expand All @@ -90,6 +94,7 @@ final class ComputationWorkExecutorFactory {

private final long maxSinkBytes;
private final IdGenerator idGenerator;
private final boolean throwExceptionOnLargeOutput;

ComputationWorkExecutorFactory(
DataflowWorkerHarnessOptions options,
Expand All @@ -113,6 +118,8 @@ final class ComputationWorkExecutorFactory {
hasExperiment(options, DISABLE_SINK_BYTE_LIMIT_EXPERIMENT)
? Long.MAX_VALUE
: StreamingDataflowWorker.MAX_SINK_BYTES;
this.throwExceptionOnLargeOutput =
hasExperiment(options, THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT);
}

private static Nodes.ParallelInstructionNode extractReadNode(
Expand Down Expand Up @@ -255,7 +262,8 @@ private StreamingModeExecutionContext createExecutionContext(
stageInfo.metricsContainerRegistry(),
executionStateTracker,
stageInfo.executionStateRegistry(),
maxSinkBytes);
maxSinkBytes,
throwExceptionOnLargeOutput);
}

private DataflowMapTaskExecutor createMapTaskExecutor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1280,13 +1280,9 @@ public void testOutputKeyTooLargeException() throws Exception {

StreamingDataflowWorker worker =
makeWorker(
defaultWorkerParams()
defaultWorkerParams("--experiments=throw_exceptions_on_large_output")
.setInstructions(instructions)
.setOperationalLimits(
OperationalLimits.builder()
.setMaxOutputKeyBytes(15)
.setThrowExceptionOnLargeOutput(true)
.build())
.setOperationalLimits(OperationalLimits.builder().setMaxOutputKeyBytes(15).build())
.build());
worker.start();

Expand Down Expand Up @@ -1317,13 +1313,10 @@ public void testOutputValueTooLargeException() throws Exception {

StreamingDataflowWorker worker =
makeWorker(
defaultWorkerParams()
defaultWorkerParams("--experiments=throw_exceptions_on_large_output")
.setInstructions(instructions)
.setOperationalLimits(
OperationalLimits.builder()
.setMaxOutputValueBytes(15)
.setThrowExceptionOnLargeOutput(true)
.build())
OperationalLimits.builder().setMaxOutputValueBytes(15).build())
.build());
worker.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public void setUp() {
PipelineOptionsFactory.create(),
"test-work-item-id"),
executionStateRegistry,
Long.MAX_VALUE);
Long.MAX_VALUE,
/*throwExceptionOnLargeOutput=*/ false);
}

private static Work createMockWork(Windmill.WorkItem workItem, Watermarks watermarks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,8 @@ public void testReadUnboundedReader() throws Exception {
PipelineOptionsFactory.create(),
"test-work-item-id"),
executionStateRegistry,
Long.MAX_VALUE);
Long.MAX_VALUE,
/*throwExceptionOnLargeOutput=*/ false);

options.setNumWorkers(5);
int maxElements = 10;
Expand Down Expand Up @@ -978,7 +979,8 @@ public void testFailedWorkItemsAbort() throws Exception {
PipelineOptionsFactory.create(),
"test-work-item-id"),
executionStateRegistry,
Long.MAX_VALUE);
Long.MAX_VALUE,
/*throwExceptionOnLargeOutput=*/ false);

options.setNumWorkers(5);
int maxElements = 100;
Expand Down

0 comments on commit 7a88e6f

Please sign in to comment.