Commit c7e2cb7
[FLINK-38460] Add SinkUpsertMaterializerRescalingTest
1 parent 59085a0
1 file changed: 348 additions, 0 deletions

SinkUpsertMaterializerRescalingTest.java (new file)
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.runtime.operators.sink;

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.HashFunction;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.types.RowKind;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;

import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;

/** Rescaling and migration unit tests for {@link SinkUpsertMaterializer}. */
@RunWith(Parameterized.class)
public class SinkUpsertMaterializerRescalingTest {

    @Parameter public SumStateBackend backend;

    @Parameterized.Parameters(name = "stateBackend={0}")
    public static Object[][] generateTestParameters() {
        List<Object[]> result = new ArrayList<>();
        for (SumStateBackend backend : SumStateBackend.values()) {
            result.add(new Object[] {backend});
        }
        return result.toArray(new Object[0][]);
    }

    @Test
    public void testScaleUpThenDown() throws Exception {
        testRescaleFromToFrom(10, 2, 3, backend, backend);
    }

    @Test
    public void testScaleDownThenUp() throws Exception {
        testRescaleFromToFrom(10, 3, 2, backend, backend);
    }

    @Test
    public void testRecovery() throws Exception {
        testRescaleFromToFrom(1, 1, 1, backend, backend);
    }

    @Test
    public void testForwardAndBackwardMigration() throws Exception {
        testRescaleFromToFrom(7, 3, 3, backend, getOtherBackend(backend));
    }

    @Test
    public void testScaleUpThenDownWithMigration() throws Exception {
        testRescaleFromToFrom(7, 1, 5, backend, getOtherBackend(backend));
    }

    @Test
    public void testScaleDownThenUpWithMigration() throws Exception {
        testRescaleFromToFrom(7, 5, 1, backend, getOtherBackend(backend));
    }

    private SumStateBackend getOtherBackend(SumStateBackend backend) {
        return backend == SumStateBackend.HEAP ? SumStateBackend.ROCKSDB : SumStateBackend.HEAP;
    }
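
    /**
     * Drives the materializer through a scale change and back: run at {@code fromParallelism},
     * snapshot, restore at {@code toParallelism}, snapshot again, and restore at the original
     * parallelism, optionally switching state backends between runs.
     */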
    @SuppressWarnings("unchecked")
    private void testRescaleFromToFrom(
            final int maxParallelism,
            final int fromParallelism,
            final int toParallelism,
            final SumStateBackend fromBackend,
            final SumStateBackend toBackend)
            throws Exception {

        int[] currentParallelismRef = new int[] {fromParallelism};

        boolean useSavepoint = fromBackend != toBackend;

        OneInputStreamOperator<RowData, RowData>[] materializers =
                new OneInputStreamOperator[maxParallelism];
        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>[] harnesses =
                new KeyedOneInputStreamOperatorTestHarness[maxParallelism];
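
        // Route each record to the subtask a keyed exchange would pick: derive the record's key
        // group from its key and map it to an operator index at the current parallelism.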
        final ToIntFunction<StreamRecord<RowData>> combinedHarnesses =
                (r) -> {
                    try {
                        int subtaskIndex =
                                KeyGroupRangeAssignment.assignKeyToParallelOperator(
                                        KEY_SELECTOR.getKey(r.getValue()),
                                        maxParallelism,
                                        currentParallelismRef[0]);

                        harnesses[subtaskIndex].processElement(r);
                        return subtaskIndex;
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                };

        initHarnessesAndMaterializers(
                harnesses, materializers, fromBackend, maxParallelism, fromParallelism, null);

        int idx = combinedHarnesses.applyAsInt(insertRecord(1L, 1, "a1"));
        ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.INSERT, 1L, 1, "a1"));

        idx = combinedHarnesses.applyAsInt(insertRecord(2L, 1, "a2"));
        ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2"));

        List<OperatorSubtaskState> subtaskStates =
                snapshotHarnesses(harnesses, fromParallelism, 1L, useSavepoint);

        currentParallelismRef[0] = toParallelism;
        initHarnessesAndMaterializers(
                harnesses, materializers, toBackend, maxParallelism, toParallelism, subtaskStates);

        idx = combinedHarnesses.applyAsInt(insertRecord(3L, 1, "a3"));
        ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));

        idx = combinedHarnesses.applyAsInt(insertRecord(4L, 1, "a4"));
        ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 4L, 1, "a4"));

        subtaskStates = snapshotHarnesses(harnesses, toParallelism, 2L, useSavepoint);

        currentParallelismRef[0] = fromParallelism;
        initHarnessesAndMaterializers(
                harnesses,
                materializers,
                fromBackend,
                maxParallelism,
                fromParallelism,
                subtaskStates);

        idx = combinedHarnesses.applyAsInt(deleteRecord(4L, 1, "a4"));
        ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));

        idx = combinedHarnesses.applyAsInt(deleteRecord(2L, 1, "a2"));
        ASSERTOR.shouldEmitNothing(harnesses[idx]);

        idx = combinedHarnesses.applyAsInt(deleteRecord(3L, 1, "a3"));
        ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1"));

        idx = combinedHarnesses.applyAsInt(deleteRecord(1L, 1, "a1"));
        ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.DELETE, 1L, 1, "a1"));

        idx = combinedHarnesses.applyAsInt(insertRecord(4L, 1, "a4"));
        ASSERTOR.shouldEmit(harnesses[idx], rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
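
        // TTL_CONFIG uses a 1s TTL and open() pins processing time to 1, so advancing to 1002
        // expires the materializer state; the following DELETE then has nothing to retract.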
        Arrays.stream(harnesses)
                .filter(Objects::nonNull)
                .forEach(h -> h.setStateTtlProcessingTime(1002));

        idx = combinedHarnesses.applyAsInt(deleteRecord(4L, 1, "a4"));
        ASSERTOR.shouldEmitNothing(harnesses[idx]);

        Arrays.stream(harnesses)
                .filter(Objects::nonNull)
                .forEach(
                        h -> {
                            try {
                                h.close();
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        });
    }
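
    /**
     * (Re)creates one materializer and test harness per subtask; when restoring, the previous
     * run's subtask states are repackaged and repartitioned to fit the new parallelism.
     */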
    private void initHarnessesAndMaterializers(
            KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>[] harnesses,
            OneInputStreamOperator<RowData, RowData>[] materializers,
            SumStateBackend backend,
            int maxParallelism,
            int parallelism,
            @Nullable List<OperatorSubtaskState> subtaskStates)
            throws Exception {
        for (int i = 0; i < parallelism; ++i) {
            materializers[i] =
                    SinkUpsertMaterializer.create(
                            TTL_CONFIG,
                            RowType.of(LOGICAL_TYPES),
                            EQUALISER,
                            UPSERT_KEY_EQUALISER,
                            null);
            harnesses[i] =
                    new KeyedOneInputStreamOperatorTestHarness<>(
                            materializers[i],
                            KEY_SELECTOR,
                            KEY_SELECTOR.getProducedType(),
                            maxParallelism,
                            parallelism,
                            i);

            harnesses[i].setStateBackend(backend.create(false));

            if (subtaskStates != null) {
                OperatorSubtaskState operatorSubtaskState =
                        AbstractStreamOperatorTestHarness.repackageState(
                                subtaskStates.toArray(new OperatorSubtaskState[0]));

                harnesses[i].initializeState(
                        AbstractStreamOperatorTestHarness.repartitionOperatorState(
                                operatorSubtaskState,
                                maxParallelism,
                                subtaskStates.size(),
                                parallelism,
                                i));
            }

            harnesses[i].open();
            harnesses[i].setStateTtlProcessingTime(1);
        }
    }
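
    /**
     * Snapshots the first {@code parallelism} harnesses. A canonical savepoint is used when
     * switching state backends, since that format is interchangeable across backends; otherwise
     * a regular checkpoint suffices.
     */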
    private List<OperatorSubtaskState> snapshotHarnesses(
            KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>[] harnesses,
            int parallelism,
            long checkpointId,
            boolean useSavepoint) {
        return Arrays.stream(harnesses, 0, parallelism)
                .map(
                        h -> {
                            try {
                                return h.snapshotWithLocalState(
                                                checkpointId,
                                                0L,
                                                useSavepoint
                                                        ? SavepointType.savepoint(
                                                                SavepointFormatType.CANONICAL)
                                                        : CheckpointType.CHECKPOINT)
                                        .getJobManagerOwnedState();
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        })
                .collect(Collectors.toList());
    }

    /** Test equaliser comparing full records. */
    protected static class TestRecordEqualiser implements RecordEqualiser, HashFunction {
        @Override
        public boolean equals(RowData row1, RowData row2) {
            return row1.getRowKind() == row2.getRowKind()
                    && row1.getLong(0) == row2.getLong(0)
                    && row1.getInt(1) == row2.getInt(1)
                    && row1.getString(2).equals(row2.getString(2));
        }

        @Override
        public int hashCode(Object data) {
            RowData rd = (RowData) data;
            return Objects.hash(rd.getRowKind(), rd.getLong(0), rd.getInt(1), rd.getString(2));
        }
    }

    /** Test equaliser comparing only the upsert key (the long column). */
    protected static class TestUpsertKeyEqualiser implements RecordEqualiser, HashFunction {
        @Override
        public boolean equals(RowData row1, RowData row2) {
            return row1.getRowKind() == row2.getRowKind() && row1.getLong(0) == row2.getLong(0);
        }

        @Override
        public int hashCode(Object data) {
            RowData rd = (RowData) data;
            return Objects.hash(rd.getRowKind(), rd.getLong(0));
        }
    }
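
    // GeneratedRecordEqualiser normally carries generated code; these test stand-ins override
    // newInstance to hand back the handwritten equalisers instead of compiling anything.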
    private static class MyGeneratedRecordEqualiser extends GeneratedRecordEqualiser {

        public MyGeneratedRecordEqualiser() {
            super("", "", new Object[0]);
        }

        @Override
        public RecordEqualiser newInstance(ClassLoader classLoader) {
            return new TestRecordEqualiser();
        }
    }
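
    // The key selector keys on column 1 (the int), which is 1 in every test record, so all rows
    // land under a single materializer key; the long column (0) is the upsert key telling rows
    // apart.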
    private static final StateTtlConfig TTL_CONFIG = StateConfigUtil.createTtlConfig(1000);

    private static final LogicalType[] LOGICAL_TYPES =
            new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()};

    private static final RowDataKeySelector KEY_SELECTOR =
            HandwrittenSelectorUtil.getRowDataSelector(new int[] {1}, LOGICAL_TYPES);

    private static final RowDataHarnessAssertor ASSERTOR =
            new RowDataHarnessAssertor(LOGICAL_TYPES);

    private static final GeneratedRecordEqualiser EQUALISER = new MyGeneratedRecordEqualiser();

    private static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER =
            new GeneratedRecordEqualiser("", "", new Object[0]) {

                @Override
                public RecordEqualiser newInstance(ClassLoader classLoader) {
                    return new TestUpsertKeyEqualiser();
                }
            };
}
