Skip to content

Commit

Permalink
Bug 37107153 - Build: Intermittent failure in NamedQueueProxyProtocol…
Browse files Browse the repository at this point in the history
…IT.shouldEnsureSimpleDequeAfterPagedQueue - harden proto message handling

(merge main -> ce/main 111628)

[git-p4: depot-paths = "//dev/coherence-ce/main/": change = 111631]
  • Loading branch information
thegridman committed Sep 27, 2024
1 parent a16dadc commit 06e3b01
Showing 1 changed file with 41 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@
import com.google.protobuf.BytesValue;
import com.google.protobuf.Empty;
import com.google.protobuf.Int32Value;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.oracle.bedrock.testsupport.deferred.Eventually;
import com.oracle.coherence.common.base.Exceptions;
import com.oracle.coherence.concurrent.Queues;
import com.oracle.coherence.grpc.BinaryHelper;
import com.oracle.coherence.grpc.NamedQueueProtocol;
import com.oracle.coherence.grpc.messages.cache.v1.NamedCacheResponse;
import com.oracle.coherence.grpc.messages.cache.v1.ResponseType;
import com.oracle.coherence.grpc.messages.common.v1.ErrorMessage;
import com.oracle.coherence.grpc.messages.common.v1.OptionalValue;
import com.oracle.coherence.grpc.messages.concurrent.queue.v1.EnsureQueueRequest;
Expand Down Expand Up @@ -58,6 +56,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
Expand Down Expand Up @@ -339,7 +338,7 @@ public void shouldPeekHeadWithExistingElement(String ignored, Serializer seriali
int queueId = ensureQueue(channel, observer, sQueueName);

NamedQueueResponse response = sendQueueRequest(channel, observer, queueId, NamedQueueRequestType.PeekHead, Empty.getDefaultInstance());
OptionalValue value = response.getMessage().unpack(OptionalValue.class);
OptionalValue value = unpackAny(response, NamedQueueResponse::getMessage, OptionalValue.class);

assertThat(value, is(notNullValue()));
assertThat(value.getPresent(), is(true));
Expand All @@ -365,7 +364,7 @@ public void shouldPeekHeadOnEmptyQueue(String ignored, Serializer serializer, St
int queueId = ensureQueue(channel, observer, sQueueName);

NamedQueueResponse response = sendQueueRequest(channel, observer, queueId, NamedQueueRequestType.PeekHead, Empty.getDefaultInstance());
OptionalValue value = response.getMessage().unpack(OptionalValue.class);
OptionalValue value = unpackAny(response, NamedQueueResponse::getMessage, OptionalValue.class);

assertThat(value, is(notNullValue()));
assertThat(value.getPresent(), is(false));
Expand All @@ -390,7 +389,7 @@ public void shouldPeekTailWithExistingElement(String ignored, Serializer seriali
int queueId = ensureDeque(channel, observer, sQueueName);

NamedQueueResponse response = sendQueueRequest(channel, observer, queueId, NamedQueueRequestType.PeekTail, Empty.getDefaultInstance());
OptionalValue value = response.getMessage().unpack(OptionalValue.class);
OptionalValue value = unpackAny(response, NamedQueueResponse::getMessage, OptionalValue.class);

assertThat(value, is(notNullValue()));
assertThat(value.getPresent(), is(true));
Expand All @@ -416,7 +415,7 @@ public void shouldPeekTailOnEmptyQueue(String ignored, Serializer serializer, St
int queueId = ensureDeque(channel, observer, sQueueName);

NamedQueueResponse response = sendQueueRequest(channel, observer, queueId, NamedQueueRequestType.PeekTail, Empty.getDefaultInstance());
OptionalValue value = response.getMessage().unpack(OptionalValue.class);
OptionalValue value = unpackAny(response, NamedQueueResponse::getMessage, OptionalValue.class);

assertThat(value, is(notNullValue()));
assertThat(value.getPresent(), is(false));
Expand All @@ -442,7 +441,7 @@ public void shouldPollHeadWithExistingElement(String ignored, Serializer seriali
int queueId = ensureQueue(channel, observer, sQueueName);

NamedQueueResponse response = sendQueueRequest(channel, observer, queueId, NamedQueueRequestType.PollHead, Empty.getDefaultInstance());
OptionalValue value = response.getMessage().unpack(OptionalValue.class);
OptionalValue value = unpackAny(response, NamedQueueResponse::getMessage, OptionalValue.class);

assertThat(value, is(notNullValue()));
assertThat(value.getPresent(), is(true));
Expand All @@ -468,7 +467,7 @@ public void shouldPollHeadOnEmptyQueue(String ignored, Serializer serializer, St
int queueId = ensureQueue(channel, observer, sQueueName);

NamedQueueResponse response = sendQueueRequest(channel, observer, queueId, NamedQueueRequestType.PollHead, Empty.getDefaultInstance());
OptionalValue value = response.getMessage().unpack(OptionalValue.class);
OptionalValue value = unpackAny(response, NamedQueueResponse::getMessage, OptionalValue.class);

assertThat(value, is(notNullValue()));
assertThat(value.getPresent(), is(false));
Expand All @@ -494,7 +493,7 @@ public void shouldPollTailWithExistingElement(String ignored, Serializer seriali
int queueId = ensureDeque(channel, observer, sQueueName);

NamedQueueResponse response = sendQueueRequest(channel, observer, queueId, NamedQueueRequestType.PollTail, Empty.getDefaultInstance());
OptionalValue value = response.getMessage().unpack(OptionalValue.class);
OptionalValue value = unpackAny(response, NamedQueueResponse::getMessage, OptionalValue.class);

assertThat(value, is(notNullValue()));
assertThat(value.getPresent(), is(true));
Expand All @@ -520,7 +519,7 @@ public void shouldPollTailOnEmptyQueue(String ignored, Serializer serializer, St
int queueId = ensureDeque(channel, observer, sQueueName);

NamedQueueResponse response = sendQueueRequest(channel, observer, queueId, NamedQueueRequestType.PollTail, Empty.getDefaultInstance());
OptionalValue value = response.getMessage().unpack(OptionalValue.class);
OptionalValue value = unpackAny(response, NamedQueueResponse::getMessage, OptionalValue.class);

assertThat(value, is(notNullValue()));
assertThat(value.getPresent(), is(false));
Expand All @@ -545,12 +544,12 @@ public void shouldOfferToTail(String ignored, Serializer serializer, String sSco
ByteString bytesTwo = toByteString("value-2", serializer);

NamedQueueResponse response = sendQueueRequest(channel, observer, queueId, NamedQueueRequestType.OfferTail, BytesValue.of(bytesOne));
QueueOfferResult result = response.getMessage().unpack(QueueOfferResult.class);
QueueOfferResult result = unpackAny(response, NamedQueueResponse::getMessage, QueueOfferResult.class);
assertThat(result, is(notNullValue()));
assertThat(result.getSucceeded(), is(true));

response = sendQueueRequest(channel, observer, queueId, NamedQueueRequestType.OfferTail, BytesValue.of(bytesTwo));
result = response.getMessage().unpack(QueueOfferResult.class);
result = unpackAny(response, NamedQueueResponse::getMessage, QueueOfferResult.class);
assertThat(result, is(notNullValue()));
assertThat(result.getSucceeded(), is(true));

Expand All @@ -577,12 +576,12 @@ public void shouldOfferToHead(String ignored, Serializer serializer, String sSco
ByteString bytesTwo = toByteString("value-2", serializer);

NamedQueueResponse response = sendQueueRequest(channel, observer, queueId, NamedQueueRequestType.OfferHead, BytesValue.of(bytesOne));
QueueOfferResult result = response.getMessage().unpack(QueueOfferResult.class);
QueueOfferResult result = unpackAny(response, NamedQueueResponse::getMessage, QueueOfferResult.class);
assertThat(result, is(notNullValue()));
assertThat(result.getSucceeded(), is(true));

response = sendQueueRequest(channel, observer, queueId, NamedQueueRequestType.OfferHead, BytesValue.of(bytesTwo));
result = response.getMessage().unpack(QueueOfferResult.class);
result = unpackAny(response, NamedQueueResponse::getMessage, QueueOfferResult.class);
assertThat(result, is(notNullValue()));
assertThat(result.getSucceeded(), is(true));

Expand All @@ -604,7 +603,7 @@ public void shouldCallIsReady(String ignored, Serializer serializer, String sSco
int queueId = ensureQueue(channel, observer, sQueueName);

NamedQueueResponse response = sendQueueRequest(channel, observer, queueId, NamedQueueRequestType.IsReady, Empty.getDefaultInstance());
BoolValue result = response.getMessage().unpack(BoolValue.class);
BoolValue result = unpackAny(response, NamedQueueResponse::getMessage, BoolValue.class);
assertThat(result, is(notNullValue()));
assertThat(result.getValue(), is(true));
}
Expand All @@ -625,7 +624,7 @@ public void shouldCallIsEmptyOnEmptyQueue(String ignored, Serializer serializer,
int queueId = ensureQueue(channel, observer, sQueueName);

NamedQueueResponse response = sendQueueRequest(channel, observer, queueId, NamedQueueRequestType.IsEmpty, Empty.getDefaultInstance());
BoolValue result = response.getMessage().unpack(BoolValue.class);
BoolValue result = unpackAny(response, NamedQueueResponse::getMessage, BoolValue.class);
assertThat(result, is(notNullValue()));
assertThat(result.getValue(), is(true));
}
Expand All @@ -648,7 +647,7 @@ public void shouldCallIsEmptyOnPopulatedQueue(String ignored, Serializer seriali
int queueId = ensureQueue(channel, observer, sQueueName);

NamedQueueResponse response = sendQueueRequest(channel, observer, queueId, NamedQueueRequestType.IsEmpty, Empty.getDefaultInstance());
BoolValue result = response.getMessage().unpack(BoolValue.class);
BoolValue result = unpackAny(response, NamedQueueResponse::getMessage, BoolValue.class);
assertThat(result, is(notNullValue()));
assertThat(result.getValue(), is(false));
}
Expand All @@ -669,7 +668,7 @@ public void shouldCallSizeOnEmptyQueue(String ignored, Serializer serializer, St
int queueId = ensureQueue(channel, observer, sQueueName);

NamedQueueResponse response = sendQueueRequest(channel, observer, queueId, NamedQueueRequestType.Size, Empty.getDefaultInstance());
Int32Value result = response.getMessage().unpack(Int32Value.class);
Int32Value result = unpackAny(response, NamedQueueResponse::getMessage, Int32Value.class);
assertThat(result, is(notNullValue()));
assertThat(result.getValue(), is(0));
}
Expand All @@ -692,7 +691,7 @@ public void shouldCallSizeOnPopulatedQueue(String ignored, Serializer serializer
int queueId = ensureQueue(channel, observer, sQueueName);

NamedQueueResponse response = sendQueueRequest(channel, observer, queueId, NamedQueueRequestType.Size, Empty.getDefaultInstance());
Int32Value result = response.getMessage().unpack(Int32Value.class);
Int32Value result = unpackAny(response, NamedQueueResponse::getMessage, Int32Value.class);
assertThat(result, is(notNullValue()));
assertThat(result.getValue(), is(queue.size()));
}
Expand Down Expand Up @@ -765,17 +764,7 @@ public void shouldDestroyQueue(String ignored, Serializer serializer, String sSc
.stream()
.skip(count)
.filter(ProxyResponse::hasMessage)
.map(m ->
{
try
{
return m.getMessage().unpack(NamedQueueResponse.class);
}
catch (InvalidProtocolBufferException e)
{
throw new RuntimeException(e);
}
})
.map(m -> unpackAny(m, ProxyResponse::getMessage, NamedQueueResponse.class))
.filter(m -> m.getQueueId() == queueId)
.filter(m -> m.getType() == NamedQueueResponseType.Destroyed)
.findFirst();
Expand Down Expand Up @@ -843,7 +832,7 @@ protected int ensurePagedQueue(StreamObserver<ProxyRequest> channel, TestStreamO
}

protected int ensureQueue(StreamObserver<ProxyRequest> channel, TestStreamObserver<ProxyResponse> observer,
String sQueueName, NamedQueueType type) throws Exception
String sQueueName, NamedQueueType type) throws Exception
{
EnsureQueueRequest ensureQueueRequest = EnsureQueueRequest.newBuilder()
.setQueue(sQueueName)
Expand All @@ -862,16 +851,17 @@ protected int ensureQueue(StreamObserver<ProxyRequest> channel, TestStreamObserv
observer.awaitCount(responseId, 1, TimeUnit.MINUTES);
observer.assertNoErrors();

ProxyResponse proxyResponse = observer.valueAt(responseId - 1);
ProxyResponse.ResponseCase responseCase = proxyResponse.getResponseCase();
ProxyResponse proxyResponse = observer.valueAt(responseId - 1);
assertThat(proxyResponse, is(notNullValue()));

ProxyResponse.ResponseCase responseCase = proxyResponse.getResponseCase();
if (responseCase == ProxyResponse.ResponseCase.ERROR)
{
ErrorMessage error = proxyResponse.getError();
fail(error.getMessage());
}

NamedQueueResponse response = proxyResponse.getMessage().unpack(NamedQueueResponse.class);
NamedQueueResponse response = unpackAny(proxyResponse, ProxyResponse::getMessage, NamedQueueResponse.class);
int queueId = response.getQueueId();

assertThat(queueId, is(not(0)));
Expand Down Expand Up @@ -922,7 +912,7 @@ protected <Resp extends Message> Resp sendQueueRequest(StreamObserver<ProxyReque
{
return (Resp) proxyResponse.getComplete();
}
return (Resp) proxyResponse.getMessage().unpack(NamedQueueResponse.class);
return (Resp) unpackAny(proxyResponse, ProxyResponse::getMessage, NamedQueueResponse.class);
}

protected void init(StreamObserver<ProxyRequest> channel, TestStreamObserver<ProxyResponse> observer,
Expand Down Expand Up @@ -953,6 +943,21 @@ protected Set<String> getCacheNames(CacheService service)
return cacheNames;
}

protected <M extends Message, T extends Message> T unpackAny(M message, Function<M, Any> fn, Class<T> expected)
{
assertThat(message, is(notNullValue()));
try
{
return fn.apply(message).unpack(expected);
}
catch (Throwable e)
{
throw Exceptions.ensureRuntimeException(e,
"Failed to unpack proto message: " + e.getMessage() + "\nMessage:\n" + message);
}
}


// ----- data members ---------------------------------------------------

public static final AtomicLong m_queueNameSuffix = new AtomicLong();
Expand Down

0 comments on commit 06e3b01

Please sign in to comment.