Skip to content

Commit

Permalink
[FLINK-37276] Add missing state v2 access interfaces in RuntimeContext
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed Feb 10, 2025
1 parent f335f1f commit bfdc6db
Show file tree
Hide file tree
Showing 13 changed files with 362 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.api.common.functions;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobInfo;
Expand Down Expand Up @@ -410,6 +411,95 @@ <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
@PublicEvolving
<UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties);

// ------------------------------------------------------------------------
// Methods for accessing state V2
// ------------------------------------------------------------------------

/**
* Gets a handle to the system's key/value state. The key/value state is only accessible if the
* function is executed on a KeyedStream. On each access, the state exposes the value for the
* key of the element currently processed by the function. Each function may have multiple
* partitioned states, addressed with different names.
*
* <p>Because the scope of each value is the key of the currently processed element, and the
* elements are distributed by the Flink runtime, the system can transparently scale out and
* redistribute the state and KeyedStream.
*
* @param stateProperties The descriptor defining the properties of the stats.
* @param <T> The type of value stored in the state.
* @return The partitioned state object.
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part of a KeyedStream).
*/
@Experimental
<T> org.apache.flink.api.common.state.v2.ValueState<T> getState(
org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties);

/**
* Gets a handle to the system's key/value list state. This state is similar to the state
* accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that holds
* lists. One can add elements to the list, or retrieve the list as a whole.
*
* @param stateProperties The descriptor defining the properties of the stats.
* @param <T> The type of value stored in the state.
* @return The partitioned state object.
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part os a KeyedStream).
*/
@Experimental
<T> org.apache.flink.api.common.state.v2.ListState<T> getListState(
org.apache.flink.api.common.state.v2.ListStateDescriptor<T> stateProperties);

/**
* Gets a handle to the system's key/value reducing state. This state is similar to the state
* accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
* aggregates values.
*
* @param stateProperties The descriptor defining the properties of the stats.
* @param <T> The type of value stored in the state.
* @return The partitioned state object.
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part of a KeyedStream).
*/
@Experimental
<T> org.apache.flink.api.common.state.v2.ReducingState<T> getReducingState(
org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> stateProperties);

/**
* Gets a handle to the system's key/value aggregating state. This state is similar to the state
* accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that
* aggregates values with different types.
*
* @param stateProperties The descriptor defining the properties of the stats.
* @param <IN> The type of the values that are added to the state.
* @param <ACC> The type of the accumulator (intermediate aggregation state).
* @param <OUT> The type of the values that are returned from the state.
* @return The partitioned state object.
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part of a KeyedStream).
*/
@Experimental
<IN, ACC, OUT>
org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingState(
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT>
stateProperties);

/**
* Gets a handle to the system's key/value map state. This state is similar to the state
* accessed via {@link #getState(ValueStateDescriptor)}, but is optimized for state that is
* composed of user-defined key-value pairs
*
* @param stateProperties The descriptor defining the properties of the stats.
* @param <UK> The type of the user keys stored in the state.
* @param <UV> The type of the user values stored in the state.
* @return The partitioned state object.
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part of a KeyedStream).
*/
@Experimental
<UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> getMapState(
org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> stateProperties);

/**
* Get the meta information of current job.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,43 @@ public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> statePro
"This state is only accessible by functions executed on a KeyedStream");
}

@Override
public <T> org.apache.flink.api.common.state.v2.ValueState<T> getState(
org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
}

@Override
public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState(
org.apache.flink.api.common.state.v2.ListStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
}

@Override
public <T> org.apache.flink.api.common.state.v2.ReducingState<T> getReducingState(
org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
}

@Override
public <IN, ACC, OUT>
org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingState(
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT>
stateProperties) {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
}

@Override
public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> getMapState(
org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> stateProperties) {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
}

@Internal
@VisibleForTesting
public String getAllocationIDAsString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,36 @@ public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
public <UK, UV> MapState<UK, UV> getMapState(final MapStateDescriptor<UK, UV> stateProperties) {
throw new UnsupportedOperationException("State is not supported.");
}

@Override
public <T> org.apache.flink.api.common.state.v2.ValueState<T> getState(
org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException("State is not supported.");
}

@Override
public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState(
org.apache.flink.api.common.state.v2.ListStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException("State is not supported.");
}

@Override
public <T> org.apache.flink.api.common.state.v2.ReducingState<T> getReducingState(
org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException("State is not supported.");
}

@Override
public <IN, ACC, OUT>
org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingState(
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT>
stateProperties) {
throw new UnsupportedOperationException("State is not supported.");
}

@Override
public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> getMapState(
org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> stateProperties) {
throw new UnsupportedOperationException("State is not supported.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,51 @@ public void testCepRuntimeContext() {
// expected
}

try {
runtimeContext.getState(
new org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
"foobar", Integer.class));
fail("Expected getState to fail with unsupported operation exception.");
} catch (UnsupportedOperationException e) {
// expected
}

try {
runtimeContext.getListState(
new org.apache.flink.api.common.state.v2.ListStateDescriptor<>(
"foobar", Integer.class));
fail("Expected getListState to fail with unsupported operation exception.");
} catch (UnsupportedOperationException e) {
// expected
}

try {
runtimeContext.getReducingState(
new org.apache.flink.api.common.state.v2.ReducingStateDescriptor<>(
"foobar", mock(ReduceFunction.class), Integer.class));
fail("Expected getReducingState to fail with unsupported operation exception.");
} catch (UnsupportedOperationException e) {
// expected
}

try {
runtimeContext.getAggregatingState(
new org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<>(
"foobar", mock(AggregateFunction.class), Integer.class));
fail("Expected getAggregatingState to fail with unsupported operation exception.");
} catch (UnsupportedOperationException e) {
// expected
}

try {
runtimeContext.getMapState(
new org.apache.flink.api.common.state.v2.MapStateDescriptor<>(
"foobar", Integer.class, String.class));
fail("Expected getMapState to fail with unsupported operation exception.");
} catch (UnsupportedOperationException e) {
// expected
}

try {
runtimeContext.addAccumulator("foobar", mock(Accumulator.class));
fail("Expected addAccumulator to fail with unsupported operation exception.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,38 @@ public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> statePro
return keyedStateStore.getMapState(stateProperties);
}

@Override
public <T> org.apache.flink.api.common.state.v2.ValueState<T> getState(
org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException("State processor api does not support state v2.");
}

@Override
public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState(
org.apache.flink.api.common.state.v2.ListStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException("State processor api does not support state v2.");
}

@Override
public <T> org.apache.flink.api.common.state.v2.ReducingState<T> getReducingState(
org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException("State processor api does not support state v2.");
}

@Override
public <IN, ACC, OUT>
org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingState(
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT>
stateProperties) {
throw new UnsupportedOperationException("State processor api does not support state v2.");
}

@Override
public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> getMapState(
org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> stateProperties) {
throw new UnsupportedOperationException("State processor api does not support state v2.");
}

public List<StateDescriptor<?, ?>> getStateDescriptors() {
if (registeredDescriptors.isEmpty()) {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,12 @@ private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(
return keyedStateStore;
}

@Override
public <T> org.apache.flink.api.common.state.v2.ValueState<T> getState(
org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties) {
return getValueState(stateProperties);
}

// TODO: Reconstruct this after StateManager is ready in FLIP-410.
public <T> org.apache.flink.api.common.state.v2.ValueState<T> getValueState(
org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties) {
Expand All @@ -255,27 +261,31 @@ public <T> org.apache.flink.api.common.state.v2.ValueState<T> getValueState(
return keyedStateStore.getValueState(stateProperties);
}

@Override
public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState(
org.apache.flink.api.common.state.v2.ListStateDescriptor<T> stateProperties) {
KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
stateProperties.initializeSerializerUnlessSet(this::createSerializer);
return keyedStateStore.getListState(stateProperties);
}

@Override
public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> getMapState(
org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> stateProperties) {
KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
stateProperties.initializeSerializerUnlessSet(this::createSerializer);
return keyedStateStore.getMapState(stateProperties);
}

@Override
public <T> org.apache.flink.api.common.state.v2.ReducingState<T> getReducingState(
org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> stateProperties) {
KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
stateProperties.initializeSerializerUnlessSet(this::createSerializer);
return keyedStateStore.getReducingState(stateProperties);
}

@Override
public <IN, ACC, OUT>
org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingState(
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<IN, ACC, OUT>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,44 @@ public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> statePro
"State is not supported in rich async functions.");
}

@Override
public <T> org.apache.flink.api.common.state.v2.ValueState<T> getState(
org.apache.flink.api.common.state.v2.ValueStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException(
"State is not supported in rich async functions.");
}

@Override
public <T> org.apache.flink.api.common.state.v2.ListState<T> getListState(
org.apache.flink.api.common.state.v2.ListStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException(
"State is not supported in rich async functions.");
}

@Override
public <T> org.apache.flink.api.common.state.v2.ReducingState<T> getReducingState(
org.apache.flink.api.common.state.v2.ReducingStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException(
"State is not supported in rich async functions.");
}

@Override
public <IN, ACC, OUT>
org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingState(
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<
IN, ACC, OUT>
stateProperties) {
throw new UnsupportedOperationException(
"State is not supported in rich async functions.");
}

@Override
public <UK, UV> org.apache.flink.api.common.state.v2.MapState<UK, UV> getMapState(
org.apache.flink.api.common.state.v2.MapStateDescriptor<UK, UV> stateProperties) {
throw new UnsupportedOperationException(
"State is not supported in rich async functions.");
}

@Override
public <V, A extends Serializable> void addAccumulator(
String name, Accumulator<V, A> accumulator) {
Expand Down
Loading

0 comments on commit bfdc6db

Please sign in to comment.