Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Ordered Processing PTransform to Java SDK #30735

Merged
merged 8 commits into from
Mar 29, 2024
33 changes: 33 additions & 0 deletions sdks/java/extensions/ordered/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
plugins { id 'org.apache.beam.module' }

// The automatic module name mirrors the Java package declared by this module's
// sources (org.apache.beam.sdk.extensions.ordered) so that the published jar
// gets its own, unambiguous JPMS module name.
applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.extensions.ordered')

description = "Apache Beam :: SDKs :: Java :: Extensions :: Ordered"

dependencies {
  // Compile-time dependencies of the extension itself.
  implementation project(path: ":sdks:java:core", configuration: "shadow")
  implementation library.java.slf4j_api
  implementation library.java.joda_time
  implementation library.java.commons_lang3
  implementation library.java.vendored_guava_32_1_2_jre

  // Test-only dependencies.
  testImplementation library.java.junit
  testImplementation library.java.hamcrest
  testImplementation project(path: ':sdks:java:core')
  testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.ordered;

import java.io.Serializable;
import org.checkerframework.checker.nullness.qual.NonNull;

/**
 * Classes implementing this interface will be called by {@link OrderedEventProcessor} to examine
 * every incoming event.
 *
 * <p>The interface extends {@link Serializable}, so implementations must be serializable.
 *
 * @param <EventT> type of the events being examined
 * @param <StateT> type of the state created when the initial event is seen; it must be a {@link
 *     MutableState} that accepts {@code EventT} events
 */
public interface EventExaminer<EventT, StateT extends MutableState<EventT, ?>>
extends Serializable {

/**
 * Is this event the first expected event for the given key and window?
 *
 * @param sequenceNumber the sequence number of the event as defined by the key of the input
 *     PCollection to {@link OrderedEventProcessor}
 * @param event the event being processed
 * @return {@code true} if this event starts the sequence, {@code false} otherwise
 */
boolean isInitialEvent(long sequenceNumber, EventT event);

/**
 * If the event was the first event in the sequence, create the state to hold the required data
 * needed for processing. This data will be persisted.
 *
 * @param event the first event in the sequence
 * @return the state to persist; never {@code null}
 */
@NonNull
StateT createStateOnInitialEvent(EventT event);

/**
 * Is this event the last expected event for a given key and window?
 *
 * <p>There are cases where it is impossible to know whether an event is the last one; {@code
 * false} should be returned in those cases.
 *
 * @param sequenceNumber the sequence number of the event
 * @param event the event being processed
 * @return {@code true} if this is known to be the last event, {@code false} otherwise
 */
boolean isLastEvent(long sequenceNumber, EventT event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.ordered;

import java.io.Serializable;

/**
 * State that is mutated as events are applied to it. It is stored in a Beam state.
 *
 * @param <EventT> type of the events applied to this state
 * @param <ResultT> type of the result produced after a mutation
 */
public interface MutableState<EventT, ResultT> extends Serializable {

/**
 * Applies an event to this state.
 *
 * <p>Implementations are expected to mutate the state without failing in the normal case. The
 * signature nevertheless allows a checked exception for events that cannot be applied.
 *
 * @param event to be processed
 * @throws Exception if the event cannot be applied; in that case the event will be output into
 *     {@link OrderedEventProcessorResult#unprocessedEvents()}. NOTE(review): the original
 *     sentence ended with a dangling "with" - confirm what is emitted alongside the event.
 */
void mutate(EventT event) throws Exception;

/**
 * This method is called after each state mutation.
 *
 * @return the result of the processing. Can be {@code null} if nothing needs to be output after
 *     this mutation. NOTE(review): the return type carries no nullness annotation - consider
 *     marking it nullable to match this contract.
 */
ResultT produceResult();
}
Loading
Loading