From d6e4edbbbf464c4af3c32e6daa4948ab074e8c56 Mon Sep 17 00:00:00 2001 From: Sergei Lilichenko Date: Mon, 25 Mar 2024 08:59:04 -0700 Subject: [PATCH 1/7] Initial check-in of the ordered processing extension in Java. --- sdks/java/extensions/ordered/build.gradle | 33 + .../sdk/extensions/ordered/EventExaminer.java | 62 ++ .../sdk/extensions/ordered/MutableState.java | 43 ++ .../ordered/OrderedEventProcessor.java | 657 ++++++++++++++++++ .../ordered/OrderedEventProcessorResult.java | 107 +++ .../ordered/OrderedProcessingHandler.java | 112 +++ .../ordered/OrderedProcessingStatus.java | 130 ++++ .../extensions/ordered/ProcessingState.java | 345 +++++++++ .../extensions/ordered/UnprocessedEvent.java | 77 ++ .../sdk/extensions/ordered/package-info.java | 23 + .../beam/sdk/extensions/ordered/Event.java | 47 ++ .../ordered/OrderedEventProcessorTest.java | 605 ++++++++++++++++ .../StringBufferOrderedProcessingHandler.java | 42 ++ .../ordered/StringBuilderState.java | 142 ++++ .../ordered/StringEventExaminer.java | 46 ++ settings.gradle.kts | 1 + 16 files changed, 2472 insertions(+) create mode 100644 sdks/java/extensions/ordered/build.gradle create mode 100644 sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/EventExaminer.java create mode 100644 sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/MutableState.java create mode 100644 sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java create mode 100644 sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorResult.java create mode 100644 sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingHandler.java create mode 100644 sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingStatus.java create mode 100644 
sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ProcessingState.java create mode 100644 sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/UnprocessedEvent.java create mode 100644 sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/package-info.java create mode 100644 sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/Event.java create mode 100644 sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest.java create mode 100644 sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringBufferOrderedProcessingHandler.java create mode 100644 sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringBuilderState.java create mode 100644 sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringEventExaminer.java diff --git a/sdks/java/extensions/ordered/build.gradle b/sdks/java/extensions/ordered/build.gradle new file mode 100644 index 0000000000000..c7872c33c4a33 --- /dev/null +++ b/sdks/java/extensions/ordered/build.gradle @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +plugins { id 'org.apache.beam.module' } +applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.extensions.ordered') + +description = "Apache Beam :: SDKs :: Java :: Extensions :: Ordered" + +dependencies { + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation library.java.slf4j_api + implementation library.java.joda_time + implementation library.java.joda_time + implementation library.java.vendored_guava_32_1_2_jre + testImplementation library.java.junit + testImplementation library.java.hamcrest + testImplementation project(path: ':sdks:java:core') + testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") +} \ No newline at end of file diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/EventExaminer.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/EventExaminer.java new file mode 100644 index 0000000000000..b434269a475ab --- /dev/null +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/EventExaminer.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ordered; + +import java.io.Serializable; +import org.checkerframework.checker.nullness.qual.NonNull; + +/** + * Classes implementing this interface will be called by {@link OrderedEventProcessor} to examine every + * incoming event. + * + * @param + * @param + */ +public interface EventExaminer> + extends Serializable { + + /** + * Is this event the first expected event for the given key and window? + * + * @param sequenceNumber the sequence number of the event as defined by the key of the input + * PCollection to {@link OrderedEventProcessor} + * @param event being processed + * @return true if this is the initial sequence. + */ + boolean isInitialEvent(long sequenceNumber, EventT event); + + /** + * If the event was the first event in the sequence, create the state to hold the required data + * needed for processing. This data will be persisted. + * + * @param event the first event in the sequence. + * @return the state to persist. + */ + @NonNull + StateT createStateOnInitialEvent(EventT event); + + /** + * Is this event the last expected event for a given key and window? + * + * @param sequenceNumber of the event + * @param event being processed + * @return true if the last event. There are cases where it's impossible to know whether it's the + * last event. False should be returned in those cases. 
+ */ + boolean isLastEvent(long sequenceNumber, EventT event); +} diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/MutableState.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/MutableState.java new file mode 100644 index 0000000000000..13891cbaec728 --- /dev/null +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/MutableState.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ordered; + +import java.io.Serializable; + +/** Mutable state mutates when events apply to it. It is stored in a Beam state. */ +public interface MutableState extends Serializable { + + /** + * The interface assumes that events will mutate the state without the possibility of throwing an + * error. + * + *

TODO: this might be too simplistic and a mechanism for failure of applying the event to a + * state would need to be created. + * + * @param event to be processed + */ + void mutate(EventT event); + + /** + * This method is called after each state mutation. + * + * @return Result of the processing. Can be null if nothing needs to be output after this + * mutation. + */ + ResultT produceResult(); +} diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java new file mode 100644 index 0000000000000..5dcaa3f949ca3 --- /dev/null +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java @@ -0,0 +1,657 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.ordered; + +import com.google.auto.value.AutoValue; +import java.util.Arrays; +import java.util.Iterator; +import javax.annotation.Nullable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.BooleanCoder; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.extensions.ordered.ProcessingState.ProcessingStateCoder; +import org.apache.beam.sdk.extensions.ordered.UnprocessedEvent.Reason; +import org.apache.beam.sdk.extensions.ordered.UnprocessedEvent.UnprocessedEventCoder; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.state.OrderedListState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Transform for 
processing ordered events. Events are grouped by the key and within each key they + * are applied according to the provided sequence. Events which arrive out of sequence are buffered + * and processed after all the missing events for a given key have arrived. + * + * @param + * @param + * @param + */ +@AutoValue +@SuppressWarnings({"nullness", "TypeNameShadowing"}) +public abstract class OrderedEventProcessor< + EventT, EventKeyT, ResultT, StateT extends MutableState> + extends PTransform< + PCollection>>, + OrderedEventProcessorResult> { + + public static < + EventTypeT, + EventKeyTypeT, + ResultTypeT, + StateTypeT extends MutableState> + OrderedEventProcessor create( + OrderedProcessingHandler handler) { + return new AutoValue_OrderedEventProcessor<>(handler); + } + + @Nullable + abstract OrderedProcessingHandler getHandler(); + + @Override + public OrderedEventProcessorResult expand( + PCollection>> input) { + final TupleTag> mainOutput = + new TupleTag>("mainOutput") {}; + final TupleTag> statusOutput = + new TupleTag>("status") {}; + + final TupleTag>>> unprocessedEventOutput = + new TupleTag>>>("unprocessed-events") {}; + + OrderedProcessingHandler handler = getHandler(); + Pipeline pipeline = input.getPipeline(); + + Coder keyCoder; + try { + keyCoder = handler.getKeyCoder(pipeline, input.getCoder()); + } catch (CannotProvideCoderException e) { + throw new RuntimeException("Unable to get key coder", e); + } + + Coder eventCoder; + try { + eventCoder = handler.getEventCoder(pipeline, input.getCoder()); + } catch (CannotProvideCoderException e) { + throw new RuntimeException("Unable to get event coder", e); + } + + Coder stateCoder; + try { + stateCoder = handler.getStateCoder(pipeline); + } catch (CannotProvideCoderException e) { + throw new RuntimeException("Unable to get state coder", e); + } + + Coder resultCoder; + try { + resultCoder = handler.getResultCoder(pipeline); + } catch (CannotProvideCoderException e) { + throw new RuntimeException("Unable to 
get result coder", e); + } + + PCollectionTuple processingResult = + input.apply( + ParDo.of( + new OrderedProcessorDoFn<>( + handler.getEventExaminer(), + eventCoder, + stateCoder, + keyCoder, + mainOutput, + statusOutput, + handler.getStatusUpdateFrequency(), + unprocessedEventOutput, + handler.isProduceStatusUpdateOnEveryEvent(), + input.isBounded() == IsBounded.BOUNDED + ? Integer.MAX_VALUE + : handler.getMaxOutputElementsPerBundle())) + .withOutputTags( + mainOutput, + TupleTagList.of(Arrays.asList(statusOutput, unprocessedEventOutput)))); + + KvCoder mainOutputCoder = KvCoder.of(keyCoder, resultCoder); + KvCoder processingStatusCoder = + KvCoder.of(keyCoder, getOrderedProcessingStatusCoder(pipeline)); + KvCoder>> unprocessedEventsCoder = + KvCoder.of( + keyCoder, KvCoder.of(VarLongCoder.of(), new UnprocessedEventCoder<>(eventCoder))); + return new OrderedEventProcessorResult<>( + pipeline, + processingResult.get(mainOutput).setCoder(mainOutputCoder), + mainOutput, + processingResult.get(statusOutput).setCoder(processingStatusCoder), + statusOutput, + processingResult.get(unprocessedEventOutput).setCoder(unprocessedEventsCoder), + unprocessedEventOutput); + } + + private static Coder getOrderedProcessingStatusCoder(Pipeline pipeline) { + SchemaRegistry schemaRegistry = pipeline.getSchemaRegistry(); + Coder result; + try { + result = + SchemaCoder.of( + schemaRegistry.getSchema(OrderedProcessingStatus.class), + TypeDescriptor.of(OrderedProcessingStatus.class), + schemaRegistry.getToRowFunction(OrderedProcessingStatus.class), + schemaRegistry.getFromRowFunction(OrderedProcessingStatus.class)); + } catch (NoSuchSchemaException e) { + throw new RuntimeException(e); + } + return result; + } + + /** + * Main DoFn for processing ordered events. 
+ * + * @param + * @param + * @param + */ + static class OrderedProcessorDoFn< + EventTypeT, + EventKeyTypeT, + ResultTypeT, + StateTypeT extends MutableState> + extends DoFn>, KV> { + + private static final Logger LOG = LoggerFactory.getLogger(OrderedProcessorDoFn.class); + + private static final String PROCESSING_STATE = "processingState"; + private static final String MUTABLE_STATE = "mutableState"; + private static final String BUFFERED_EVENTS = "bufferedEvents"; + private static final String STATUS_EMISSION_TIMER = "statusTimer"; + private static final String LARGE_BATCH_EMISSION_TIMER = "largeBatchTimer"; + private static final String WINDOW_CLOSED = "windowClosed"; + private final EventExaminer eventExaminer; + + @StateId(BUFFERED_EVENTS) + @SuppressWarnings("unused") + private final StateSpec> bufferedEventsSpec; + + @StateId(PROCESSING_STATE) + @SuppressWarnings("unused") + private final StateSpec>> processingStateSpec; + + @SuppressWarnings("unused") + @StateId(MUTABLE_STATE) + private final StateSpec> mutableStateSpec; + + @StateId(WINDOW_CLOSED) + @SuppressWarnings("unused") + private final StateSpec> windowClosedSpec; + + @TimerId(STATUS_EMISSION_TIMER) + @SuppressWarnings("unused") + private final TimerSpec statusEmissionTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @TimerId(LARGE_BATCH_EMISSION_TIMER) + @SuppressWarnings("unused") + private final TimerSpec largeBatchEmissionTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + private final TupleTag> statusTupleTag; + private final Duration statusUpdateFrequency; + + private final TupleTag> mainOutputTupleTag; + private final TupleTag>>> + unprocessedEventsTupleTag; + private final boolean produceStatusUpdateOnEveryEvent; + + private final long maxNumberOfResultsToProduce; + + private Long numberOfResultsBeforeBundleStart; + + /** + * Stateful DoFn to do the bulk of processing. 
+ * + * @param eventExaminer + * @param eventCoder + * @param stateCoder + * @param keyCoder + * @param mainOutputTupleTag + * @param statusTupleTag + * @param statusUpdateFrequency + * @param unprocessedEventTupleTag + * @param produceStatusUpdateOnEveryEvent + * @param maxNumberOfResultsToProduce + */ + OrderedProcessorDoFn( + EventExaminer eventExaminer, + Coder eventCoder, + Coder stateCoder, + Coder keyCoder, + TupleTag> mainOutputTupleTag, + TupleTag> statusTupleTag, + Duration statusUpdateFrequency, + TupleTag>>> + unprocessedEventTupleTag, + boolean produceStatusUpdateOnEveryEvent, + long maxNumberOfResultsToProduce) { + this.eventExaminer = eventExaminer; + this.bufferedEventsSpec = StateSpecs.orderedList(eventCoder); + this.mutableStateSpec = StateSpecs.value(stateCoder); + this.processingStateSpec = StateSpecs.value(ProcessingStateCoder.of(keyCoder)); + this.windowClosedSpec = StateSpecs.value(BooleanCoder.of()); + this.mainOutputTupleTag = mainOutputTupleTag; + this.statusTupleTag = statusTupleTag; + this.unprocessedEventsTupleTag = unprocessedEventTupleTag; + this.statusUpdateFrequency = statusUpdateFrequency; + this.produceStatusUpdateOnEveryEvent = produceStatusUpdateOnEveryEvent; + this.maxNumberOfResultsToProduce = maxNumberOfResultsToProduce; + } + + @StartBundle + public void onBundleStart() { + numberOfResultsBeforeBundleStart = null; + } + + @FinishBundle + public void onBundleFinish() { + // This might be necessary because this field is also used in a Timer + numberOfResultsBeforeBundleStart = null; + } + + @ProcessElement + public void processElement( + @StateId(BUFFERED_EVENTS) OrderedListState bufferedEventsState, + @AlwaysFetched @StateId(PROCESSING_STATE) + ValueState> processingStateState, + @StateId(MUTABLE_STATE) ValueState mutableStateState, + @TimerId(STATUS_EMISSION_TIMER) Timer statusEmissionTimer, + @TimerId(LARGE_BATCH_EMISSION_TIMER) Timer largeBatchEmissionTimer, + @Element KV> eventAndSequence, + MultiOutputReceiver 
outputReceiver, + BoundedWindow window) { + + EventKeyTypeT key = eventAndSequence.getKey(); + long sequence = eventAndSequence.getValue().getKey(); + EventTypeT event = eventAndSequence.getValue().getValue(); + + ProcessingState processingState = processingStateState.read(); + + if (processingState == null) { + // This is the first time we see this key/window pair + processingState = new ProcessingState<>(key); + if (statusUpdateFrequency != null) { + // Set up the timer to produce the status of the processing on a regular basis + statusEmissionTimer.offset(statusUpdateFrequency).setRelative(); + } + } + + if (numberOfResultsBeforeBundleStart == null) { + // Per key processing is synchronized by Beam. There is no need to have it here. + numberOfResultsBeforeBundleStart = processingState.getResultCount(); + } + + processingState.recordReceived(); + + StateTypeT state = + processNewEvent( + sequence, + event, + processingState, + mutableStateState, + bufferedEventsState, + outputReceiver); + + processBufferedEvents( + processingState, state, bufferedEventsState, outputReceiver, largeBatchEmissionTimer); + + saveStates( + processingStateState, + processingState, + mutableStateState, + state, + outputReceiver, + window.maxTimestamp()); + + checkIfProcessingIsCompleted(processingState); + } + + private boolean checkIfProcessingIsCompleted(ProcessingState processingState) { + boolean result = processingState.isProcessingCompleted(); + if (result) { + LOG.info("Processing for key '" + processingState.getKey() + "' is completed."); + } + return result; + } + + private void saveStates( + ValueState> processingStatusState, + ProcessingState processingStatus, + ValueState currentStateState, + StateTypeT state, + MultiOutputReceiver outputReceiver, + Instant windowTimestamp) { + // There is always a change to the processing status + processingStatusState.write(processingStatus); + + // Stored state may not have changes if the element was out of sequence. 
+ if (state != null) { + currentStateState.write(state); + } + + if (produceStatusUpdateOnEveryEvent) { + // During pipeline draining the window timestamp is set to a large value in the future. + // Producing an event before that results in error, that's why this logic exist. + Instant statusTimestamp = Instant.now(); + statusTimestamp = + statusTimestamp.isAfter(windowTimestamp) ? statusTimestamp : windowTimestamp; + + emitProcessingStatus(processingStatus, outputReceiver, statusTimestamp); + } + } + + private void emitProcessingStatus( + ProcessingState processingState, + MultiOutputReceiver outputReceiver, + Instant statusTimestamp) { + outputReceiver + .get(statusTupleTag) + .outputWithTimestamp( + KV.of( + processingState.getKey(), + OrderedProcessingStatus.create( + processingState.getLastOutputSequence(), + processingState.getBufferedRecordCount(), + processingState.getEarliestBufferedSequence(), + processingState.getLatestBufferedSequence(), + processingState.getRecordsReceived(), + processingState.getResultCount(), + processingState.getDuplicates(), + processingState.isLastEventReceived())), + statusTimestamp); + } + + /** + * Process the just received event. + * + * @return newly created or updated State. If null is returned - the event wasn't processed. + */ + private StateTypeT processNewEvent( + long currentSequence, + EventTypeT currentEvent, + ProcessingState processingState, + ValueState currentStateState, + OrderedListState bufferedEventsState, + MultiOutputReceiver outputReceiver) { + if (currentSequence == Long.MAX_VALUE) { + LOG.error( + "Received an event with " + + currentSequence + + " as the sequence number. 
" + + "It will be dropped because it needs to be less than Long.MAX_VALUE."); + return null; + } + + if (processingState.hasAlreadyBeenProcessed(currentSequence)) { + outputReceiver + .get(unprocessedEventsTupleTag) + .output( + KV.of( + processingState.getKey(), + KV.of( + currentSequence, UnprocessedEvent.create(currentEvent, Reason.duplicate)))); + return null; + } + + StateTypeT state; + boolean thisIsTheLastEvent = eventExaminer.isLastEvent(currentSequence, currentEvent); + if (eventExaminer.isInitialEvent(currentSequence, currentEvent)) { + // First event of the key/window + // What if it's a duplicate event - it will reset everything. Shall we drop/DLQ anything + // that's before the processingState.lastOutputSequence? + try { + state = eventExaminer.createStateOnInitialEvent(currentEvent); + } catch (Exception e) { + // TODO: Handle exception in a better way - DLQ. + // Initial state creator can be pretty heavy - remote calls, etc.. + throw new RuntimeException(e); + } + + processingState.eventAccepted(currentSequence, thisIsTheLastEvent); + + ResultTypeT result = state.produceResult(); + if (result != null) { + outputReceiver.get(mainOutputTupleTag).output(KV.of(processingState.getKey(), result)); + processingState.resultProduced(); + } + + // Nothing else to do. We will attempt to process buffered events later. 
+ return state; + } + + if (processingState.isNextEvent(currentSequence)) { + // Event matches expected sequence + state = currentStateState.read(); + + state.mutate(currentEvent); + ResultTypeT result = state.produceResult(); + if (result != null) { + outputReceiver.get(mainOutputTupleTag).output(KV.of(processingState.getKey(), result)); + processingState.resultProduced(); + } + processingState.eventAccepted(currentSequence, thisIsTheLastEvent); + + return state; + } + + // Event is not ready to be processed yet + Instant eventTimestamp = Instant.ofEpochMilli(currentSequence); + bufferedEventsState.add(TimestampedValue.of(currentEvent, eventTimestamp)); + processingState.eventBuffered(currentSequence, thisIsTheLastEvent); + + // This will signal that the state hasn't been mutated and we don't need to save it. + return null; + } + + /** Process buffered events. */ + private void processBufferedEvents( + ProcessingState processingState, + StateTypeT state, + OrderedListState bufferedEventsState, + MultiOutputReceiver outputReceiver, + Timer largeBatchEmissionTimer) { + if (state == null) { + // Only when the current event caused a state mutation and the state is passed to this + // method should we attempt to process buffered events + return; + } + + if (!processingState.readyToProcessBufferedEvents()) { + return; + } + + if (exceededMaxResultCountForBundle(processingState, largeBatchEmissionTimer)) { + // No point in trying to process buffered events + return; + } + + Instant startRange = Instant.ofEpochMilli(processingState.getEarliestBufferedSequence()); + Instant endRange = Instant.ofEpochMilli(processingState.getLatestBufferedSequence() + 1); + Instant endClearRange = null; + + // readRange is efficiently implemented and will bring records in batches + Iterable> events = + bufferedEventsState.readRange(startRange, endRange); + + Iterator> bufferedEventsIterator = events.iterator(); + while (bufferedEventsIterator.hasNext()) { + TimestampedValue timestampedEvent 
= bufferedEventsIterator.next(); + Instant eventTimestamp = timestampedEvent.getTimestamp(); + long eventSequence = eventTimestamp.getMillis(); + + EventTypeT bufferedEvent = timestampedEvent.getValue(); + if (processingState.checkForDuplicateBatchedEvent(eventSequence)) { + outputReceiver + .get(unprocessedEventsTupleTag) + .output( + KV.of( + processingState.getKey(), + KV.of( + eventSequence, + UnprocessedEvent.create(bufferedEvent, Reason.duplicate)))); + continue; + } + + if (eventSequence > processingState.getLastOutputSequence() + 1) { + processingState.foundSequenceGap(eventSequence); + // Records will be cleared up to this element + endClearRange = Instant.ofEpochMilli(eventSequence); + break; + } + + // This check needs to be done after we checked for sequence gap and before we + // attempt to process the next element which can result in a new result. + if (exceededMaxResultCountForBundle(processingState, largeBatchEmissionTimer)) { + endClearRange = Instant.ofEpochMilli(eventSequence); + break; + } + + state.mutate(bufferedEvent); + ResultTypeT result = state.produceResult(); + if (result != null) { + outputReceiver.get(mainOutputTupleTag).output(KV.of(processingState.getKey(), result)); + processingState.resultProduced(); + } + processingState.processedBufferedEvent(eventSequence); + // Remove this record also + endClearRange = Instant.ofEpochMilli(eventSequence + 1); + } + + bufferedEventsState.clearRange(startRange, endClearRange); + } + + private boolean exceededMaxResultCountForBundle( + ProcessingState processingState, Timer largeBatchEmissionTimer) { + boolean exceeded = + processingState.resultsProducedInBundle(numberOfResultsBeforeBundleStart) + >= maxNumberOfResultsToProduce; + if (exceeded) { + LOG.info( + "Setting the timer to output next batch of events for key '" + + processingState.getKey() + + "'"); + // TODO: this work fine for global windows. Need to check what happens for other types of + // windows. 
+ // See GroupIntoBatches for examples on how to hold the timestamp. + // TODO: test that on draining the pipeline all the results are still produced correctly. + largeBatchEmissionTimer.offset(Duration.millis(1)).setRelative(); + } + return exceeded; + } + + @OnTimer(LARGE_BATCH_EMISSION_TIMER) + public void onBatchEmission( + OnTimerContext context, + @StateId(BUFFERED_EVENTS) OrderedListState bufferedEventsState, + @AlwaysFetched @StateId(PROCESSING_STATE) + ValueState> processingStatusState, + @AlwaysFetched @StateId(MUTABLE_STATE) ValueState currentStateState, + @TimerId(LARGE_BATCH_EMISSION_TIMER) Timer largeBatchEmissionTimer, + MultiOutputReceiver outputReceiver) { + ProcessingState processingState = processingStatusState.read(); + if (processingState == null) { + LOG.warn("Processing state is empty. Ignore it if the pipeline is being cancelled."); + return; + } + StateTypeT state = currentStateState.read(); + if (state == null) { + LOG.warn("Mutable state is empty. Ignore it if the pipeline is being cancelled."); + return; + } + + LOG.debug("Starting to process batch for key '" + processingState.getKey() + "'"); + + this.numberOfResultsBeforeBundleStart = processingState.getResultCount(); + + processBufferedEvents( + processingState, state, bufferedEventsState, outputReceiver, largeBatchEmissionTimer); + + saveStates( + processingStatusState, + processingState, + currentStateState, + state, + outputReceiver, + // TODO: validate that this is correct. 
+ context.window().maxTimestamp()); + + checkIfProcessingIsCompleted(processingState); + } + + @OnTimer(STATUS_EMISSION_TIMER) + @SuppressWarnings("unused") + public void onStatusEmission( + MultiOutputReceiver outputReceiver, + @TimerId(STATUS_EMISSION_TIMER) Timer statusEmissionTimer, + @StateId(WINDOW_CLOSED) ValueState windowClosedState, + @StateId(PROCESSING_STATE) + ValueState> processingStateState) { + + ProcessingState currentState = processingStateState.read(); + if (currentState == null) { + // This could happen if the state has been purged already during the draining. + // It means that there is nothing that we can do and we just need to return. + LOG.warn( + "Current processing state is null in onStatusEmission() - most likely the pipeline is shutting down."); + return; + } + + emitProcessingStatus(currentState, outputReceiver, Instant.now()); + + Boolean windowClosed = windowClosedState.read(); + if (!currentState.isProcessingCompleted() + // Stop producing statuses if we are finished for a particular key + && (windowClosed == null || !windowClosed)) { + statusEmissionTimer.offset(statusUpdateFrequency).setRelative(); + } + } + + @OnWindowExpiration + public void onWindowExpiration(@StateId(WINDOW_CLOSED) ValueState windowClosedState) { + windowClosedState.write(true); + } + } +} diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorResult.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorResult.java new file mode 100644 index 0000000000000..f61df6254b253 --- /dev/null +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorResult.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ordered; + +import java.util.Map; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; + +/** + * The result of the ordered processing. Two PCollections are returned: + *

  • output - the key/value of the mutated states + *
  • processingStatuses - the key/value of the status of processing for a particular key + * + * @param + * @param + */ +public class OrderedEventProcessorResult implements POutput { + + private final PCollection> outputPCollection; + private final TupleTag> outputPCollectionTupleTag; + + private final PCollection> eventProcessingStatusPCollection; + private final TupleTag> eventProcessingStatusTupleTag; + + private final PCollection>>> + unprocessedEventPCollection; + private final TupleTag>>> unprocessedEventTupleTag; + + OrderedEventProcessorResult( + Pipeline pipeline, + PCollection> outputPCollection, + TupleTag> outputPCollectionTupleTag, + PCollection> eventProcessingStatusPCollection, + TupleTag> eventProcessingStatusTupleTag, + PCollection>>> unprocessedEventPCollection, + TupleTag>>> unprocessedEventTupleTag) { + + this.pipeline = pipeline; + this.outputPCollection = outputPCollection; + this.outputPCollectionTupleTag = outputPCollectionTupleTag; + this.eventProcessingStatusPCollection = eventProcessingStatusPCollection; + this.eventProcessingStatusTupleTag = eventProcessingStatusTupleTag; + this.unprocessedEventPCollection = unprocessedEventPCollection; + this.unprocessedEventTupleTag = unprocessedEventTupleTag; + } + + private final Pipeline pipeline; + + @Override + public Pipeline getPipeline() { + return pipeline; + } + + @Override + public Map, PValue> expand() { + return ImmutableMap.of( + eventProcessingStatusTupleTag, + eventProcessingStatusPCollection, + outputPCollectionTupleTag, + outputPCollection, + unprocessedEventTupleTag, + unprocessedEvents()); + } + + @Override + public void finishSpecifyingOutput( + String transformName, PInput input, PTransform transform) {} + + /** + * @return processing status for a particular key. The elements will have the timestamp of the + * instant the status was emitted. 
+ */ + public PCollection> processingStatuses() { + return eventProcessingStatusPCollection; + } + + /** @return processed states keyed by the original key */ + public PCollection> output() { + return outputPCollection; + } + + public PCollection>>> unprocessedEvents() { + return unprocessedEventPCollection; + } +} diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingHandler.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingHandler.java new file mode 100644 index 0000000000000..8409d0555d42a --- /dev/null +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingHandler.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.ordered; + +import java.io.Serializable; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.values.KV; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.joda.time.Duration; + +public abstract class OrderedProcessingHandler< + EventT, KeyT, StateT extends MutableState, ResultT> + implements Serializable { + + public static final int DEFAULT_STATUS_UPDATE_FREQUENCY_SECONDS = 5; + private static final boolean DEFAULT_PRODUCE_STATUS_UPDATE_ON_EVERY_EVENT = false; + public static final int DEFAULT_MAX_ELEMENTS_TO_OUTPUT = 10_000; + + private final Class eventTClass; + private final Class keyTClass; + private final Class stateTClass; + private final Class resultTClass; + + private int maxOutputElementsPerBundle = DEFAULT_MAX_ELEMENTS_TO_OUTPUT; + private Duration statusUpdateFrequency = + Duration.standardSeconds(DEFAULT_STATUS_UPDATE_FREQUENCY_SECONDS); + private boolean produceStatusUpdateOnEveryEvent = DEFAULT_PRODUCE_STATUS_UPDATE_ON_EVERY_EVENT; + + public OrderedProcessingHandler( + Class eventTClass, + Class keyTClass, + Class stateTClass, + Class resultTClass) { + this.eventTClass = eventTClass; + this.keyTClass = keyTClass; + this.stateTClass = stateTClass; + this.resultTClass = resultTClass; + } + + public abstract @NonNull EventExaminer getEventExaminer(); + + public @NonNull Coder getEventCoder( + Pipeline pipeline, Coder>> inputCoder) + throws CannotProvideCoderException { + if (KvCoder.class.isAssignableFrom(inputCoder.getClass())) { + Coder> valueCoder = + ((KvCoder>) inputCoder).getValueCoder(); + if (KV.class.isAssignableFrom(valueCoder.getClass())) { + return ((KvCoder) valueCoder).getValueCoder(); + } + } + return pipeline.getCoderRegistry().getCoder(eventTClass); + } + + public Coder getStateCoder(Pipeline pipeline) throws 
CannotProvideCoderException { + return pipeline.getCoderRegistry().getCoder(stateTClass); + } + + public Coder getKeyCoder(Pipeline pipeline, Coder>> inputCoder) + throws CannotProvideCoderException { + if (KvCoder.class.isAssignableFrom(inputCoder.getClass())) { + return ((KvCoder>) inputCoder).getKeyCoder(); + } + return pipeline.getCoderRegistry().getCoder(keyTClass); + } + + public Coder getResultCoder(Pipeline pipeline) throws CannotProvideCoderException { + return pipeline.getCoderRegistry().getCoder(resultTClass); + } + + public Duration getStatusUpdateFrequency() { + return statusUpdateFrequency; + } + + public void setStatusUpdateFrequency(Duration statusUpdateFrequency) { + this.statusUpdateFrequency = statusUpdateFrequency; + } + + public boolean isProduceStatusUpdateOnEveryEvent() { + return produceStatusUpdateOnEveryEvent; + } + + public int getMaxOutputElementsPerBundle() { + return maxOutputElementsPerBundle; + } + + public void setMaxOutputElementsPerBundle(int maxOutputElementsPerBundle) { + this.maxOutputElementsPerBundle = maxOutputElementsPerBundle; + } + + public void setProduceStatusUpdateOnEveryEvent(boolean value) { + this.produceStatusUpdateOnEveryEvent = value; + } +} diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingStatus.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingStatus.java new file mode 100644 index 0000000000000..cb1f7b4c423a2 --- /dev/null +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingStatus.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ordered; + +import com.google.auto.value.AutoValue; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.joda.time.Instant; + +/** Indicates the status of ordered processing for a particular key. */ +@AutoValue +@DefaultSchema(AutoValueSchema.class) +public abstract class OrderedProcessingStatus { + + public static OrderedProcessingStatus create( + Long lastOutputSequence, + long numberOfBufferedEvents, + Long earliestBufferedSequence, + Long latestBufferedSequence, + long numberOfReceivedEvents, + long resultCount, + long duplicateCount, + boolean lastEventReceived) { + return new AutoValue_OrderedProcessingStatus.Builder() + .setLastProcessedSequence(lastOutputSequence) + .setNumberOfBufferedEvents(numberOfBufferedEvents) + .setEarliestBufferedSequence(earliestBufferedSequence) + .setLatestBufferedSequence(latestBufferedSequence) + .setNumberOfReceivedEvents(numberOfReceivedEvents) + .setLastEventReceived(lastEventReceived) + .setDuplicateCount(duplicateCount) + .setResultCount(resultCount) + .setStatusDate(Instant.now()) + .build(); + } + + @Nullable + public abstract Long getLastProcessedSequence(); + + public abstract long getNumberOfBufferedEvents(); + + @Nullable + public abstract Long 
getEarliestBufferedSequence(); + + @Nullable + public abstract Long getLatestBufferedSequence(); + + public abstract long getNumberOfReceivedEvents(); + + public abstract long getDuplicateCount(); + + public abstract long getResultCount(); + + public abstract boolean isLastEventReceived(); + + public abstract Instant getStatusDate(); + + @Override + public final boolean equals(@Nullable Object obj) { + if (obj == null) { + return false; + } + if (!OrderedProcessingStatus.class.isAssignableFrom(obj.getClass())) { + return false; + } + OrderedProcessingStatus that = (OrderedProcessingStatus) obj; + boolean result = + Objects.equals(this.getEarliestBufferedSequence(), that.getEarliestBufferedSequence()) + && Objects.equals(this.getLastProcessedSequence(), that.getLastProcessedSequence()) + && Objects.equals(this.getLatestBufferedSequence(), that.getLatestBufferedSequence()) + && this.getNumberOfBufferedEvents() == that.getNumberOfBufferedEvents() + && this.getDuplicateCount() == that.getDuplicateCount() + && this.getResultCount() == that.getResultCount() + && this.getNumberOfReceivedEvents() == that.getNumberOfReceivedEvents(); + return result; + } + + @Override + public final int hashCode() { + return Objects.hash( + this.getEarliestBufferedSequence(), + this.getLastProcessedSequence(), + this.getLatestBufferedSequence(), + this.getNumberOfBufferedEvents(), + this.getNumberOfReceivedEvents(), + this.getDuplicateCount(), + this.getResultCount()); + } + + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setLastProcessedSequence(Long value); + + public abstract Builder setNumberOfBufferedEvents(long value); + + public abstract Builder setEarliestBufferedSequence(Long value); + + public abstract Builder setLatestBufferedSequence(Long value); + + public abstract Builder setNumberOfReceivedEvents(long value); + + public abstract Builder setDuplicateCount(long value); + + public abstract Builder setResultCount(long value); + + public 
abstract Builder setLastEventReceived(boolean value); + + public abstract Builder setStatusDate(Instant value); + + public abstract OrderedProcessingStatus build(); + } +} diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ProcessingState.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ProcessingState.java new file mode 100644 index 0000000000000..646ead9848257 --- /dev/null +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ProcessingState.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.ordered; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.BooleanCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.initialization.qual.Initialized; + +/** + * Class used to store the status of processing for a particular key. + * + * @param + */ +class ProcessingState { + + @Nullable private Long lastOutputSequence; + @Nullable private Long latestBufferedSequence; + @Nullable private Long earliestBufferedSequence; + private long bufferedRecordCount; + + private boolean lastEventReceived; + + private long recordsReceived; + + private long duplicates; + + private long resultCount; + + private KeyT key; + + public ProcessingState(KeyT key) { + this.key = key; + this.bufferedRecordCount = 0; + this.lastOutputSequence = null; + this.earliestBufferedSequence = null; + this.latestBufferedSequence = null; + } + + /** + * Only to be used by the coder. 
+ * + * @param key + * @param lastOutputSequence + * @param earliestBufferedSequence + * @param latestBufferedSequence + * @param bufferedRecordCount + */ + ProcessingState( + KeyT key, + @Nullable Long lastOutputSequence, + @Nullable Long earliestBufferedSequence, + @Nullable Long latestBufferedSequence, + long bufferedRecordCount, + long recordsReceived, + long duplicates, + long resultCount, + boolean lastEventReceived) { + this(key); + this.lastOutputSequence = lastOutputSequence; + this.earliestBufferedSequence = earliestBufferedSequence; + this.latestBufferedSequence = latestBufferedSequence; + this.bufferedRecordCount = bufferedRecordCount; + this.recordsReceived = recordsReceived; + this.duplicates = duplicates; + this.resultCount = resultCount; + this.lastEventReceived = lastEventReceived; + } + + @Nullable + public Long getLastOutputSequence() { + return lastOutputSequence; + } + + @Nullable + public Long getLatestBufferedSequence() { + return latestBufferedSequence; + } + + @Nullable + public Long getEarliestBufferedSequence() { + return earliestBufferedSequence; + } + + public long getBufferedRecordCount() { + return bufferedRecordCount; + } + + public long getRecordsReceived() { + return recordsReceived; + } + + public boolean isLastEventReceived() { + return lastEventReceived; + } + + public long getResultCount() { + return resultCount; + } + + public long getDuplicates() { + return duplicates; + } + + public KeyT getKey() { + return key; + } + + /** + * Current event matched the sequence and was processed. + * + * @param sequence + * @param lastEvent + */ + public void eventAccepted(long sequence, boolean lastEvent) { + this.lastOutputSequence = sequence; + setLastEventReceived(lastEvent); + } + + private void setLastEventReceived(boolean lastEvent) { + // Only one last event can be received. + this.lastEventReceived = this.lastEventReceived ? true : lastEvent; + } + + /** + * New event added to the buffer. 
+ * + * @param sequenceNumber of the event + * @param isLastEvent + */ + void eventBuffered(long sequenceNumber, boolean isLastEvent) { + bufferedRecordCount++; + latestBufferedSequence = + Math.max( + sequenceNumber, + latestBufferedSequence == null ? Long.MIN_VALUE : latestBufferedSequence); + earliestBufferedSequence = + Math.min( + sequenceNumber, + earliestBufferedSequence == null ? Long.MAX_VALUE : earliestBufferedSequence); + + setLastEventReceived(isLastEvent); + } + + /** + * An event was processed and removed from the buffer. + * + * @param sequence of the processed event + */ + public void processedBufferedEvent(long sequence) { + bufferedRecordCount--; + lastOutputSequence = sequence; + + if (bufferedRecordCount == 0) { + earliestBufferedSequence = latestBufferedSequence = null; + } else { + // We don't know for sure that it's the earliest record yet, but OrderedEventProcessor will + // read the next + // buffered event and call foundSequenceGap() and adjust this value. + earliestBufferedSequence = sequence + 1; + } + } + + /** + * A set of records was pulled from the buffer, but it turned out that the element is not + * sequential. 
+ * + * @param newEarliestSequence + */ + public void foundSequenceGap(long newEarliestSequence) { + earliestBufferedSequence = newEarliestSequence; + } + + @Override + public boolean equals(@Nullable @Initialized Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ProcessingState)) { + return false; + } + ProcessingState that = (ProcessingState) o; + return bufferedRecordCount == that.bufferedRecordCount + && lastEventReceived == that.lastEventReceived + && recordsReceived == that.recordsReceived + && duplicates == that.duplicates + && Objects.equals(lastOutputSequence, that.lastOutputSequence) + && Objects.equals(latestBufferedSequence, that.latestBufferedSequence) + && Objects.equals(earliestBufferedSequence, that.earliestBufferedSequence) + && Objects.equals(key, that.key) + && resultCount == that.resultCount; + } + + @Override + public int hashCode() { + return Objects.hash( + lastOutputSequence, + latestBufferedSequence, + earliestBufferedSequence, + bufferedRecordCount, + lastEventReceived, + recordsReceived, + duplicates, + resultCount, + key); + } + + public boolean isProcessingCompleted() { + return lastEventReceived && bufferedRecordCount == 0; + } + + public void recordReceived() { + recordsReceived++; + } + + public boolean isNextEvent(long sequence) { + return lastOutputSequence != null && sequence == lastOutputSequence + 1; + } + + public boolean hasAlreadyBeenProcessed(long currentSequence) { + boolean result = lastOutputSequence != null && lastOutputSequence >= currentSequence; + if (result) { + duplicates++; + } + return result; + } + + public boolean checkForDuplicateBatchedEvent(long currentSequence) { + boolean result = lastOutputSequence != null && lastOutputSequence == currentSequence; + if (result) { + duplicates++; + if (--bufferedRecordCount == 0) { + earliestBufferedSequence = latestBufferedSequence = null; + } + } + return result; + } + + public boolean readyToProcessBufferedEvents() { + return earliestBufferedSequence 
!= null + && lastOutputSequence != null + && earliestBufferedSequence == lastOutputSequence + 1; + } + + public void resultProduced() { + resultCount++; + } + + public long resultsProducedInBundle(long numberOfResultsBeforeBundleStart) { + return resultCount - numberOfResultsBeforeBundleStart; + } + + /** + * Coder for the processing status. + * + * @param + */ + static class ProcessingStateCoder extends Coder> { + + private static final NullableCoder NULLABLE_LONG_CODER = + NullableCoder.of(VarLongCoder.of()); + private static final Coder LONG_CODER = VarLongCoder.of(); + private static final VarIntCoder INTEGER_CODER = VarIntCoder.of(); + private static final BooleanCoder BOOLEAN_CODER = BooleanCoder.of(); + + private Coder keyCoder; + + private ProcessingStateCoder(Coder keyCoder) { + this.keyCoder = keyCoder; + } + + public static ProcessingStateCoder of(Coder keyCoder) { + return new ProcessingStateCoder<>(keyCoder); + } + + @Override + public void encode(ProcessingState value, OutputStream outStream) throws IOException { + NULLABLE_LONG_CODER.encode(value.getLastOutputSequence(), outStream); + NULLABLE_LONG_CODER.encode(value.getEarliestBufferedSequence(), outStream); + NULLABLE_LONG_CODER.encode(value.getLatestBufferedSequence(), outStream); + LONG_CODER.encode(value.getBufferedRecordCount(), outStream); + LONG_CODER.encode(value.getRecordsReceived(), outStream); + LONG_CODER.encode(value.getDuplicates(), outStream); + LONG_CODER.encode(value.getResultCount(), outStream); + BOOLEAN_CODER.encode(value.isLastEventReceived(), outStream); + keyCoder.encode(value.getKey(), outStream); + } + + @Override + public ProcessingState decode(InputStream inStream) throws IOException { + Long lastOutputSequence = NULLABLE_LONG_CODER.decode(inStream); + Long earliestBufferedSequence = NULLABLE_LONG_CODER.decode(inStream); + Long latestBufferedSequence = NULLABLE_LONG_CODER.decode(inStream); + int bufferedRecordCount = INTEGER_CODER.decode(inStream); + long 
recordsReceivedCount = LONG_CODER.decode(inStream); + long duplicates = LONG_CODER.decode(inStream); + long resultCount = LONG_CODER.decode(inStream); + boolean isLastEventReceived = BOOLEAN_CODER.decode(inStream); + KeyT key = keyCoder.decode(inStream); + + return new ProcessingState<>( + key, + lastOutputSequence, + earliestBufferedSequence, + latestBufferedSequence, + bufferedRecordCount, + recordsReceivedCount, + duplicates, + resultCount, + isLastEventReceived); + } + + @Override + public List> getCoderArguments() { + return ImmutableList.of(); + } + + @Override + public void verifyDeterministic() {} + } +} diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/UnprocessedEvent.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/UnprocessedEvent.java new file mode 100644 index 0000000000000..b1a8a3699fc63 --- /dev/null +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/UnprocessedEvent.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.ordered; + +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.coders.ByteCoder; +import org.apache.beam.sdk.coders.Coder; + +@AutoValue +public abstract class UnprocessedEvent { + + public static UnprocessedEvent create(EventT event, Reason reason) { + return new AutoValue_UnprocessedEvent<>(event, reason); + } + + public enum Reason { + duplicate, + buffered + }; + + public abstract EventT getEvent(); + + public abstract Reason getReason(); + + static class UnprocessedEventCoder extends Coder> { + + private final Coder eventCoder; + + UnprocessedEventCoder(Coder eventCoder) { + this.eventCoder = eventCoder; + } + + @Override + public void encode(UnprocessedEvent value, OutputStream outStream) throws IOException { + ByteCoder.of().encode((byte) value.getReason().ordinal(), outStream); + eventCoder.encode(value.getEvent(), outStream); + } + + @Override + public UnprocessedEvent decode(InputStream inputStream) throws IOException { + Reason reason = Reason.values()[ByteCoder.of().decode(inputStream)]; + EventT event = eventCoder.decode(inputStream); + return UnprocessedEvent.create(event, reason); + } + + @Override + public List> getCoderArguments() { + return Arrays.asList(eventCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + verifyDeterministic( + this, "Unprocessed event coder requires deterministic event coder", eventCoder); + } + } +} diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/package-info.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/package-info.java new file mode 100644 index 0000000000000..f9d7e3d67bff1 --- /dev/null +++ 
b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Provides a transform for ordered processing. + * + * @see org.apache.beam.sdk.extensions.ordered.OrderedEventProcessor + */ +package org.apache.beam.sdk.extensions.ordered; diff --git a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/Event.java b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/Event.java new file mode 100644 index 0000000000000..3cf879d8239b1 --- /dev/null +++ b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/Event.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
 You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ordered;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Event class to be used in testing.
+ *
+ * <p>The event simulates a string being emitted for a particular key, e.g., sensor id or
+ * customer id.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class Event implements Serializable {
+
+  public static Event create(long sequence, String groupId, String value) {
+    return new AutoValue_Event(sequence, groupId, value);
+  }
+
+  /** @return event sequence number */
+  public abstract long getSequence();
+
+  /** @return the group id the event is associated with; set from {@code groupId} in create() */
+  public abstract String getKey();
+
+  /** @return value of the event */
+  public abstract String getValue();
+}
diff --git a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest.java b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest.java
new file mode 100644
index 0000000000000..8a71551527b3e
--- /dev/null
+++ b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest.java
@@ -0,0 +1,605 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.beam.sdk.extensions.ordered; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.extensions.ordered.UnprocessedEvent.Reason; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Ordered Processing tests use the same testing scenario. Events are sent in or out of sequence. + * Each event is a string for a particular key. The output is a concatenation of all strings. + * + *

    TODO: add tests for outputting buffered events in case of drainage. TODO: add batch processing
+ * in parallel with streaming processing
+ */
+@RunWith(JUnit4.class)
+public class OrderedEventProcessorTest {
+
+  public static final boolean LAST_EVENT_RECEIVED = true;
+  public static final int EMISSION_FREQUENCY_ON_EVERY_ELEMENT = 1;
+  public static final int INITIAL_SEQUENCE_OF_0 = 0;
+  public static final boolean DONT_PRODUCE_STATUS_ON_EVERY_EVENT = false;
+  public static final int LARGE_MAX_RESULTS_PER_OUTPUT = 1000;
+  public static final int EMISSION_FREQUENCY_ON_EVERY_OTHER_EVENT = 2;
+  public static final boolean PRODUCE_STATUS_ON_EVERY_EVENT = true;
+  @Rule public final transient TestPipeline p = TestPipeline.create();
+
+  /** Converts a test {@link Event} into the key / (sequence, value) shape fed to the transform. */
+  static class MapEventsToKV extends DoFn<Event, KV<String, KV<Long, String>>> {
+
+    @ProcessElement
+    public void convert(
+        @Element Event event, OutputReceiver<KV<String, KV<Long, String>>> outputReceiver) {
+      outputReceiver.output(KV.of(event.getKey(), KV.of(event.getSequence(), event.getValue())));
+    }
+  }
+
+  /** Replaces the state in each keyed element with its {@code toString()} representation. */
+  static class MapStringBufferStateToString
+      extends DoFn<KV<String, StringBuilderState>, KV<String, String>> {
+
+    @ProcessElement
+    public void map(
+        @Element KV<String, StringBuilderState> element,
+        OutputReceiver<KV<String, String>> outputReceiver) {
+      outputReceiver.output(KV.of(element.getKey(), element.getValue().toString()));
+    }
+  }
+
+  /** Events of both keys arrive strictly in sequence; every event produces a result. */
+  @Test
+  public void testPerfectOrderingProcessing() throws CannotProvideCoderException {
+    Event[] events = {
+      Event.create(0, "id-1", "a"),
+      Event.create(1, "id-1", "b"),
+      Event.create(2, "id-1", "c"),
+      Event.create(3, "id-1", "d"),
+      Event.create(0, "id-2", "a"),
+      Event.create(1, "id-2", "b")
+    };
+
+    Collection<KV<String, OrderedProcessingStatus>> expectedStatuses = new ArrayList<>();
+    expectedStatuses.add(
+        KV.of(
+            "id-1",
+            OrderedProcessingStatus.create(
+                3L,
+                0,
+                null,
+                null,
+                4,
+                Arrays.stream(events).filter(e -> e.getKey().equals("id-1")).count(),
+                0,
+                false)));
+    expectedStatuses.add(
+        KV.of(
+            "id-2",
+            OrderedProcessingStatus.create(
+                1L,
+                0,
+                null,
+                null,
+                2,
+                Arrays.stream(events).filter(e -> e.getKey().equals("id-2")).count(),
+                0,
+                false)));
+
+    Collection<KV<String, String>> expectedOutput = new ArrayList<>();
+    expectedOutput.add(KV.of("id-1", "a"));
+    expectedOutput.add(KV.of("id-1", "ab"));
+    expectedOutput.add(KV.of("id-1", "abc"));
+    expectedOutput.add(KV.of("id-1", "abcd"));
+    expectedOutput.add(KV.of("id-2", "a"));
+    expectedOutput.add(KV.of("id-2", "ab"));
+
+    testStreamingProcessing(
+        events,
+        expectedStatuses,
+        expectedOutput,
+        EMISSION_FREQUENCY_ON_EVERY_ELEMENT,
+        INITIAL_SEQUENCE_OF_0,
+        LARGE_MAX_RESULTS_PER_OUTPUT,
+        DONT_PRODUCE_STATUS_ON_EVERY_EVENT);
+  }
+
+  /** Events arrive shuffled but without gaps; the output must still be built in sequence order. */
+  @Test
+  public void testOutOfSequenceProcessing() throws CannotProvideCoderException {
+    Event[] events = {
+      Event.create(2, "id-1", "c"),
+      Event.create(1, "id-1", "b"),
+      Event.create(0, "id-1", "a"),
+      Event.create(3, "id-1", "d"),
+      Event.create(1, "id-2", "b"),
+      Event.create(2, "id-2", "c"),
+      Event.create(4, "id-2", "e"),
+      Event.create(0, "id-2", "a"),
+      Event.create(3, "id-2", "d")
+    };
+
+    Collection<KV<String, OrderedProcessingStatus>> expectedStatuses = new ArrayList<>();
+    expectedStatuses.add(
+        KV.of(
+            "id-1",
+            OrderedProcessingStatus.create(
+                3L,
+                0,
+                null,
+                null,
+                4,
+                Arrays.stream(events).filter(e -> e.getKey().equals("id-1")).count(),
+                0,
+                false)));
+    expectedStatuses.add(
+        KV.of(
+            "id-2",
+            OrderedProcessingStatus.create(
+                4L,
+                0,
+                null,
+                null,
+                5,
+                Arrays.stream(events).filter(e -> e.getKey().equals("id-2")).count(),
+                0,
+                false)));
+
+    Collection<KV<String, String>> expectedOutput = new ArrayList<>();
+    expectedOutput.add(KV.of("id-1", "a"));
+    expectedOutput.add(KV.of("id-1", "ab"));
+    expectedOutput.add(KV.of("id-1", "abc"));
+    expectedOutput.add(KV.of("id-1", "abcd"));
+    expectedOutput.add(KV.of("id-2", "a"));
+    expectedOutput.add(KV.of("id-2", "ab"));
+    expectedOutput.add(KV.of("id-2", "abc"));
+    expectedOutput.add(KV.of("id-2", "abcd"));
+    expectedOutput.add(KV.of("id-2", "abcde"));
+
+    testStreamingProcessing(
+        events,
+        expectedStatuses,
+        expectedOutput,
+        EMISSION_FREQUENCY_ON_EVERY_ELEMENT,
+        INITIAL_SEQUENCE_OF_0,
+        LARGE_MAX_RESULTS_PER_OUTPUT,
+        DONT_PRODUCE_STATUS_ON_EVERY_EVENT);
+  }
+
+  /** Sequence 1 of "id-1" never arrives, so the later events of that key stay buffered. */
+  @Test
+  public void testUnfinishedProcessing() throws CannotProvideCoderException {
+    Event[] events = {
+      Event.create(2, "id-1", "c"),
+      //   Excluded                     Event.create(1, "id-1", "b"),
+      Event.create(0, "id-1", "a"),
+      Event.create(3, "id-1", "d"),
+      Event.create(0, "id-2", "a"),
+      Event.create(1, "id-2", "b"),
+    };
+
+    Collection<KV<String, OrderedProcessingStatus>> expectedStatuses = new ArrayList<>();
+    expectedStatuses.add(
+        KV.of("id-1", OrderedProcessingStatus.create(0L, 2, 2L, 3L, 3, 1L, 0, false)));
+    expectedStatuses.add(
+        KV.of("id-2", OrderedProcessingStatus.create(1L, 0, null, null, 2, 2L, 0, false)));
+
+    Collection<KV<String, String>> expectedOutput = new ArrayList<>();
+    expectedOutput.add(KV.of("id-1", "a"));
+    expectedOutput.add(KV.of("id-2", "a"));
+    expectedOutput.add(KV.of("id-2", "ab"));
+
+    // Same values as the former literals (1, 0, 1000, false), spelled like the other tests.
+    testStreamingProcessing(
+        events,
+        expectedStatuses,
+        expectedOutput,
+        EMISSION_FREQUENCY_ON_EVERY_ELEMENT,
+        INITIAL_SEQUENCE_OF_0,
+        LARGE_MAX_RESULTS_PER_OUTPUT,
+        DONT_PRODUCE_STATUS_ON_EVERY_EVENT);
+  }
+
+  /** Repeated sequence numbers must be reported as unprocessed duplicates, not re-applied. */
+  @Test
+  public void testHandlingOfDuplicateSequences() throws CannotProvideCoderException {
+    Event[] events = {
+      Event.create(3, "id-1", "d"),
+      Event.create(2, "id-1", "c"),
+      // Duplicates to be buffered
+      Event.create(3, "id-1", "d"),
+      Event.create(3, "id-1", "d"),
+      Event.create(0, "id-1", "a"),
+      Event.create(1, "id-1", "b"),
+
+      // Duplicates after the events are processed
+      Event.create(1, "id-1", "b"),
+      Event.create(3, "id-1", "d"),
+    };
+    int resultCount = 4;
+    int duplicateCount = 4;
+
+    Collection<KV<String, OrderedProcessingStatus>> expectedStatuses = new ArrayList<>();
+    expectedStatuses.add(
+        KV.of(
+            "id-1",
+            OrderedProcessingStatus.create(
+                3L, 0, null, null, events.length, resultCount, duplicateCount, false)));
+
+    Collection<KV<String, String>> expectedOutput = new ArrayList<>();
+    expectedOutput.add(KV.of("id-1", "a"));
+    expectedOutput.add(KV.of("id-1", "ab"));
+    expectedOutput.add(KV.of("id-1", "abc"));
+    expectedOutput.add(KV.of("id-1", "abcd"));
+
+    Collection<KV<String, KV<Long, UnprocessedEvent<String>>>> duplicates = new ArrayList<>();
+    duplicates.add(KV.of("id-1", KV.of(3L, UnprocessedEvent.create("d", Reason.duplicate))));
+    duplicates.add(KV.of("id-1", KV.of(3L, UnprocessedEvent.create("d", Reason.duplicate))));
+    duplicates.add(KV.of("id-1", KV.of(1L, UnprocessedEvent.create("b", Reason.duplicate))));
+    duplicates.add(KV.of("id-1", KV.of(3L, UnprocessedEvent.create("d", Reason.duplicate))));
+
+    testStreamingProcessing(
+        events,
+        expectedStatuses,
+        expectedOutput,
+        duplicates,
+        EMISSION_FREQUENCY_ON_EVERY_ELEMENT,
+        INITIAL_SEQUENCE_OF_0,
+        LARGE_MAX_RESULTS_PER_OUTPUT,
+        DONT_PRODUCE_STATUS_ON_EVERY_EVENT);
+  }
+
+  /** With an emission frequency of 2 only every other state change produces a result. */
+  @Test
+  public void testProcessingWithEveryOtherResultEmission() throws CannotProvideCoderException {
+    Event[] events = {
+      Event.create(2, "id-1", "c"),
+      Event.create(1, "id-1", "b"),
+      Event.create(0, "id-1", "a"),
+      Event.create(3, "id-1", "d"),
+      Event.create(0, "id-2", "a"),
+      Event.create(1, "id-2", "b"),
+    };
+
+    Collection<KV<String, OrderedProcessingStatus>> expectedStatuses = new ArrayList<>();
+    expectedStatuses.add(
+        KV.of("id-1", OrderedProcessingStatus.create(3L, 0, null, null, 4, 2L, 0, false)));
+    expectedStatuses.add(
+        KV.of("id-2", OrderedProcessingStatus.create(1L, 0, null, null, 2, 1L, 0, false)));
+
+    Collection<KV<String, String>> expectedOutput = new ArrayList<>();
+    expectedOutput.add(KV.of("id-1", "a"));
+    //  Skipped KV.of("id-1", "ab"),
+    expectedOutput.add(KV.of("id-1", "abc"));
+    //  Skipped KV.of("id-1", "abcd"),
+    expectedOutput.add(KV.of("id-2", "a"));
+    //  Skipped KV.of("id-2", "ab")
+    testStreamingProcessing(
+        events,
+        expectedStatuses,
+        expectedOutput,
+        EMISSION_FREQUENCY_ON_EVERY_OTHER_EVENT,
+        INITIAL_SEQUENCE_OF_0,
+        LARGE_MAX_RESULTS_PER_OUTPUT,
+        DONT_PRODUCE_STATUS_ON_EVERY_EVENT);
+  }
+
+  /**
+   * Buffers three times {@code maxResultsPerOutput} events behind a missing first event and
+   * expects the backlog to be emitted in several batches once that event arrives.
+   */
+  @Test
+  public void testLargeBufferedOutputInTimer() throws CannotProvideCoderException {
+    int maxResultsPerOutput = 100;
+
+    // Array of sequences starting with 2 and the last element - 1.
+    // Output will be buffered until the last event arrives
+    long[] sequences = new long[maxResultsPerOutput * 3];
+    for (int i = 0; i < sequences.length - 1; i++) {
+      sequences[i] = i + 2L;
+    }
+    sequences[sequences.length - 1] = 1;
+
+    List<Event> events = new ArrayList<>(sequences.length);
+    Collection<KV<String, String>> expectedOutput = new ArrayList<>(sequences.length);
+    Collection<KV<String, OrderedProcessingStatus>> expectedStatuses =
+        new ArrayList<>(sequences.length + 10);
+
+    StringBuilder output = new StringBuilder();
+    String outputPerElement = ".";
+    String key = "id-1";
+
+    int bufferedEventCount = 0;
+
+    for (long sequence : sequences) {
+      ++bufferedEventCount;
+
+      events.add(Event.create(sequence, key, outputPerElement));
+      output.append(outputPerElement);
+      expectedOutput.add(KV.of(key, output.toString()));
+
+      if (bufferedEventCount < sequences.length) {
+        // Last event will result in a batch of events being produced. That's why it's excluded
+        // here.
+        expectedStatuses.add(
+            KV.of(
+                key,
+                OrderedProcessingStatus.create(
+                    null, bufferedEventCount, 2L, sequence, bufferedEventCount, 0L, 0, false)));
+      }
+    }
+
+    // Statuses produced by the batched processing
+    for (int i = maxResultsPerOutput; i < sequences.length; i += maxResultsPerOutput) {
+      long lastOutputSequence = i;
+      expectedStatuses.add(
+          KV.of(
+              key,
+              OrderedProcessingStatus.create(
+                  lastOutputSequence,
+                  sequences.length - lastOutputSequence,
+                  lastOutputSequence + 1,
+                  (long) sequences.length,
+                  sequences.length,
+                  lastOutputSequence,
+                  0,
+                  false)));
+    }
+
+    // -- Final status - indicates that everything has been fully processed
+    expectedStatuses.add(
+        KV.of(
+            key,
+            OrderedProcessingStatus.create(
+                (long) sequences.length,
+                0,
+                null,
+                null,
+                sequences.length,
+                sequences.length,
+                0,
+                false)));
+
+    testStreamingProcessing(
+        events.toArray(new Event[0]),
+        expectedStatuses,
+        expectedOutput,
+        EMISSION_FREQUENCY_ON_EVERY_ELEMENT,
+        1L /* This dataset assumes 1 as the starting sequence */,
+        maxResultsPerOutput,
+        PRODUCE_STATUS_ON_EVERY_EVENT);
+  }
+
+  /** Buffered events separated by a sequence gap must wait until the gap has been filled. */
+  @Test
+  public void testSequenceGapProcessingInBufferedOutput() throws CannotProvideCoderException {
+    int maxResultsPerOutput = 3;
+
+    long[] sequences = new long[] {2, 3, 7, 8, 9, 10, 1, 4, 5, 6};
+
+    List<Event> events = new ArrayList<>(sequences.length);
+    List<KV<String, String>> expectedOutput = new ArrayList<>(sequences.length);
+
+    StringBuilder output = new StringBuilder();
+    String outputPerElement = ".";
+    String key = "id-1";
+
+    for (long sequence : sequences) {
+      events.add(Event.create(sequence, key, outputPerElement));
+      output.append(outputPerElement);
+      expectedOutput.add(KV.of(key, output.toString()));
+    }
+
+    int numberOfReceivedEvents = 0;
+    Collection<KV<String, OrderedProcessingStatus>> expectedStatuses = new ArrayList<>();
+
+    // First elements are out-of-sequence and they just get buffered. Earliest and latest sequence
+    // numbers keep changing.
+    expectedStatuses.add(
+        KV.of(
+            key,
+            OrderedProcessingStatus.create(
+                null, 1, 2L, 2L, ++numberOfReceivedEvents, 0L, 0, false)));
+    expectedStatuses.add(
+        KV.of(
+            key,
+            OrderedProcessingStatus.create(
+                null, 2, 2L, 3L, ++numberOfReceivedEvents, 0L, 0, false)));
+    expectedStatuses.add(
+        KV.of(
+            key,
+            OrderedProcessingStatus.create(
+                null, 3, 2L, 7L, ++numberOfReceivedEvents, 0L, 0, false)));
+    expectedStatuses.add(
+        KV.of(
+            key,
+            OrderedProcessingStatus.create(
+                null, 4, 2L, 8L, ++numberOfReceivedEvents, 0L, 0, false)));
+    expectedStatuses.add(
+        KV.of(
+            key,
+            OrderedProcessingStatus.create(
+                null, 5, 2L, 9L, ++numberOfReceivedEvents, 0L, 0, false)));
+    expectedStatuses.add(
+        KV.of(
+            key,
+            OrderedProcessingStatus.create(
+                null, 6, 2L, 10L, ++numberOfReceivedEvents, 0L, 0, false)));
+    // --- 1 has appeared and caused the batch to be sent out.
+    expectedStatuses.add(
+        KV.of(
+            key,
+            OrderedProcessingStatus.create(
+                3L, 4, 7L, 10L, ++numberOfReceivedEvents, 3L, 0, false)));
+    expectedStatuses.add(
+        KV.of(
+            key,
+            OrderedProcessingStatus.create(
+                4L, 4, 7L, 10L, ++numberOfReceivedEvents, 4L, 0, false)));
+    expectedStatuses.add(
+        KV.of(
+            key,
+            OrderedProcessingStatus.create(
+                5L, 4, 7L, 10L, ++numberOfReceivedEvents, 5L, 0, false)));
+    // --- 6 came and 6, 7, and 8 got output
+    expectedStatuses.add(
+        KV.of(
+            key,
+            OrderedProcessingStatus.create(
+                8L, 2, 9L, 10L, ++numberOfReceivedEvents, 8L, 0, false)));
+    // Last timer run produces the final status. Number of received events doesn't increase,
+    // this is the result of a timer processing
+    expectedStatuses.add(
+        KV.of(
+            key,
+            OrderedProcessingStatus.create(
+                10L, 0, null, null, numberOfReceivedEvents, 10L, 0, false)));
+
+    testStreamingProcessing(
+        events.toArray(new Event[0]),
+        expectedStatuses,
+        expectedOutput,
+        EMISSION_FREQUENCY_ON_EVERY_ELEMENT,
+        1L /* This dataset assumes 1 as the starting sequence */,
+        maxResultsPerOutput,
+        PRODUCE_STATUS_ON_EVERY_EVENT);
+  }
+
+  /** An event carrying {@code Long.MAX_VALUE} as its sequence must not end up in the output. */
+  @Test
+  public void testHandlingOfMaxSequenceNumber() throws CannotProvideCoderException {
+    Event[] events = {
+      Event.create(0, "id-1", "a"),
+      Event.create(1, "id-1", "b"),
+      Event.create(Long.MAX_VALUE, "id-1", "c")
+    };
+
+    Collection<KV<String, OrderedProcessingStatus>> expectedStatuses = new ArrayList<>();
+    expectedStatuses.add(
+        KV.of("id-1", OrderedProcessingStatus.create(1L, 0, null, null, 3, 2, 0, false)));
+
+    Collection<KV<String, String>> expectedOutput = new ArrayList<>();
+    expectedOutput.add(KV.of("id-1", "a"));
+    expectedOutput.add(KV.of("id-1", "ab"));
+
+    testStreamingProcessing(
+        events,
+        expectedStatuses,
+        expectedOutput,
+        EMISSION_FREQUENCY_ON_EVERY_ELEMENT,
+        INITIAL_SEQUENCE_OF_0,
+        LARGE_MAX_RESULTS_PER_OUTPUT,
+        DONT_PRODUCE_STATUS_ON_EVERY_EVENT);
+  }
+
+  /** The event flagged by the examiner as the last one must be reflected in the final status. */
+  @Test
+  public void testProcessingOfTheLastInput() throws CannotProvideCoderException {
+    Event[] events = {
+      Event.create(0, "id-1", "a"),
+      Event.create(1, "id-1", "b"),
+      Event.create(2, "id-1", StringEventExaminer.LAST_INPUT)
+    };
+
+    Collection<KV<String, OrderedProcessingStatus>> expectedStatuses = new ArrayList<>();
+    expectedStatuses.add(
+        KV.of(
+            "id-1",
+            OrderedProcessingStatus.create(
+                2L, 0, null, null, events.length, events.length, 0, LAST_EVENT_RECEIVED)));
+
+    Collection<KV<String, String>> expectedOutput = new ArrayList<>();
+    expectedOutput.add(KV.of("id-1", "a"));
+    expectedOutput.add(KV.of("id-1", "ab"));
+    expectedOutput.add(KV.of("id-1", "ab" + StringEventExaminer.LAST_INPUT));
+
+    // Same values as the former literals (1, 0, 1000, false), spelled like the other tests.
+    testStreamingProcessing(
+        events,
+        expectedStatuses,
+        expectedOutput,
+        EMISSION_FREQUENCY_ON_EVERY_ELEMENT,
+        INITIAL_SEQUENCE_OF_0,
+        LARGE_MAX_RESULTS_PER_OUTPUT,
+        DONT_PRODUCE_STATUS_ON_EVERY_EVENT);
+  }
+
+  /** Runs the streaming scenario for inputs that are not expected to produce any duplicates. */
+  private void testStreamingProcessing(
+      Event[] events,
+      Collection<KV<String, OrderedProcessingStatus>> expectedStatuses,
+      Collection<KV<String, String>> expectedOutput,
+      int emissionFrequency,
+      long initialSequence,
+      int maxResultsPerOutput,
+      boolean produceStatusOnEveryEvent)
+      throws CannotProvideCoderException {
+    testStreamingProcessing(
+        events,
+        expectedStatuses,
+        expectedOutput,
+        Collections.emptySet() /* no duplicates */,
+        emissionFrequency,
+        initialSequence,
+        maxResultsPerOutput,
+        produceStatusOnEveryEvent);
+  }
+
+  /**
+   * Feeds the events through a {@link TestStream}, applies the ordered processor and asserts on
+   * all three of its outputs.
+   *
+   * @param events events to send, in the order of arrival
+   * @param expectedStatuses statuses expected on the status output
+   * @param expectedOutput results expected on the main output
+   * @param expectedDuplicates elements expected on the unprocessed events output
+   * @param emissionFrequency passed to the test handler together with {@code initialSequence}
+   * @param initialSequence sequence number which the test examiner treats as the initial event
+   * @param maxResultsPerOutput value for {@code setMaxOutputElementsPerBundle} on the handler
+   * @param produceStatusOnEveryEvent if true, statuses are produced per event and the timer
+   *     based status updates are disabled; otherwise the status frequency is set to 5 minutes
+   * @throws CannotProvideCoderException if the coder for {@link Event} can't be found
+   */
+  private void testStreamingProcessing(
+      Event[] events,
+      Collection<KV<String, OrderedProcessingStatus>> expectedStatuses,
+      Collection<KV<String, String>> expectedOutput,
+      Collection<KV<String, KV<Long, UnprocessedEvent<String>>>> expectedDuplicates,
+      int emissionFrequency,
+      long initialSequence,
+      int maxResultsPerOutput,
+      boolean produceStatusOnEveryEvent)
+      throws CannotProvideCoderException {
+    Instant now = Instant.now().minus(Duration.standardMinutes(20));
+    TestStream.Builder<Event> messageFlow =
+        TestStream.create(p.getCoderRegistry().getCoder(Event.class)).advanceWatermarkTo(now);
+
+    // Every event is preceded by a watermark advance of one more millisecond.
+    int delayInMilliseconds = 0;
+    for (Event e : events) {
+      messageFlow =
+          messageFlow
+              .advanceWatermarkTo(now.plus(Duration.millis(++delayInMilliseconds)))
+              .addElements(e);
+    }
+
+    // Needed to force the processing time based timers.
+    messageFlow = messageFlow.advanceProcessingTime(Duration.standardMinutes(15));
+
+    PCollection<KV<String, KV<Long, String>>> input =
+        p.apply("Create Events", messageFlow.advanceWatermarkToInfinity())
+            .apply("To KV", ParDo.of(new MapEventsToKV()));
+
+    StringBufferOrderedProcessingHandler handler =
+        new StringBufferOrderedProcessingHandler(emissionFrequency, initialSequence);
+    handler.setMaxOutputElementsPerBundle(maxResultsPerOutput);
+    if (produceStatusOnEveryEvent) {
+      handler.setProduceStatusUpdateOnEveryEvent(true);
+      // This disables status updates emitted on timers. Needed for simpler testing when per event
+      // update is needed.
+      handler.setStatusUpdateFrequency(null);
+    } else {
+      handler.setStatusUpdateFrequency(Duration.standardMinutes(5));
+    }
+    OrderedEventProcessor<String, String, String, StringBuilderState> orderedEventProcessor =
+        OrderedEventProcessor.create(handler);
+
+    OrderedEventProcessorResult<String, String, String> processingResult =
+        input.apply("Process Events", orderedEventProcessor);
+
+    PAssert.that("Output matches", processingResult.output()).containsInAnyOrder(expectedOutput);
+
+    PAssert.that("Statuses match", processingResult.processingStatuses())
+        .containsInAnyOrder(expectedStatuses);
+
+    PAssert.that("Unprocessed events match", processingResult.unprocessedEvents())
+        .containsInAnyOrder(expectedDuplicates);
+
+    p.run();
+  }
+}
diff --git a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringBufferOrderedProcessingHandler.java b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringBufferOrderedProcessingHandler.java
new file mode 100644
index 0000000000000..72f3a3cf21b68
--- /dev/null
+++ b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringBufferOrderedProcessingHandler.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ordered;
+
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/**
+ * Ordered processing handler used for testing.
+ *
+ * <p>It uses all the defaults of the parent class.
+ */
+public class StringBufferOrderedProcessingHandler
+    extends OrderedProcessingHandler<String, String, StringBuilderState, String> {
+
+  /** Examiner created once in the constructor and returned by {@link #getEventExaminer()}. */
+  private final EventExaminer<String, StringBuilderState> eventExaminer;
+
+  /**
+   * Creates a handler whose events, keys and results are all strings.
+   *
+   * @param emissionFrequency forwarded to the {@link StringEventExaminer}
+   * @param initialSequence forwarded to the {@link StringEventExaminer}
+   */
+  public StringBufferOrderedProcessingHandler(int emissionFrequency, long initialSequence) {
+    super(String.class, String.class, StringBuilderState.class, String.class);
+    // Note the argument order: the examiner takes the initial sequence first.
+    this.eventExaminer = new StringEventExaminer(initialSequence, emissionFrequency);
+  }
+
+  /** Returns the examiner instance created in the constructor. */
+  @Override
+  @NonNull
+  public EventExaminer<String, StringBuilderState> getEventExaminer() {
+    return eventExaminer;
+  }
+}
diff --git a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringBuilderState.java b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringBuilderState.java
new file mode 100644
index 0000000000000..3d65c36b8c24f
--- /dev/null
+++ b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringBuilderState.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.beam.sdk.extensions.ordered; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import java.util.Objects; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.extensions.ordered.StringBuilderState.StringBuilderStateCoder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +/** + * State used for processing test events. Uses StringBuilder to accumulate the output. + * + *

    Can be configured to produce output with certain frequency.
+ */
+@DefaultCoder(StringBuilderStateCoder.class)
+class StringBuilderState implements MutableState<String, String> {
+
+  /** A result is produced on every {@code emissionFrequency}-th call of produceResult(). */
+  private final int emissionFrequency;
+
+  /** Number of produceResult() calls made so far; encoded by the coder with the state. */
+  private long currentlyEmittedElementNumber;
+
+  /** Concatenation of all events applied to this state. */
+  private final StringBuilder state = new StringBuilder();
+
+  /**
+   * Creates a state which starts counting emissions from zero.
+   *
+   * @param initialEvent first event, becomes the initial content of the state
+   * @param emissionFrequency see {@link #produceResult()}
+   */
+  StringBuilderState(String initialEvent, int emissionFrequency) {
+    this(initialEvent, emissionFrequency, 0L);
+  }
+
+  /**
+   * Creates a state with an explicit emission counter. Used by the coder to restore a state.
+   *
+   * @param initialEvent initial content of the state
+   * @param emissionFrequency see {@link #produceResult()}
+   * @param currentlyEmittedElementNumber number of produceResult() calls already made
+   */
+  StringBuilderState(
+      String initialEvent, int emissionFrequency, long currentlyEmittedElementNumber) {
+    this.emissionFrequency = emissionFrequency;
+    this.currentlyEmittedElementNumber = currentlyEmittedElementNumber;
+    // Append directly instead of calling the overridable mutate() from a constructor.
+    state.append(initialEvent);
+  }
+
+  /** Appends the event to the accumulated string. */
+  @Override
+  public void mutate(String event) {
+    state.append(event);
+  }
+
+  /**
+   * Advances the emission counter on every call.
+   *
+   * @return the accumulated string if the counter value before the increment is a multiple of
+   *     the emission frequency, {@code null} otherwise
+   */
+  @Override
+  public String produceResult() {
+    return currentlyEmittedElementNumber++ % emissionFrequency == 0 ? state.toString() : null;
+  }
+
+  @Override
+  public String toString() {
+    return state.toString();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof StringBuilderState)) {
+      return false;
+    }
+    StringBuilderState that = (StringBuilderState) o;
+    return emissionFrequency == that.emissionFrequency
+        && currentlyEmittedElementNumber == that.currentlyEmittedElementNumber
+        && state.toString().equals(that.state.toString());
+  }
+
+  /**
+   * Hashes the same three values which {@link #equals(Object)} compares. StringBuilder doesn't
+   * override hashCode(), so the content has to be hashed rather than the builder instance.
+   */
+  @Override
+  public int hashCode() {
+    return Objects.hash(emissionFrequency, currentlyEmittedElementNumber, state.toString());
+  }
+
+  /**
+   * Coder for the StringBuilderState. Writes the emission frequency, the emission counter and
+   * the accumulated string, in that order.
+   */
+  static class StringBuilderStateCoder extends Coder<StringBuilderState> {
+
+    private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+    private static final Coder<Long> LONG_CODER = VarLongCoder.of();
+    private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+
+    @Override
+    public void encode(
+        StringBuilderState value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream)
+        throws IOException {
+      INT_CODER.encode(value.emissionFrequency, outStream);
+      LONG_CODER.encode(value.currentlyEmittedElementNumber, outStream);
+      STRING_CODER.encode(value.state.toString(), outStream);
+    }
+
+    @Override
+    public StringBuilderState decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream)
+        throws IOException {
+      // Fields must be read in the order in which encode() wrote them.
+      int emissionFrequency = INT_CODER.decode(inStream);
+      long currentlyEmittedElementNumber = LONG_CODER.decode(inStream);
+      String decoded = STRING_CODER.decode(inStream);
+      return new StringBuilderState(decoded, emissionFrequency, currentlyEmittedElementNumber);
+    }
+
+    @Override
+    public @UnknownKeyFor @NonNull @Initialized List<
+            ? extends
+                @UnknownKeyFor @NonNull @Initialized Coder<@UnknownKeyFor @NonNull @Initialized ?>>
+        getCoderArguments() {
+      return ImmutableList.of();
+    }
+
+    @Override
+    public void verifyDeterministic() {}
+
+    /** Relies on equals() and hashCode() of the state agreeing with each other. */
+    @Override
+    public boolean consistentWithEquals() {
+      return true;
+    }
+  }
+}
diff --git a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringEventExaminer.java b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringEventExaminer.java
new file mode 100644
index 0000000000000..7cf9a6e70572d
--- /dev/null
+++ b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringEventExaminer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ordered;
+
+/** Test event examiner.
*/ +class StringEventExaminer implements EventExaminer { + + public static final String LAST_INPUT = "z"; + private final long initialSequence; + private final int emissionFrequency; + + public StringEventExaminer(long initialSequence, int emissionFrequency) { + this.initialSequence = initialSequence; + this.emissionFrequency = emissionFrequency; + } + + @Override + public boolean isInitialEvent(long sequenceNumber, String input) { + return sequenceNumber == initialSequence; + } + + @Override + public StringBuilderState createStateOnInitialEvent(String input) { + return new StringBuilderState(input, emissionFrequency); + } + + @Override + public boolean isLastEvent(long sequenceNumber, String input) { + return input.equals(LAST_INPUT); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 712d230271d2e..02ea0097dcad5 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -185,6 +185,7 @@ include(":sdks:java:extensions:google-cloud-platform-core") include(":sdks:java:extensions:jackson") include(":sdks:java:extensions:join-library") include(":sdks:java:extensions:ml") +include(":sdks:java:extensions:ordered") include(":sdks:java:extensions:protobuf") include(":sdks:java:extensions:python") include(":sdks:java:extensions:sbe") From 5e68598eba24e55688f950d2fccc1b58776b29b3 Mon Sep 17 00:00:00 2001 From: Sergei Lilichenko Date: Tue, 26 Mar 2024 16:05:29 -0700 Subject: [PATCH 2/7] Address PR comments. 
--- .../sdk/extensions/ordered/EventExaminer.java | 2 +- .../sdk/extensions/ordered/MutableState.java | 3 - .../ordered/OrderedEventProcessor.java | 19 +-- .../ordered/OrderedProcessingHandler.java | 122 ++++++++++++++++-- 4 files changed, 119 insertions(+), 27 deletions(-) diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/EventExaminer.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/EventExaminer.java index b434269a475ab..1e4fe75655178 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/EventExaminer.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/EventExaminer.java @@ -56,7 +56,7 @@ public interface EventExaminer> * @param sequenceNumber of the event * @param event being processed * @return true if the last event. There are cases where it's impossible to know whether it's the - * last event. True should be returned in those cases. + * last event. False should be returned in those cases. */ boolean isLastEvent(long sequenceNumber, EventT event); } diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/MutableState.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/MutableState.java index 13891cbaec728..84c93e649ca9f 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/MutableState.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/MutableState.java @@ -26,9 +26,6 @@ public interface MutableState extends Serializable { * The interface assumes that events will mutate the state without the possibility of throwing an * error. * - *

    TODO: this might be too simplistic and a mechanism for failure of applying the event to a - * state would need to be created. - * * @param event to be processed */ void mutate(EventT event); diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java index 5dcaa3f949ca3..657e90b417266 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java @@ -47,7 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; @@ -143,9 +142,7 @@ public OrderedEventProcessorResult expand( handler.getStatusUpdateFrequency(), unprocessedEventOutput, handler.isProduceStatusUpdateOnEveryEvent(), - input.isBounded() == IsBounded.BOUNDED - ? Integer.MAX_VALUE - : handler.getMaxOutputElementsPerBundle())) + handler.getMaxOutputElementsPerBundle())) .withOutputTags( mainOutput, TupleTagList.of(Arrays.asList(statusOutput, unprocessedEventOutput)))); @@ -442,13 +439,7 @@ private StateTypeT processNewEvent( // First event of the key/window // What if it's a duplicate event - it will reset everything. Shall we drop/DLQ anything // that's before the processingState.lastOutputSequence? - try { - state = eventExaminer.createStateOnInitialEvent(currentEvent); - } catch (Exception e) { - // TODO: Handle exception in a better way - DLQ. - // Initial state creator can be pretty heavy - remote calls, etc.. 
- throw new RuntimeException(e); - } + state = eventExaminer.createStateOnInitialEvent(currentEvent); processingState.eventAccepted(currentSequence, thisIsTheLastEvent); @@ -503,7 +494,7 @@ private void processBufferedEvents( return; } - if (exceededMaxResultCountForBundle(processingState, largeBatchEmissionTimer)) { + if (reachedMaxResultCountForBundle(processingState, largeBatchEmissionTimer)) { // No point in trying to process buffered events return; } @@ -544,7 +535,7 @@ private void processBufferedEvents( // This check needs to be done after we checked for sequence gap and before we // attempt to process the next element which can result in a new result. - if (exceededMaxResultCountForBundle(processingState, largeBatchEmissionTimer)) { + if (reachedMaxResultCountForBundle(processingState, largeBatchEmissionTimer)) { endClearRange = Instant.ofEpochMilli(eventSequence); break; } @@ -563,7 +554,7 @@ private void processBufferedEvents( bufferedEventsState.clearRange(startRange, endClearRange); } - private boolean exceededMaxResultCountForBundle( + private boolean reachedMaxResultCountForBundle( ProcessingState processingState, Timer largeBatchEmissionTimer) { boolean exceeded = processingState.resultsProducedInBundle(numberOfResultsBeforeBundleStart) diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingHandler.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingHandler.java index 8409d0555d42a..2e4e7211f07d0 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingHandler.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingHandler.java @@ -24,13 +24,22 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.values.KV; import org.checkerframework.checker.nullness.qual.NonNull; +import 
org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; +/** + * Parent class for Ordered Processing configuration handlers. + * + * @param type of events to be processed + * @param type of keys which will be used to group the events + * @param type of internal State which will be used for processing + * @param type of the result of the processing which will be output + */ public abstract class OrderedProcessingHandler< EventT, KeyT, StateT extends MutableState, ResultT> implements Serializable { - public static final int DEFAULT_STATUS_UPDATE_FREQUENCY_SECONDS = 5; + private static final int DEFAULT_STATUS_UPDATE_FREQUENCY_SECONDS = 5; private static final boolean DEFAULT_PRODUCE_STATUS_UPDATE_ON_EVERY_EVENT = false; public static final int DEFAULT_MAX_ELEMENTS_TO_OUTPUT = 10_000; @@ -44,6 +53,14 @@ public abstract class OrderedProcessingHandler< Duration.standardSeconds(DEFAULT_STATUS_UPDATE_FREQUENCY_SECONDS); private boolean produceStatusUpdateOnEveryEvent = DEFAULT_PRODUCE_STATUS_UPDATE_ON_EVERY_EVENT; + /** + * Provide concrete classes which will be used by the ordered processing transform. + * + * @param eventTClass class of the events + * @param keyTClass class of the keys + * @param stateTClass class of the state + * @param resultTClass class of the results + */ public OrderedProcessingHandler( Class eventTClass, Class keyTClass, @@ -55,8 +72,24 @@ public OrderedProcessingHandler( this.resultTClass = resultTClass; } + /** + * @return the event examiner instance which will be used by the transform. + */ public abstract @NonNull EventExaminer getEventExaminer(); + /** + * Provide the event coder. + *

    + * The default implementation of the method will use the event coder from the input PCollection. + * If the input PCollection doesn't use KVCoder, it will attempt to get the coder from + * the pipeline's coder registry. + * + * @param pipeline of the transform + * @param inputCoder input coder of the transform + * @return event coder + * @throws CannotProvideCoderException if the method can't determine the coder based on + * the above algorithm. + */ public @NonNull Coder getEventCoder( Pipeline pipeline, Coder>> inputCoder) throws CannotProvideCoderException { @@ -70,11 +103,31 @@ public OrderedProcessingHandler( return pipeline.getCoderRegistry().getCoder(eventTClass); } - public Coder getStateCoder(Pipeline pipeline) throws CannotProvideCoderException { + /** + * Provide the state coder. + *

    + * The default implementation will attempt to get the coder from the pipeline's code registry. + * @param pipeline of the transform + * @return the state coder + * @throws CannotProvideCoderException + */ + public @NonNull Coder getStateCoder(Pipeline pipeline) throws CannotProvideCoderException { return pipeline.getCoderRegistry().getCoder(stateTClass); } - public Coder getKeyCoder(Pipeline pipeline, Coder>> inputCoder) + /** + * Provide the key coder. + *

    + * The default implementation of the method will use the event coder from the input PCollection. + * If the input PCollection doesn't use KVCoder, it will attempt to get the coder from + * the pipeline's coder registry. + * @param pipeline + * @param inputCoder + * @return + * @throws CannotProvideCoderException if the method can't determine the coder based on + * the above algorithm. + */ + public @NonNull Coder getKeyCoder(Pipeline pipeline, Coder>> inputCoder) throws CannotProvideCoderException { if (KvCoder.class.isAssignableFrom(inputCoder.getClass())) { return ((KvCoder>) inputCoder).getKeyCoder(); @@ -82,31 +135,82 @@ public Coder getKeyCoder(Pipeline pipeline, Coder getResultCoder(Pipeline pipeline) throws CannotProvideCoderException { + /** + * Provide the result coder. + *

    + * The default implementation will attempt to get the coder from the pipeline's code registry. + * @param pipeline + * @return result coder + * @throws CannotProvideCoderException + */ + public @NonNull Coder getResultCoder(Pipeline pipeline) throws CannotProvideCoderException { return pipeline.getCoderRegistry().getCoder(resultTClass); } - public Duration getStatusUpdateFrequency() { + /** + * Determines the frequency of emission of the {@link OrderedProcessingStatus} elements. + *

    + * Default is 5 seconds. + * @return the frequency of updates. If null is returned, no updates will be emitted on a + * scheduled basis. + * + + */ + public @Nullable Duration getStatusUpdateFrequency() { return statusUpdateFrequency; } + /** + * Changes the default status update frequency. Updates will be disabled if set to null. + * @param statusUpdateFrequency + */ public void setStatusUpdateFrequency(Duration statusUpdateFrequency) { this.statusUpdateFrequency = statusUpdateFrequency; } + /** + * Indicates if the status update needs to be sent after each event's processing. + *

    + * Default is false. + * + * @return + * @see OrderedProcessingHandler#getStatusUpdateFrequency() getStatusUpdateFrequency + * @see OrderedEventProcessorResult#processingStatuses() PCollection of processing statuses + */ public boolean isProduceStatusUpdateOnEveryEvent() { return produceStatusUpdateOnEveryEvent; } + /** + * Sets the indicator of whether the status notification needs to be produced on every event. + * + * @param value + */ + public void setProduceStatusUpdateOnEveryEvent(boolean value) { + this.produceStatusUpdateOnEveryEvent = value; + } + + /** + * Returns the maximum number of elements which will be output per each bundle. The default is + * 10,000 elements. + *

    + * This is used to limit the amount of data produced for each bundle - many runners have + * limitations on how much data can be output from a single bundle. If many events arrive out + * of sequence and are buffered then at some point a single event can cause processing of a large + * number of buffered events. + * + * @return + */ public int getMaxOutputElementsPerBundle() { return maxOutputElementsPerBundle; } + /** + * Overrides the default value. + * + * @param maxOutputElementsPerBundle + */ public void setMaxOutputElementsPerBundle(int maxOutputElementsPerBundle) { this.maxOutputElementsPerBundle = maxOutputElementsPerBundle; } - - public void setProduceStatusUpdateOnEveryEvent(boolean value) { - this.produceStatusUpdateOnEveryEvent = value; - } } From 929ea4078be87b45c1cebfbe0f550bb4c3896a8e Mon Sep 17 00:00:00 2001 From: Sergei Lilichenko Date: Tue, 26 Mar 2024 16:06:36 -0700 Subject: [PATCH 3/7] Address PR comments. --- .../ordered/OrderedProcessingHandler.java | 70 ++++++++++--------- 1 file changed, 37 insertions(+), 33 deletions(-) diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingHandler.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingHandler.java index 2e4e7211f07d0..444fdb118091b 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingHandler.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingHandler.java @@ -72,23 +72,21 @@ public OrderedProcessingHandler( this.resultTClass = resultTClass; } - /** - * @return the event examiner instance which will be used by the transform. - */ + /** @return the event examiner instance which will be used by the transform. */ public abstract @NonNull EventExaminer getEventExaminer(); /** * Provide the event coder. - *

    - * The default implementation of the method will use the event coder from the input PCollection. - * If the input PCollection doesn't use KVCoder, it will attempt to get the coder from - * the pipeline's coder registry. + * + *

    The default implementation of the method will use the event coder from the input + * PCollection. If the input PCollection doesn't use KVCoder, it will attempt to get the coder + * from the pipeline's coder registry. * * @param pipeline of the transform * @param inputCoder input coder of the transform * @return event coder - * @throws CannotProvideCoderException if the method can't determine the coder based on - * the above algorithm. + * @throws CannotProvideCoderException if the method can't determine the coder based on the above + * algorithm. */ public @NonNull Coder getEventCoder( Pipeline pipeline, Coder>> inputCoder) @@ -105,29 +103,33 @@ public OrderedProcessingHandler( /** * Provide the state coder. - *

    - * The default implementation will attempt to get the coder from the pipeline's code registry. + * + *

    The default implementation will attempt to get the coder from the pipeline's code registry. + * * @param pipeline of the transform * @return the state coder * @throws CannotProvideCoderException */ - public @NonNull Coder getStateCoder(Pipeline pipeline) throws CannotProvideCoderException { + public @NonNull Coder getStateCoder(Pipeline pipeline) + throws CannotProvideCoderException { return pipeline.getCoderRegistry().getCoder(stateTClass); } /** * Provide the key coder. - *

    - * The default implementation of the method will use the event coder from the input PCollection. - * If the input PCollection doesn't use KVCoder, it will attempt to get the coder from - * the pipeline's coder registry. + * + *

    The default implementation of the method will use the event coder from the input + * PCollection. If the input PCollection doesn't use KVCoder, it will attempt to get the coder + * from the pipeline's coder registry. + * * @param pipeline * @param inputCoder * @return - * @throws CannotProvideCoderException if the method can't determine the coder based on - * the above algorithm. + * @throws CannotProvideCoderException if the method can't determine the coder based on the above + * algorithm. */ - public @NonNull Coder getKeyCoder(Pipeline pipeline, Coder>> inputCoder) + public @NonNull Coder getKeyCoder( + Pipeline pipeline, Coder>> inputCoder) throws CannotProvideCoderException { if (KvCoder.class.isAssignableFrom(inputCoder.getClass())) { return ((KvCoder>) inputCoder).getKeyCoder(); @@ -137,24 +139,25 @@ public OrderedProcessingHandler( /** * Provide the result coder. - *

    - * The default implementation will attempt to get the coder from the pipeline's code registry. + * + *

    The default implementation will attempt to get the coder from the pipeline's code registry. + * * @param pipeline * @return result coder * @throws CannotProvideCoderException */ - public @NonNull Coder getResultCoder(Pipeline pipeline) throws CannotProvideCoderException { + public @NonNull Coder getResultCoder(Pipeline pipeline) + throws CannotProvideCoderException { return pipeline.getCoderRegistry().getCoder(resultTClass); } /** * Determines the frequency of emission of the {@link OrderedProcessingStatus} elements. - *

    - * Default is 5 seconds. - * @return the frequency of updates. If null is returned, no updates will be emitted on a - * scheduled basis. * - + *

    Default is 5 seconds. + * + * @return the frequency of updates. If null is returned, no updates will be emitted on a + * scheduled basis. */ public @Nullable Duration getStatusUpdateFrequency() { return statusUpdateFrequency; @@ -162,6 +165,7 @@ public OrderedProcessingHandler( /** * Changes the default status update frequency. Updates will be disabled if set to null. + * * @param statusUpdateFrequency */ public void setStatusUpdateFrequency(Duration statusUpdateFrequency) { @@ -170,8 +174,8 @@ public void setStatusUpdateFrequency(Duration statusUpdateFrequency) { /** * Indicates if the status update needs to be sent after each event's processing. - *

    - * Default is false. + * + *

    Default is false. * * @return * @see OrderedProcessingHandler#getStatusUpdateFrequency() getStatusUpdateFrequency @@ -193,10 +197,10 @@ public void setProduceStatusUpdateOnEveryEvent(boolean value) { /** * Returns the maximum number of elements which will be output per each bundle. The default is * 10,000 elements. - *

    - * This is used to limit the amount of data produced for each bundle - many runners have - * limitations on how much data can be output from a single bundle. If many events arrive out - * of sequence and are buffered then at some point a single event can cause processing of a large + * + *

    This is used to limit the amount of data produced for each bundle - many runners have + * limitations on how much data can be output from a single bundle. If many events arrive out of + * sequence and are buffered then at some point a single event can cause processing of a large * number of buffered events. * * @return From 931c3508eefe9b4ade20f4d4d9bb5390933954aa Mon Sep 17 00:00:00 2001 From: Sergei Lilichenko Date: Tue, 26 Mar 2024 16:28:50 -0700 Subject: [PATCH 4/7] Added JavaDocs to OrderedProcessingStatus.java --- .../ordered/OrderedProcessingStatus.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingStatus.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingStatus.java index cb1f7b4c423a2..6659bd2e2b922 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingStatus.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedProcessingStatus.java @@ -51,25 +51,47 @@ public static OrderedProcessingStatus create( .build(); } + /** + * @return Last sequence processed. If null is returned - no elements for the given key and window + * have been processed yet. + */ @Nullable public abstract Long getLastProcessedSequence(); + /** @return Number of events received out of sequence and buffered. */ public abstract long getNumberOfBufferedEvents(); + /** @return Earliest buffered sequence. If null is returned - there are no buffered events. */ @Nullable public abstract Long getEarliestBufferedSequence(); + /** @return Latest buffered sequence. If null is returned - there are no buffered events. */ @Nullable public abstract Long getLatestBufferedSequence(); + /** @return Total number of events received for the given key and window. 
*/ public abstract long getNumberOfReceivedEvents(); + /** + * @return Number of duplicate events which were output in {@link + * OrderedEventProcessorResult#unprocessedEvents()} PCollection + */ public abstract long getDuplicateCount(); + /** @return Number of output results produced. */ public abstract long getResultCount(); + /** + * @return Indicator that the last event for the given key and window has been received. It + * doesn't necessarily mean that all the events for the given key and window have been + * processed. Use {@link OrderedProcessingStatus#getNumberOfBufferedEvents()} == 0 and this + * indicator as the sign that the processing is complete. + */ public abstract boolean isLastEventReceived(); + /** + * @return Timestamp of when the status was produced. It is not related to the event's timestamp. + */ public abstract Instant getStatusDate(); @Override From 1fa5f9ae27f2b0500695ee2f98923f506748bf31 Mon Sep 17 00:00:00 2001 From: Sergei Lilichenko Date: Wed, 27 Mar 2024 11:44:28 -0700 Subject: [PATCH 5/7] Added batch tests. Added DLQ for events with the sequence outside of the valid range. 
--- .../ordered/OrderedEventProcessor.java | 16 +- .../extensions/ordered/UnprocessedEvent.java | 3 +- .../ordered/OrderedEventProcessorTest.java | 147 ++++++++++++++---- 3 files changed, 127 insertions(+), 39 deletions(-) diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java index 657e90b417266..8fb16605382a2 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java @@ -414,11 +414,17 @@ private StateTypeT processNewEvent( OrderedListState bufferedEventsState, MultiOutputReceiver outputReceiver) { if (currentSequence == Long.MAX_VALUE) { - LOG.error( - "Received an event with " - + currentSequence - + " as the sequence number. " - + "It will be dropped because it needs to be less than Long.MAX_VALUE."); + // OrderedListState can't handle the timestamp based on MAX_VALUE. + // To avoid exceptions, we DLQ this event. 
+ outputReceiver + .get(unprocessedEventsTupleTag) + .output( + KV.of( + processingState.getKey(), + KV.of( + currentSequence, + UnprocessedEvent.create( + currentEvent, Reason.sequence_id_outside_valid_range)))); return null; } diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/UnprocessedEvent.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/UnprocessedEvent.java index b1a8a3699fc63..987571483f6ab 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/UnprocessedEvent.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/UnprocessedEvent.java @@ -35,7 +35,8 @@ public static UnprocessedEvent create(EventT event, Reason reas public enum Reason { duplicate, - buffered + buffered, + sequence_id_outside_valid_range }; public abstract EventT getEvent(); diff --git a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest.java b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest.java index 8a71551527b3e..7383bd54d9e09 100644 --- a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest.java +++ b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest.java @@ -22,15 +22,20 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.extensions.ordered.UnprocessedEvent.Reason; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; 
import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Rule; @@ -55,7 +60,10 @@ public class OrderedEventProcessorTest { public static final int LARGE_MAX_RESULTS_PER_OUTPUT = 1000; public static final int EMISSION_FREQUENCY_ON_EVERY_OTHER_EVENT = 2; public static final boolean PRODUCE_STATUS_ON_EVERY_EVENT = true; - @Rule public final transient TestPipeline p = TestPipeline.create(); + public static final boolean STREAMING = true; + public static final boolean BATCH = false; + @Rule public final transient TestPipeline streamingPipeline = TestPipeline.create(); + @Rule public final transient TestPipeline batchPipeline = TestPipeline.create(); static class MapEventsToKV extends DoFn>> { @@ -122,7 +130,7 @@ public void testPerfectOrderingProcessing() throws CannotProvideCoderException { expectedOutput.add(KV.of("id-2", "a")); expectedOutput.add(KV.of("id-2", "ab")); - testStreamingProcessing( + testProcessing( events, expectedStatuses, expectedOutput, @@ -183,7 +191,7 @@ public void testOutOfSequenceProcessing() throws CannotProvideCoderException { expectedOutput.add(KV.of("id-2", "abcd")); expectedOutput.add(KV.of("id-2", "abcde")); - testStreamingProcessing( + testProcessing( events, expectedStatuses, expectedOutput, @@ -215,7 +223,7 @@ public void testUnfinishedProcessing() throws CannotProvideCoderException { expectedOutput.add(KV.of("id-2", "a")); expectedOutput.add(KV.of("id-2", "ab")); - testStreamingProcessing(events, expectedStatuses, expectedOutput, 1, 0, 1000, false); + testProcessing(events, expectedStatuses, expectedOutput, 1, 0, 1000, false); } @Test @@ -255,7 +263,7 @@ public void testHandlingOfDuplicateSequences() throws CannotProvideCoderExceptio 
duplicates.add(KV.of("id-1", KV.of(1L, UnprocessedEvent.create("b", Reason.duplicate)))); duplicates.add(KV.of("id-1", KV.of(3L, UnprocessedEvent.create("d", Reason.duplicate)))); - testStreamingProcessing( + testProcessing( events, expectedStatuses, expectedOutput, @@ -290,7 +298,7 @@ public void testProcessingWithEveryOtherResultEmission() throws CannotProvideCod // Skipped KV.of("id-1", "abcd"), expectedOutput.add(KV.of("id-2", "a")); // Skipped KV.of("id-2", "ab") - testStreamingProcessing( + testProcessing( events, expectedStatuses, expectedOutput, @@ -372,7 +380,7 @@ public void testLargeBufferedOutputInTimer() throws CannotProvideCoderException 0, false))); - testStreamingProcessing( + testProcessing( events.toArray(new Event[events.size()]), expectedStatuses, expectedOutput, @@ -467,7 +475,7 @@ public void testSequenceGapProcessingInBufferedOutput() throws CannotProvideCode OrderedProcessingStatus.create( 10L, 0, null, null, numberOfReceivedEvents, 10L, 0, false))); - testStreamingProcessing( + testProcessing( events.toArray(new Event[events.size()]), expectedStatuses, expectedOutput, @@ -493,10 +501,20 @@ public void testHandlingOfMaxSequenceNumber() throws CannotProvideCoderException expectedOutput.add(KV.of("id-1", "a")); expectedOutput.add(KV.of("id-1", "ab")); - testStreamingProcessing( + Collection>>> unprocessedEvents = + new ArrayList<>(); + unprocessedEvents.add( + KV.of( + "id-1", + KV.of( + Long.MAX_VALUE, + UnprocessedEvent.create("c", Reason.sequence_id_outside_valid_range)))); + + testProcessing( events, expectedStatuses, expectedOutput, + unprocessedEvents, EMISSION_FREQUENCY_ON_EVERY_ELEMENT, INITIAL_SEQUENCE_OF_0, LARGE_MAX_RESULTS_PER_OUTPUT, @@ -523,10 +541,17 @@ public void testProcessingOfTheLastInput() throws CannotProvideCoderException { expectedOutput.add(KV.of("id-1", "ab")); expectedOutput.add(KV.of("id-1", "ab" + StringEventExaminer.LAST_INPUT)); - testStreamingProcessing(events, expectedStatuses, expectedOutput, 1, 0, 1000, 
false); + testProcessing( + events, + expectedStatuses, + expectedOutput, + EMISSION_FREQUENCY_ON_EVERY_ELEMENT, + INITIAL_SEQUENCE_OF_0, + LARGE_MAX_RESULTS_PER_OUTPUT, + DONT_PRODUCE_STATUS_ON_EVERY_EVENT); } - private void testStreamingProcessing( + private void testProcessing( Event[] events, Collection> expectedStatuses, Collection> expectedOutput, @@ -535,45 +560,69 @@ private void testStreamingProcessing( int maxResultsPerOutput, boolean produceStatusOnEveryEvent) throws CannotProvideCoderException { - testStreamingProcessing( + testProcessing( events, expectedStatuses, expectedOutput, - Collections.emptySet() /* no duplicates */, + Collections.emptySet() /* no events in DLQ */, emissionFrequency, initialSequence, maxResultsPerOutput, produceStatusOnEveryEvent); } - private void testStreamingProcessing( + private void testProcessing( Event[] events, Collection> expectedStatuses, Collection> expectedOutput, - Collection>>> expectedDuplicates, + Collection>>> expectedUnprocessedEvents, int emissionFrequency, long initialSequence, int maxResultsPerOutput, boolean produceStatusOnEveryEvent) throws CannotProvideCoderException { - Instant now = Instant.now().minus(Duration.standardMinutes(20)); - TestStream.Builder messageFlow = - TestStream.create(p.getCoderRegistry().getCoder(Event.class)).advanceWatermarkTo(now); + doTest( + events, + expectedStatuses, + expectedOutput, + expectedUnprocessedEvents, + emissionFrequency, + initialSequence, + maxResultsPerOutput, + produceStatusOnEveryEvent, + STREAMING); + doTest( + events, + expectedStatuses, + expectedOutput, + expectedUnprocessedEvents, + emissionFrequency, + initialSequence, + maxResultsPerOutput, + produceStatusOnEveryEvent, + BATCH); + } - int delayInMilliseconds = 0; - for (Event e : events) { - messageFlow = - messageFlow - .advanceWatermarkTo(now.plus(Duration.millis(++delayInMilliseconds))) - .addElements(e); - } + private void doTest( + Event[] events, + Collection> expectedStatuses, + Collection> 
expectedOutput, + Collection>>> expectedDuplicates, + int emissionFrequency, + long initialSequence, + int maxResultsPerOutput, + boolean produceStatusOnEveryEvent, + boolean streaming) + throws @UnknownKeyFor @NonNull @Initialized CannotProvideCoderException { - // Needed to force the processing time based timers. - messageFlow = messageFlow.advanceProcessingTime(Duration.standardMinutes(15)); + Pipeline pipeline = streaming ? streamingPipeline : batchPipeline; + PCollection rawInput = + streaming + ? createStreamingPCollection(pipeline, events) + : createBatchPCollection(pipeline, events); PCollection>> input = - p.apply("Create Events", messageFlow.advanceWatermarkToInfinity()) - .apply("To KV", ParDo.of(new MapEventsToKV())); + rawInput.apply("To KV", ParDo.of(new MapEventsToKV())); StringBufferOrderedProcessingHandler handler = new StringBufferOrderedProcessingHandler(emissionFrequency, initialSequence); @@ -584,7 +633,8 @@ private void testStreamingProcessing( // update is needed. handler.setStatusUpdateFrequency(null); } else { - handler.setStatusUpdateFrequency(Duration.standardMinutes(5)); + handler.setStatusUpdateFrequency( + streaming ? Duration.standardMinutes(5) : Duration.standardSeconds(1)); } OrderedEventProcessor orderedEventProcessor = OrderedEventProcessor.create(handler); @@ -594,12 +644,43 @@ private void testStreamingProcessing( PAssert.that("Output matches", processingResult.output()).containsInAnyOrder(expectedOutput); - PAssert.that("Statuses match", processingResult.processingStatuses()) - .containsInAnyOrder(expectedStatuses); + if (streaming) { + // Only in streaming the events will arrive in a pre-determined order and the statuses + // will be deterministic. In batch events can be processed in any order, so we skip status + // verification and rely on the output and unprocessed event matches. 
+ PAssert.that("Statuses match", processingResult.processingStatuses()) + .containsInAnyOrder(expectedStatuses); + } PAssert.that("Unprocessed events match", processingResult.unprocessedEvents()) .containsInAnyOrder(expectedDuplicates); - p.run(); + pipeline.run(); + } + + private @UnknownKeyFor @NonNull @Initialized PCollection createBatchPCollection( + Pipeline pipeline, Event[] events) { + return pipeline.apply("Create Batch Events", Create.of(Arrays.asList(events))); + } + + private @UnknownKeyFor @NonNull @Initialized PCollection createStreamingPCollection( + Pipeline pipeline, Event[] events) + throws @UnknownKeyFor @NonNull @Initialized CannotProvideCoderException { + Instant now = Instant.now().minus(Duration.standardMinutes(20)); + TestStream.Builder messageFlow = + TestStream.create(pipeline.getCoderRegistry().getCoder(Event.class)) + .advanceWatermarkTo(now); + + int delayInMilliseconds = 0; + for (Event e : events) { + messageFlow = + messageFlow + .advanceWatermarkTo(now.plus(Duration.millis(++delayInMilliseconds))) + .addElements(e); + } + + // Needed to force the processing time based timers. + messageFlow = messageFlow.advanceProcessingTime(Duration.standardMinutes(15)); + return pipeline.apply("Create Streaming Events", messageFlow.advanceWatermarkToInfinity()); } } From cee173ac791456a35bd34206a8fb0d0b7ea8230a Mon Sep 17 00:00:00 2001 From: Sergei Lilichenko Date: Wed, 27 Mar 2024 18:12:12 -0700 Subject: [PATCH 6/7] Added tests for windowed input. Added references to the unresolved TODO's captured as Beam's issues. 
--- .../ordered/OrderedEventProcessor.java | 7 +- .../ordered/OrderedEventProcessorTest.java | 124 ++++++++++++++++-- 2 files changed, 117 insertions(+), 14 deletions(-) diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java index 8fb16605382a2..f409199d53bf6 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java @@ -372,9 +372,7 @@ private void saveStates( if (produceStatusUpdateOnEveryEvent) { // During pipeline draining the window timestamp is set to a large value in the future. // Producing an event before that results in error, that's why this logic exist. - Instant statusTimestamp = Instant.now(); - statusTimestamp = - statusTimestamp.isAfter(windowTimestamp) ? statusTimestamp : windowTimestamp; + Instant statusTimestamp = windowTimestamp; emitProcessingStatus(processingStatus, outputReceiver, statusTimestamp); } @@ -570,10 +568,9 @@ private boolean reachedMaxResultCountForBundle( "Setting the timer to output next batch of events for key '" + processingState.getKey() + "'"); - // TODO: this work fine for global windows. Need to check what happens for other types of - // windows. // See GroupIntoBatches for examples on how to hold the timestamp. // TODO: test that on draining the pipeline all the results are still produced correctly. 
+ // See: https://github.com/apache/beam/issues/30781 largeBatchEmissionTimer.offset(Duration.millis(1)).setRelative(); } return exceeded; diff --git a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest.java b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest.java index 7383bd54d9e09..72e48c29530f2 100644 --- a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest.java +++ b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Set; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.extensions.ordered.UnprocessedEvent.Reason; @@ -31,8 +32,13 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.UnknownKeyFor; @@ -46,9 +52,6 @@ /** * Ordered Processing tests use the same testing scenario. Events are sent in or out of sequence. * Each event is a string for a particular key. The output is a concatenation of all strings. - * - *

    TODO: add tests for outputting buffered events in case of drainage. TODO: add batch processing - * in parallel with streaming processing */ @RunWith(JUnit4.class) public class OrderedEventProcessorTest { @@ -62,6 +65,8 @@ public class OrderedEventProcessorTest { public static final boolean PRODUCE_STATUS_ON_EVERY_EVENT = true; public static final boolean STREAMING = true; public static final boolean BATCH = false; + public static final Set>>> NO_EXPECTED_DLQ_EVENTS = + Collections.emptySet(); @Rule public final transient TestPipeline streamingPipeline = TestPipeline.create(); @Rule public final transient TestPipeline batchPipeline = TestPipeline.create(); @@ -551,6 +556,92 @@ public void testProcessingOfTheLastInput() throws CannotProvideCoderException { DONT_PRODUCE_STATUS_ON_EVERY_EVENT); } + @Test + public void testWindowedProcessing() throws CannotProvideCoderException { + + Instant base = new Instant(0); + TestStream values = + TestStream.create(streamingPipeline.getCoderRegistry().getCoder(Event.class)) + .advanceWatermarkTo(base) + .addElements( + // Start of first window + TimestampedValue.of( + Event.create(0, "id-1", "a"), base.plus(Duration.standardSeconds(1))), + TimestampedValue.of( + Event.create(1, "id-1", "b"), base.plus(Duration.standardSeconds(2))), + TimestampedValue.of( + Event.create(0, "id-2", "x"), base.plus(Duration.standardSeconds(1))), + TimestampedValue.of( + Event.create(1, "id-2", "y"), base.plus(Duration.standardSeconds(2))), + TimestampedValue.of( + Event.create(2, "id-2", "z"), base.plus(Duration.standardSeconds(2))), + + // Start of second window. Numbering must start with 0 again. 
+ TimestampedValue.of( + Event.create(0, "id-1", "c"), base.plus(Duration.standardSeconds(10))), + TimestampedValue.of( + Event.create(1, "id-1", "d"), base.plus(Duration.standardSeconds(11)))) + .advanceWatermarkToInfinity(); + + Pipeline pipeline = streamingPipeline; + + PCollection rawInput = pipeline.apply("Create Streaming Events", values); + PCollection>> input = + rawInput.apply("To KV", ParDo.of(new MapEventsToKV())); + + input = input.apply("Window input", Window.into(FixedWindows.of(Duration.standardSeconds(5)))); + + StringBufferOrderedProcessingHandler handler = + new StringBufferOrderedProcessingHandler( + EMISSION_FREQUENCY_ON_EVERY_ELEMENT, INITIAL_SEQUENCE_OF_0); + handler.setMaxOutputElementsPerBundle(LARGE_MAX_RESULTS_PER_OUTPUT); + handler.setStatusUpdateFrequency(null); + handler.setProduceStatusUpdateOnEveryEvent(true); + + OrderedEventProcessor orderedEventProcessor = + OrderedEventProcessor.create(handler); + + OrderedEventProcessorResult processingResult = + input.apply("Process Events", orderedEventProcessor); + + IntervalWindow window1 = new IntervalWindow(base, base.plus(Duration.standardSeconds(5))); + PAssert.that("Output matches in window 1", processingResult.output()) + .inWindow(window1) + .containsInAnyOrder( + KV.of("id-1", "a"), + KV.of("id-1", "ab"), + KV.of("id-2", "x"), + KV.of("id-2", "xy"), + KV.of("id-2", "xyz")); + + IntervalWindow window2 = + new IntervalWindow( + base.plus(Duration.standardSeconds(10)), base.plus(Duration.standardSeconds(15))); + PAssert.that("Output matches in window 2", processingResult.output()) + .inWindow(window2) + .containsInAnyOrder(KV.of("id-1", "c"), KV.of("id-1", "cd")); + + PAssert.that("Statuses match in window 1", processingResult.processingStatuses()) + .inWindow(window1) + .containsInAnyOrder( + KV.of("id-1", OrderedProcessingStatus.create(0L, 0, null, null, 1, 1, 0, false)), + KV.of("id-1", OrderedProcessingStatus.create(1L, 0, null, null, 2, 2, 0, false)), + KV.of("id-2", 
OrderedProcessingStatus.create(0L, 0, null, null, 1, 1, 0, false)), + KV.of("id-2", OrderedProcessingStatus.create(1L, 0, null, null, 2, 2, 0, false)), + KV.of("id-2", OrderedProcessingStatus.create(2L, 0, null, null, 3, 3, 0, false))); + + PAssert.that("Statuses match in window 2", processingResult.processingStatuses()) + .inWindow(window2) + .containsInAnyOrder( + KV.of("id-1", OrderedProcessingStatus.create(0L, 0, null, null, 1, 1, 0, false)), + KV.of("id-1", OrderedProcessingStatus.create(1L, 0, null, null, 2, 2, 0, false))); + + PAssert.that("Unprocessed events match", processingResult.unprocessedEvents()) + .containsInAnyOrder(NO_EXPECTED_DLQ_EVENTS); + + pipeline.run(); + } + private void testProcessing( Event[] events, Collection> expectedStatuses, @@ -564,7 +655,7 @@ private void testProcessing( events, expectedStatuses, expectedOutput, - Collections.emptySet() /* no events in DLQ */, + NO_EXPECTED_DLQ_EVENTS, emissionFrequency, initialSequence, maxResultsPerOutput, @@ -603,6 +694,20 @@ private void testProcessing( BATCH); } + /** + * The majority of the tests use this method. Testing is done in the global window. + * + * @param events + * @param expectedStatuses + * @param expectedOutput + * @param expectedDuplicates + * @param emissionFrequency + * @param initialSequence + * @param maxResultsPerOutput + * @param produceStatusOnEveryEvent + * @param streaming + * @throws CannotProvideCoderException + */ private void doTest( Event[] events, Collection> expectedStatuses, @@ -629,8 +734,7 @@ private void doTest( handler.setMaxOutputElementsPerBundle(maxResultsPerOutput); if (produceStatusOnEveryEvent) { handler.setProduceStatusUpdateOnEveryEvent(true); - // This disables status updates emitted on timers. Needed for simpler testing when per event - // update is needed. + // This disables status updates emitted on timers.
handler.setStatusUpdateFrequency(null); } else { handler.setStatusUpdateFrequency( @@ -646,8 +750,8 @@ private void doTest( if (streaming) { // Only in streaming the events will arrive in a pre-determined order and the statuses - // will be deterministic. In batch events can be processed in any order, so we skip status - // verification and rely on the output and unprocessed event matches. + // will be deterministic. In batch pipelines events can be processed in any order, + // so we skip status verification and rely on the output and unprocessed event matches. PAssert.that("Statuses match", processingResult.processingStatuses()) .containsInAnyOrder(expectedStatuses); } @@ -660,7 +764,9 @@ private void doTest( private @UnknownKeyFor @NonNull @Initialized PCollection createBatchPCollection( Pipeline pipeline, Event[] events) { - return pipeline.apply("Create Batch Events", Create.of(Arrays.asList(events))); + return pipeline + .apply("Create Batch Events", Create.of(Arrays.asList(events))) + .apply("Reshuffle", Reshuffle.viaRandomKey()); } private @UnknownKeyFor @NonNull @Initialized PCollection createStreamingPCollection( From 8a55650f648167c4768c2b0057a15361fe5d561b Mon Sep 17 00:00:00 2001 From: Sergei Lilichenko Date: Thu, 28 Mar 2024 21:07:44 -0700 Subject: [PATCH 7/7] Added DLQ handling of checked exceptions happening during the state mutations. 
--- sdks/java/extensions/ordered/build.gradle | 2 +- .../sdk/extensions/ordered/MutableState.java | 4 +- .../ordered/OrderedEventProcessor.java | 34 ++++- .../extensions/ordered/ProcessingState.java | 50 +++---- .../extensions/ordered/UnprocessedEvent.java | 47 ++++++- .../ordered/OrderedEventProcessorTest.java | 122 +++++++++++++++++- .../ordered/StringBuilderState.java | 14 +- 7 files changed, 232 insertions(+), 41 deletions(-) diff --git a/sdks/java/extensions/ordered/build.gradle b/sdks/java/extensions/ordered/build.gradle index c7872c33c4a33..3c183f03162c3 100644 --- a/sdks/java/extensions/ordered/build.gradle +++ b/sdks/java/extensions/ordered/build.gradle @@ -24,7 +24,7 @@ dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") implementation library.java.slf4j_api implementation library.java.joda_time - implementation library.java.joda_time + implementation library.java.commons_lang3 implementation library.java.vendored_guava_32_1_2_jre testImplementation library.java.junit testImplementation library.java.hamcrest diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/MutableState.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/MutableState.java index 84c93e649ca9f..3055bf7a446b0 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/MutableState.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/MutableState.java @@ -27,8 +27,10 @@ public interface MutableState extends Serializable { * error. * * @param event to be processed + * @throws Exception if a checked exception is thrown, the event will be output into {@link + * OrderedEventProcessorResult#unprocessedEvents()} with the {@code exception_thrown} reason. */ - void mutate(EventT event); + void mutate(EventT event) throws Exception; /** * This method is called after each state mutation.
diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java index f409199d53bf6..935647c0e7e5e 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessor.java @@ -321,7 +321,7 @@ public void processElement( numberOfResultsBeforeBundleStart = processingState.getResultCount(); } - processingState.recordReceived(); + processingState.eventReceived(); StateTypeT state = processNewEvent( @@ -389,10 +389,10 @@ private void emitProcessingStatus( processingState.getKey(), OrderedProcessingStatus.create( processingState.getLastOutputSequence(), - processingState.getBufferedRecordCount(), + processingState.getBufferedEventCount(), processingState.getEarliestBufferedSequence(), processingState.getLatestBufferedSequence(), - processingState.getRecordsReceived(), + processingState.getEventsReceived(), processingState.getResultCount(), processingState.getDuplicates(), processingState.isLastEventReceived())), @@ -461,7 +461,18 @@ private StateTypeT processNewEvent( // Event matches expected sequence state = currentStateState.read(); - state.mutate(currentEvent); + try { + state.mutate(currentEvent); + } catch (Exception e) { + outputReceiver + .get(unprocessedEventsTupleTag) + .output( + KV.of( + processingState.getKey(), + KV.of(currentSequence, UnprocessedEvent.create(currentEvent, e)))); + return null; + } + ResultTypeT result = state.produceResult(); if (result != null) { outputReceiver.get(mainOutputTupleTag).output(KV.of(processingState.getKey(), result)); @@ -544,7 +555,20 @@ private void processBufferedEvents( break; } - state.mutate(bufferedEvent); + try { + state.mutate(bufferedEvent); + } catch (Exception e) { + outputReceiver + 
.get(unprocessedEventsTupleTag) + .output( + KV.of( + processingState.getKey(), + KV.of(eventSequence, UnprocessedEvent.create(bufferedEvent, e)))); + // There is a chance that the next event will have the same sequence number and will + // process successfully. + continue; + } + ResultTypeT result = state.produceResult(); if (result != null) { outputReceiver.get(mainOutputTupleTag).output(KV.of(processingState.getKey(), result)); diff --git a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ProcessingState.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ProcessingState.java index 646ead9848257..4b591a37faab8 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ProcessingState.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/ProcessingState.java @@ -41,11 +41,11 @@ class ProcessingState { @Nullable private Long lastOutputSequence; @Nullable private Long latestBufferedSequence; @Nullable private Long earliestBufferedSequence; - private long bufferedRecordCount; + private long bufferedEventCount; private boolean lastEventReceived; - private long recordsReceived; + private long eventsReceived; private long duplicates; @@ -55,7 +55,7 @@ class ProcessingState { public ProcessingState(KeyT key) { this.key = key; - this.bufferedRecordCount = 0; + this.bufferedEventCount = 0; this.lastOutputSequence = null; this.earliestBufferedSequence = null; this.latestBufferedSequence = null; @@ -68,15 +68,15 @@ public ProcessingState(KeyT key) { * @param lastOutputSequence * @param earliestBufferedSequence * @param latestBufferedSequence - * @param bufferedRecordCount + * @param bufferedEventCount */ ProcessingState( KeyT key, @Nullable Long lastOutputSequence, @Nullable Long earliestBufferedSequence, @Nullable Long latestBufferedSequence, - long bufferedRecordCount, - long recordsReceived, + long bufferedEventCount, + long 
eventsReceived, long duplicates, long resultCount, boolean lastEventReceived) { @@ -84,8 +84,8 @@ public ProcessingState(KeyT key) { this.lastOutputSequence = lastOutputSequence; this.earliestBufferedSequence = earliestBufferedSequence; this.latestBufferedSequence = latestBufferedSequence; - this.bufferedRecordCount = bufferedRecordCount; - this.recordsReceived = recordsReceived; + this.bufferedEventCount = bufferedEventCount; + this.eventsReceived = eventsReceived; this.duplicates = duplicates; this.resultCount = resultCount; this.lastEventReceived = lastEventReceived; @@ -106,12 +106,12 @@ public Long getEarliestBufferedSequence() { return earliestBufferedSequence; } - public long getBufferedRecordCount() { - return bufferedRecordCount; + public long getBufferedEventCount() { + return bufferedEventCount; } - public long getRecordsReceived() { - return recordsReceived; + public long getEventsReceived() { + return eventsReceived; } public boolean isLastEventReceived() { @@ -153,7 +153,7 @@ private void setLastEventReceived(boolean lastEvent) { * @param isLastEvent */ void eventBuffered(long sequenceNumber, boolean isLastEvent) { - bufferedRecordCount++; + bufferedEventCount++; latestBufferedSequence = Math.max( sequenceNumber, @@ -172,10 +172,10 @@ void eventBuffered(long sequenceNumber, boolean isLastEvent) { * @param sequence of the processed event */ public void processedBufferedEvent(long sequence) { - bufferedRecordCount--; + bufferedEventCount--; lastOutputSequence = sequence; - if (bufferedRecordCount == 0) { + if (bufferedEventCount == 0) { earliestBufferedSequence = latestBufferedSequence = null; } else { // We don't know for sure that it's the earliest record yet, but OrderedEventProcessor will @@ -204,9 +204,9 @@ public boolean equals(@Nullable @Initialized Object o) { return false; } ProcessingState that = (ProcessingState) o; - return bufferedRecordCount == that.bufferedRecordCount + return bufferedEventCount == that.bufferedEventCount && 
lastEventReceived == that.lastEventReceived - && recordsReceived == that.recordsReceived + && eventsReceived == that.eventsReceived && duplicates == that.duplicates && Objects.equals(lastOutputSequence, that.lastOutputSequence) && Objects.equals(latestBufferedSequence, that.latestBufferedSequence) @@ -221,20 +221,20 @@ public int hashCode() { lastOutputSequence, latestBufferedSequence, earliestBufferedSequence, - bufferedRecordCount, + bufferedEventCount, lastEventReceived, - recordsReceived, + eventsReceived, duplicates, resultCount, key); } public boolean isProcessingCompleted() { - return lastEventReceived && bufferedRecordCount == 0; + return lastEventReceived && bufferedEventCount == 0; } - public void recordReceived() { - recordsReceived++; + public void eventReceived() { + eventsReceived++; } public boolean isNextEvent(long sequence) { @@ -253,7 +253,7 @@ public boolean checkForDuplicateBatchedEvent(long currentSequence) { boolean result = lastOutputSequence != null && lastOutputSequence == currentSequence; if (result) { duplicates++; - if (--bufferedRecordCount == 0) { + if (--bufferedEventCount == 0) { earliestBufferedSequence = latestBufferedSequence = null; } } @@ -302,8 +302,8 @@ public void encode(ProcessingState value, OutputStream outStream) throws I NULLABLE_LONG_CODER.encode(value.getLastOutputSequence(), outStream); NULLABLE_LONG_CODER.encode(value.getEarliestBufferedSequence(), outStream); NULLABLE_LONG_CODER.encode(value.getLatestBufferedSequence(), outStream); - LONG_CODER.encode(value.getBufferedRecordCount(), outStream); - LONG_CODER.encode(value.getRecordsReceived(), outStream); + LONG_CODER.encode(value.getBufferedEventCount(), outStream); + LONG_CODER.encode(value.getEventsReceived(), outStream); LONG_CODER.encode(value.getDuplicates(), outStream); LONG_CODER.encode(value.getResultCount(), outStream); BOOLEAN_CODER.encode(value.isLastEventReceived(), outStream); diff --git 
a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/UnprocessedEvent.java b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/UnprocessedEvent.java index 987571483f6ab..2131ef384e22f 100644 --- a/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/UnprocessedEvent.java +++ b/sdks/java/extensions/ordered/src/main/java/org/apache/beam/sdk/extensions/ordered/UnprocessedEvent.java @@ -23,29 +23,68 @@ import java.io.OutputStream; import java.util.Arrays; import java.util.List; +import javax.annotation.Nullable; import org.apache.beam.sdk.coders.ByteCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.commons.lang3.exception.ExceptionUtils; +/** + * Combines the source event which failed to process with the failure reason. + * + * @param + */ @AutoValue public abstract class UnprocessedEvent { + /** + * Create new unprocessed event. + * + * @param event failed event + * @param reason for failure + * @param type of the event + * @return + */ public static UnprocessedEvent create(EventT event, Reason reason) { - return new AutoValue_UnprocessedEvent<>(event, reason); + return new AutoValue_UnprocessedEvent<>(event, reason, null); + } + + /** + * Create new unprocessed event which failed due to an exception thrown. 
+ * + * @param event which failed + * @param exception which caused the failure + * @param type of the event + * @return + */ + public static UnprocessedEvent create(EventT event, Exception exception) { + return new AutoValue_UnprocessedEvent<>( + event, Reason.exception_thrown, ExceptionUtils.getStackTrace(exception)); + } + + static UnprocessedEvent create( + EventT event, Reason reason, @Nullable String failureDetails) { + return new AutoValue_UnprocessedEvent<>(event, reason, failureDetails); } public enum Reason { duplicate, buffered, - sequence_id_outside_valid_range + sequence_id_outside_valid_range, + exception_thrown }; public abstract EventT getEvent(); public abstract Reason getReason(); + public abstract @Nullable String getExplanation(); + static class UnprocessedEventCoder extends Coder> { private final Coder eventCoder; + private final NullableCoder explanationCoder = NullableCoder.of(StringUtf8Coder.of()); UnprocessedEventCoder(Coder eventCoder) { this.eventCoder = eventCoder; @@ -54,14 +93,16 @@ static class UnprocessedEventCoder extends Coder value, OutputStream outStream) throws IOException { ByteCoder.of().encode((byte) value.getReason().ordinal(), outStream); + explanationCoder.encode(value.getExplanation(), outStream); eventCoder.encode(value.getEvent(), outStream); } @Override public UnprocessedEvent decode(InputStream inputStream) throws IOException { Reason reason = Reason.values()[ByteCoder.of().decode(inputStream)]; + String explanation = explanationCoder.decode(inputStream); EventT event = eventCoder.decode(inputStream); - return UnprocessedEvent.create(event, reason); + return UnprocessedEvent.create(event, reason, explanation); } @Override diff --git a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest.java b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest.java index 72e48c29530f2..6a24021ad667d 100644 --- 
a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest.java +++ b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/OrderedEventProcessorTest.java @@ -27,14 +27,19 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.extensions.ordered.UnprocessedEvent.Reason; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.SerializableMatcher; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -42,6 +47,8 @@ import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.UnknownKeyFor; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Rule; @@ -279,6 +286,42 @@ public void testHandlingOfDuplicateSequences() throws CannotProvideCoderExceptio DONT_PRODUCE_STATUS_ON_EVERY_EVENT); } + @Test + public void testHandlingOfCheckedExceptions() throws CannotProvideCoderException { + Event[] events = { + Event.create(0, "id-1", "a"), + Event.create(1, "id-1", "b"), + Event.create(2, "id-1", 
StringBuilderState.BAD_VALUE), + Event.create(3, "id-1", "c"), + }; + + Collection> expectedStatuses = new ArrayList<>(); + expectedStatuses.add( + KV.of("id-1", OrderedProcessingStatus.create(1L, 1, 3L, 3L, events.length, 2, 0, false))); + + Collection> expectedOutput = new ArrayList<>(); + expectedOutput.add(KV.of("id-1", "a")); + expectedOutput.add(KV.of("id-1", "ab")); + + Collection>>> failedEvents = new ArrayList<>(); + failedEvents.add( + KV.of( + "id-1", + KV.of( + 2L, + UnprocessedEvent.create(StringBuilderState.BAD_VALUE, Reason.exception_thrown)))); + + testProcessing( + events, + expectedStatuses, + expectedOutput, + failedEvents, + EMISSION_FREQUENCY_ON_EVERY_ELEMENT, + INITIAL_SEQUENCE_OF_0, + LARGE_MAX_RESULTS_PER_OUTPUT, + DONT_PRODUCE_STATUS_ON_EVERY_EVENT); + } + @Test public void testProcessingWithEveryOtherResultEmission() throws CannotProvideCoderException { Event[] events = { @@ -700,7 +743,7 @@ private void testProcessing( * @param events * @param expectedStatuses * @param expectedOutput - * @param expectedDuplicates + * @param expectedUnprocessedEvents * @param emissionFrequency * @param initialSequence * @param maxResultsPerOutput @@ -712,7 +755,7 @@ private void doTest( Event[] events, Collection> expectedStatuses, Collection> expectedOutput, - Collection>>> expectedDuplicates, + Collection>>> expectedUnprocessedEvents, int emissionFrequency, long initialSequence, int maxResultsPerOutput, @@ -756,9 +799,32 @@ private void doTest( .containsInAnyOrder(expectedStatuses); } - PAssert.that("Unprocessed events match", processingResult.unprocessedEvents()) - .containsInAnyOrder(expectedDuplicates); + // This is a temporary workaround until PAssert changes. 
+ boolean unprocessedEventsHaveExceptionStackTrace = false; + for (KV>> event : expectedUnprocessedEvents) { + if (event.getValue().getValue().getReason() == Reason.exception_thrown) { + unprocessedEventsHaveExceptionStackTrace = true; + break; + } + } + if (unprocessedEventsHaveExceptionStackTrace) { + PAssert.thatSingleton( + "Unprocessed event count", + processingResult + .unprocessedEvents() + .apply( + "Window", + Window.>>>into( + new GlobalWindows()) + .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())) + .discardingFiredPanes()) + .apply("Count", Count.globally())) + .isEqualTo((long) expectedUnprocessedEvents.size()); + } else { + PAssert.that("Unprocessed events match", processingResult.unprocessedEvents()) + .containsInAnyOrder(expectedUnprocessedEvents); + } pipeline.run(); } @@ -789,4 +855,52 @@ private void doTest( messageFlow = messageFlow.advanceProcessingTime(Duration.standardMinutes(15)); return pipeline.apply("Create Streaming Events", messageFlow.advanceWatermarkToInfinity()); } + + /** + * Unprocessed event's explanation contains stacktraces which makes tests very brittle because it + * requires hardcoding the line numbers in the code. We use this matcher to only compare on the + * first line of the explanation. 
+ */ + static class UnprocessedEventMatcher + extends BaseMatcher>>> + implements SerializableMatcher>>> { + + private KV>> element; + + public UnprocessedEventMatcher(KV>> element) { + this.element = element; + } + + @Override + public boolean matches(Object actual) { + KV>> toMatch = + (KV>>) actual; + + UnprocessedEvent originalEvent = element.getValue().getValue(); + UnprocessedEvent eventToMatch = toMatch.getValue().getValue(); + + return element.getKey().equals(toMatch.getKey()) + && element.getValue().getKey().equals(toMatch.getValue().getKey()) + && originalEvent.getEvent().equals(eventToMatch.getEvent()) + && originalEvent.getReason() == eventToMatch.getReason() + && normalizeExplanation(originalEvent.getExplanation()) + .equals(normalizeExplanation(eventToMatch.getExplanation())); + } + + @Override + public void describeTo(Description description) { + description.appendText("unprocessed event matching ").appendValue(element); + } + + static String normalizeExplanation(String value) { + if (value == null) { + return ""; + } + String firstLine = value.split("\n", 2)[0]; // limit 2: split once, keep only the first line + if (firstLine.contains("Exception")) { + return firstLine; + } + return value; + } + } } diff --git a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringBuilderState.java b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringBuilderState.java index 3d65c36b8c24f..c88730aa8f0ac 100644 --- a/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringBuilderState.java +++ b/sdks/java/extensions/ordered/src/test/java/org/apache/beam/sdk/extensions/ordered/StringBuilderState.java @@ -41,6 +41,8 @@ @DefaultCoder(StringBuilderStateCoder.class) class StringBuilderState implements MutableState { + public static final String BAD_VALUE = "throw exception if you see me"; + private int emissionFrequency = 1; private long currentlyEmittedElementNumber; @@ -54,11 +56,19 @@ class StringBuilderState implements MutableState { String
initialEvent, int emissionFrequency, long currentlyEmittedElementNumber) { this.emissionFrequency = emissionFrequency; this.currentlyEmittedElementNumber = currentlyEmittedElementNumber; - mutate(initialEvent); + try { + mutate(initialEvent); + } catch (Exception e) { + // this shouldn't happen because the input should be pre-validated. + throw new RuntimeException(e); + } } @Override - public void mutate(String event) { + public void mutate(String event) throws Exception { + if (event.equals(BAD_VALUE)) { + throw new Exception("Validation failed"); + } state.append(event); }