Skip to content

Commit

Permalink
Add Ordered Processing PTransform to Java SDK (#30735)
Browse files Browse the repository at this point in the history
* Initial check-in of the ordered processing extension in Java.

* Address PR comments.

* Address PR comments.

* Added JavaDocs to OrderedProcessingStatus.java

* Added batch tests. Added DLQ for events with the sequence outside of the valid range.

* Added tests for windowed input. Added references to the unresolved TODOs, which are tracked as Beam issues.

* Added DLQ handling of checked exceptions happening during the state mutations.
  • Loading branch information
slilichenko committed Mar 29, 2024
1 parent e3fee51 commit fe2a224
Show file tree
Hide file tree
Showing 16 changed files with 2,972 additions and 0 deletions.
33 changes: 33 additions & 0 deletions sdks/java/extensions/ordered/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
plugins { id 'org.apache.beam.module' }
// The automatic module name must match this extension's own root package
// (org.apache.beam.sdk.extensions.ordered). Reusing the sorter extension's name
// would give two different jars the same module name on a module path.
applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.extensions.ordered')

description = "Apache Beam :: SDKs :: Java :: Extensions :: Ordered"

dependencies {
  // Main-source dependencies.
  implementation project(path: ":sdks:java:core", configuration: "shadow")
  implementation library.java.slf4j_api
  implementation library.java.joda_time
  implementation library.java.commons_lang3
  implementation library.java.vendored_guava_32_1_2_jre

  // Test-only dependencies; the direct runner is needed only at test runtime.
  testImplementation library.java.junit
  testImplementation library.java.hamcrest
  testImplementation project(path: ':sdks:java:core')
  testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.ordered;

import java.io.Serializable;
import org.checkerframework.checker.nullness.qual.NonNull;

/**
* Classes implementing this interface will be called by {@link OrderedEventProcessor} to examine
* every incoming event.
*
* <p>The interface extends {@link Serializable}, so implementations must be serializable.
*
* @param <EventT> type of the events being examined
* @param <StateT> type of the state created from the initial event; it must be a {@link
*     MutableState} that accepts {@code EventT} events
*/
public interface EventExaminer<EventT, StateT extends MutableState<EventT, ?>>
extends Serializable {

/**
* Is this event the first expected event for the given key and window?
*
* @param sequenceNumber the sequence number of the event as defined by the key of the input
* PCollection to {@link OrderedEventProcessor}
* @param event being processed
* @return true if this is the initial event of the sequence.
*/
boolean isInitialEvent(long sequenceNumber, EventT event);

/**
* If the event was the first event in the sequence, create the state to hold the required data
* needed for processing. This data will be persisted.
*
* @param event the first event in the sequence.
* @return the state to persist; must not be null.
*/
@NonNull
StateT createStateOnInitialEvent(EventT event);

/**
* Is this event the last expected event for a given key and window?
*
* @param sequenceNumber the sequence number of the event
* @param event being processed
* @return true if this is the last event. There are cases where it's impossible to know whether
* it's the last event. False should be returned in those cases.
*/
boolean isLastEvent(long sequenceNumber, EventT event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.ordered;

import java.io.Serializable;

/**
* State that is mutated as events are applied to it. It is stored in a Beam state.
*
* @param <EventT> type of the events applied to the state
* @param <ResultT> type of the result returned by {@link #produceResult()}
*/
public interface MutableState<EventT, ResultT> extends Serializable {

/**
* Mutates this state by applying the event.
*
* <p>Mutations are generally expected to succeed; the {@code throws} clause exists for events
* that cannot be applied to the state.
*
* @param event to be processed
* @throws Exception if the event can't be applied. When a checked exception is thrown, the event
* will be output into {@link OrderedEventProcessorResult#unprocessedEvents()}.
*/
void mutate(EventT event) throws Exception;

/**
* This method is called after each state mutation.
*
* @return Result of the processing. Can be null if nothing needs to be output after this
* mutation.
*/
ResultT produceResult();
}
Loading

0 comments on commit fe2a224

Please sign in to comment.