Skip to content

Commit

Permalink
Defer trailer transformer callbacks on null msg-body to subscription (#…
Browse files Browse the repository at this point in the history
…1577)

Motivation:

When we have a null msg-body and call `transform`, then the callbacks of the trailer
transformer are invoked on the calling thread rather than the subscriber thread.

Modifications:

- Used `Publisher#defer` to defer the invocation of `TrailersTransformer#payloadComplete`;

Result:

Better error handling in case of erroneous trailer transformer (ie. trailers validations)
and deferred execution of the callbacks until someone subscribers to payload body.
  • Loading branch information
tkountis authored Jun 9, 2021
1 parent 83c9206 commit 4f7110b
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2019 Apple Inc. and the ServiceTalk project authors
* Copyright © 2019, 2021 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -135,8 +135,9 @@ void transformMessageBody(UnaryOperator<Publisher<?>> transformer) {

<T> void transform(final TrailersTransformer<T, Buffer> trailersTransformer) {
if (messageBody == null) {
messageBody = from(trailersTransformer.payloadComplete(trailersTransformer.newState(),
headersFactory.newEmptyTrailers()));
messageBody = defer(() ->
from(trailersTransformer.payloadComplete(trailersTransformer.newState(),
headersFactory.newEmptyTrailers())).subscribeShareContext());
} else {
payloadInfo.setEmpty(false); // transformer may add payload content
messageBody = messageBody.scanWith(() -> new ScanWithMapper<Object, Object>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ private enum UpdateMode {
SetWithSerializer,
Transform,
TransformWithTrailer,
TransformWithSerializer
TransformWithSerializer,
TransformWithErrorInTrailer
}

private enum SourceType {
Expand Down Expand Up @@ -133,6 +134,7 @@ private void setUp(SourceType sourceType, UpdateMode updateMode, boolean doubleT
case Transform:
case TransformWithTrailer:
case TransformWithSerializer:
case TransformWithErrorInTrailer:
transformFunctions.setupFor(updateMode, payloadHolder, sourceTypeTrailers);
if (doubleTransform) {
secondTransformFunctions.setupFor(updateMode, payloadHolder, sourceTypeTrailers);
Expand Down Expand Up @@ -219,7 +221,10 @@ void sourceEmitsTrailersUnconditionally(SourceType sourceType,
(updateMode == UpdateMode.Set || updateMode == UpdateMode.SetWithSerializer)) {
payloadSource.onComplete(); // Original source should complete for us to emit trailers
}
getPayloadSource().onComplete();
if (updateMode != UpdateMode.TransformWithErrorInTrailer) {
getPayloadSource().onComplete();
}

verifyTrailersReceived();
}

Expand Down Expand Up @@ -315,7 +320,10 @@ private void simulateAndVerifyTrailerReadIfApplicable() {
if (sourceType == SourceType.Trailers) {
assert payloadSource != null;
payloadSource.onNext(mock(HttpHeaders.class));
payloadSource.onComplete();
if (updateMode != UpdateMode.TransformWithErrorInTrailer) {
// If trailers with error, the publisher is already terminated by the error
payloadSource.onComplete();
}
tryCompletePayloadSource();
} else {
if (payloadSource != null) {
Expand All @@ -336,11 +344,15 @@ private void tryCompletePayloadSource() {
}

private void verifyTrailersReceived() {
List<Object> items = payloadAndTrailersSubscriber.takeOnNext(1);
assertThat("Unexpected trailer", items, hasSize(1));
assertThat("Unexpected trailer", items.get(0), is(instanceOf(HttpHeaders.class)));
payloadAndTrailersSubscriber.awaitOnComplete();
if (updateMode == UpdateMode.TransformWithTrailer) {
if (updateMode == UpdateMode.TransformWithErrorInTrailer) {
payloadAndTrailersSubscriber.awaitOnError();
} else {
List<Object> items = payloadAndTrailersSubscriber.takeOnNext(1);
assertThat("Unexpected trailer", items, hasSize(1));
assertThat("Unexpected trailer", items.get(0), is(instanceOf(HttpHeaders.class)));
payloadAndTrailersSubscriber.awaitOnComplete();
}
if (updateMode == UpdateMode.TransformWithTrailer || updateMode == UpdateMode.TransformWithErrorInTrailer) {
verify(transformFunctions.trailerTransformer).payloadComplete(any(), any());
}
}
Expand All @@ -349,14 +361,18 @@ private void simulateAndVerifyPayloadComplete(final TestPublisherSubscriber<?> s
if (canControlPayload()) {
getPayloadSource().onComplete();
}
if (updateMode == UpdateMode.TransformWithTrailer) {
if (updateMode == UpdateMode.TransformWithTrailer || updateMode == UpdateMode.TransformWithErrorInTrailer) {
subscriber.awaitSubscription().request(1);
}
if (payloadSource != null && (updateMode == UpdateMode.Set || updateMode == UpdateMode.SetWithSerializer)) {
// A set operation was done with a prior Publisher, we need to complete the prior Publisher.
payloadSource.onComplete();
}
subscriber.awaitOnComplete();
if (updateMode == UpdateMode.TransformWithErrorInTrailer) {
subscriber.awaitOnError();
} else {
subscriber.awaitOnComplete();
}
}

private TestPublisher<Object> getPayloadSource() {
Expand Down Expand Up @@ -438,6 +454,12 @@ void setupFor(UpdateMode updateMode, StreamingHttpPayloadHolder payloadHolder, b
assertThat(payloadHolder.mayHaveTrailers(), is(true));
assertThat(payloadHolder.isGenericTypeBuffer(), is(false));
break;
case TransformWithErrorInTrailer:
when(trailerTransformer.payloadComplete(any(), any())).thenThrow(DELIBERATE_EXCEPTION);
payloadHolder.transform(trailerTransformer);
assertThat(payloadHolder.mayHaveTrailers(), is(true));
assertThat(payloadHolder.isGenericTypeBuffer(), is(false));
break;
case TransformWithSerializer:
payloadHolder.transformPayloadBody(stringTransformer, textSerializer());
assertThat(payloadHolder.isGenericTypeBuffer(), is(not(sourceTypeTrailers)));
Expand All @@ -455,6 +477,7 @@ void verifyMocks(UpdateMode updateMode, SourceType sourceType, HttpHeadersFactor
verify(transformer).apply(any());
break;
case TransformWithTrailer:
case TransformWithErrorInTrailer:
verify(trailerTransformer).newState();
if (canControlPayload) {
verify(trailerTransformer).accept(any(), any());
Expand Down

0 comments on commit 4f7110b

Please sign in to comment.