Skip to content

Commit

Permalink
Fix protobuf build error in WindmillMap.persistDirect() for a removal…
Browse files Browse the repository at this point in the history
… in Dataflow Streaming Java Legacy Runner without Streaming Engine (apache#32893)

* Check WorkItemCommitRequest is buildable in WindmillStateInternalTest
  • Loading branch information
baeminbo authored Oct 24, 2024
1 parent 135a5be commit 72fccfc
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* (Java) Fixed tearDown not invoked when DoFn throws on Portable Runners ([#18592](https://github.com/apache/beam/issues/18592), [#31381](https://github.com/apache/beam/issues/31381)).
* (Java) Fixed protobuf error with MapState.remove() in Dataflow Streaming Java Legacy Runner without Streaming Engine ([#32892](https://github.com/apache/beam/issues/32892)).

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,7 @@ protected Windmill.WorkItemCommitRequest persistDirectly(WindmillStateCache.ForK
keyCoder.encode(key, keyStream, Coder.Context.OUTER);
ByteString keyBytes = keyStream.toByteString();
// Leaving data blank means that we delete the tag.
commitBuilder
.addValueUpdatesBuilder()
.setTag(keyBytes)
.setStateFamily(stateFamily)
.getValueBuilder()
.setTimestamp(Long.MAX_VALUE);
commitBuilder.addValueUpdatesBuilder().setTag(keyBytes).setStateFamily(stateFamily);

V cachedValue = cachedValues.remove(key);
if (cachedValue != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.AbstractMap.SimpleEntry;
Expand Down Expand Up @@ -305,6 +307,26 @@ private <K> K userKeyFromProtoKey(ByteString tag, Coder<K> keyCoder) throws IOEx
return keyCoder.decode(keyBytes.newInput(), Context.OUTER);
}

private static void assertBuildable(
Windmill.WorkItemCommitRequest.Builder commitWorkRequestBuilder) {
Windmill.WorkItemCommitRequest.Builder clone = commitWorkRequestBuilder.clone();
if (!clone.hasKey()) {
clone.setKey(ByteString.EMPTY); // key is required to build
}
if (!clone.hasWorkToken()) {
clone.setWorkToken(1357924680L); // workToken is required to build
}

try {
clone.build();
} catch (Exception e) {
StringWriter sw = new StringWriter();
e.printStackTrace(new PrintWriter(sw));
fail(
"Failed to build commitRequest from: " + commitWorkRequestBuilder + "\n" + sw.toString());
}
}

@Test
public void testMapAddBeforeGet() throws Exception {
StateTag<MapState<String, Integer>> addr =
Expand Down Expand Up @@ -647,6 +669,8 @@ public void testMapAddPersist() throws Exception {
.map(tv -> fromTagValue(tv, StringUtf8Coder.of(), VarIntCoder.of()))
.collect(Collectors.toList()),
Matchers.containsInAnyOrder(new SimpleEntry<>(tag1, 1), new SimpleEntry<>(tag2, 2)));

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -670,6 +694,8 @@ public void testMapRemovePersist() throws Exception {
.map(tv -> fromTagValue(tv, StringUtf8Coder.of(), VarIntCoder.of()))
.collect(Collectors.toList()),
Matchers.containsInAnyOrder(new SimpleEntry<>(tag1, null), new SimpleEntry<>(tag2, null)));

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -695,6 +721,8 @@ public void testMapClearPersist() throws Exception {
assertEquals(
protoKeyFromUserKey(null, StringUtf8Coder.of()),
commitBuilder.getTagValuePrefixDeletes(0).getTagPrefix());

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -736,6 +764,8 @@ public void testMapComplexPersist() throws Exception {
commitBuilder = Windmill.WorkItemCommitRequest.newBuilder();
assertEquals(0, commitBuilder.getTagValuePrefixDeletesCount());
assertEquals(0, commitBuilder.getValueUpdatesCount());

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -953,6 +983,8 @@ public void testMultimapRemovePersistPut() {

multimapState.put(key, 5);
assertThat(multimapState.get(key).read(), Matchers.containsInAnyOrder(4, 5));

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -1766,6 +1798,8 @@ public void testMultimapPutAndPersist() {
builder,
new MultimapEntryUpdate(key1, Arrays.asList(1, 2), false),
new MultimapEntryUpdate(key2, Collections.singletonList(2), false));

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -1799,6 +1833,8 @@ public void testMultimapRemovePutAndPersist() {
builder,
new MultimapEntryUpdate(key1, Arrays.asList(1, 2), true),
new MultimapEntryUpdate(key2, Collections.singletonList(4), true));

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -1825,6 +1861,8 @@ public void testMultimapRemoveAndPersist() {
builder,
new MultimapEntryUpdate(key1, Collections.emptyList(), true),
new MultimapEntryUpdate(key2, Collections.emptyList(), true));

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -1856,6 +1894,8 @@ public void testMultimapPutRemoveClearAndPersist() {
Iterables.getOnlyElement(commitBuilder.getMultimapUpdatesBuilderList());
assertEquals(0, builder.getUpdatesCount());
assertTrue(builder.getDeleteAll());

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -1894,6 +1934,8 @@ false, key(NAMESPACE, tag), STATE_FAMILY, VarIntCoder.of()))
Iterables.getOnlyElement(commitBuilder.getMultimapUpdatesBuilderList());
assertTagMultimapUpdates(
builder, new MultimapEntryUpdate(key1, Collections.singletonList(4), false));

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -1938,6 +1980,8 @@ true, key(NAMESPACE, tag), STATE_FAMILY, VarIntCoder.of()))
ByteArrayCoder.of().decode(entryUpdate.getEntryName().newInput(), Context.OUTER);
assertArrayEquals(key1, decodedKey);
assertTrue(entryUpdate.getDeleteAll());

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -2053,6 +2097,8 @@ true, key(NAMESPACE, tag), STATE_FAMILY, VarIntCoder.of()))
Windmill.WorkItemCommitRequest.Builder commitBuilder =
Windmill.WorkItemCommitRequest.newBuilder();
underTest.persist(commitBuilder);

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -2253,6 +2299,8 @@ public void testOrderedListAddPersist() throws Exception {
assertEquals("hello", updates.getInserts(0).getEntries(0).getValue().toStringUtf8());
assertEquals(1000, updates.getInserts(0).getEntries(0).getSortKey());
assertEquals(IdTracker.NEW_RANGE_MIN_ID, updates.getInserts(0).getEntries(0).getId());

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -2284,6 +2332,8 @@ public void testOrderedListClearPersist() throws Exception {
assertEquals(IdTracker.NEW_RANGE_MIN_ID, updates.getInserts(0).getEntries(0).getId());
assertEquals(IdTracker.NEW_RANGE_MIN_ID + 1, updates.getInserts(0).getEntries(1).getId());
Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -2331,6 +2381,8 @@ public void testOrderedListDeleteRangePersist() {
assertEquals(4000, updates.getInserts(0).getEntries(1).getSortKey());
assertEquals(IdTracker.NEW_RANGE_MIN_ID, updates.getInserts(0).getEntries(0).getId());
assertEquals(IdTracker.NEW_RANGE_MIN_ID + 1, updates.getInserts(0).getEntries(1).getId());

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -2539,6 +2591,8 @@ public void testOrderedListPersistEmpty() throws Exception {
assertEquals(1, updates.getDeletesCount());
assertEquals(WindmillOrderedList.MIN_TS_MICROS, updates.getDeletes(0).getRange().getStart());
assertEquals(WindmillOrderedList.MAX_TS_MICROS, updates.getDeletes(0).getRange().getLimit());

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -2653,6 +2707,8 @@ public void testBagAddPersist() throws Exception {
assertEquals("hello", bagUpdates.getValues(0).toStringUtf8());

Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -2678,6 +2734,8 @@ public void testBagClearPersist() throws Exception {
assertEquals("world", tagBag.getValues(0).toStringUtf8());

Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -2693,6 +2751,8 @@ public void testBagPersistEmpty() throws Exception {

// 1 bag update = the clear
assertEquals(1, commitBuilder.getBagUpdatesCount());

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -2806,6 +2866,8 @@ public void testCombiningAddPersist() throws Exception {
11, CoderUtils.decodeFromByteArray(accumCoder, bagUpdates.getValues(0).toByteArray())[0]);

Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -2835,6 +2897,8 @@ public void testCombiningAddPersistWithCompact() throws Exception {
assertTrue(bagUpdates.getDeleteAll());
assertEquals(
111, CoderUtils.decodeFromByteArray(accumCoder, bagUpdates.getValues(0).toByteArray())[0]);

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -2862,6 +2926,8 @@ public void testCombiningClearPersist() throws Exception {
11, CoderUtils.decodeFromByteArray(accumCoder, tagBag.getValues(0).toByteArray())[0]);

Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -2990,6 +3056,8 @@ public void testWatermarkPersistEarliest() throws Exception {
assertEquals(TimeUnit.MILLISECONDS.toMicros(1000), watermarkHold.getTimestamps(0));

Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -3016,6 +3084,8 @@ public void testWatermarkPersistLatestEmpty() throws Exception {

Mockito.verify(mockReader).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY);
Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -3042,6 +3112,8 @@ public void testWatermarkPersistLatestWindmillWins() throws Exception {

Mockito.verify(mockReader).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY);
Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -3068,6 +3140,8 @@ public void testWatermarkPersistLatestLocalAdditionsWin() throws Exception {

Mockito.verify(mockReader).watermarkFuture(key(NAMESPACE, "watermark"), STATE_FAMILY);
Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -3091,6 +3165,8 @@ public void testWatermarkPersistEndOfWindow() throws Exception {

// Blind adds should not need to read the future.
Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -3116,6 +3192,8 @@ public void testWatermarkClearPersist() throws Exception {
assertEquals(TimeUnit.MILLISECONDS.toMicros(1000), clearAndUpdate.getTimestamps(0));

Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -3133,6 +3211,8 @@ public void testWatermarkPersistEmpty() throws Exception {

// 1 bag update corresponds to deletion. There shouldn't be a bag update adding items.
assertEquals(1, commitBuilder.getWatermarkHoldsCount());

assertBuildable(commitBuilder);
}

@Test
Expand Down Expand Up @@ -3200,6 +3280,8 @@ public void testValueSetPersist() throws Exception {
assertTrue(valueUpdate.isInitialized());

Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -3220,6 +3302,8 @@ public void testValueClearPersist() throws Exception {
assertEquals(0, valueUpdate.getValue().getData().size());

Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand All @@ -3234,6 +3318,8 @@ public void testValueNoChangePersist() throws Exception {
assertEquals(0, commitBuilder.getValueUpdatesCount());

Mockito.verifyNoMoreInteractions(mockReader);

assertBuildable(commitBuilder);
}

@Test
Expand Down

0 comments on commit 72fccfc

Please sign in to comment.