Skip to content

Commit

Permalink
Fix connection leak in rest proxy when return type is void or mono<vo…
Browse files Browse the repository at this point in the history
…id> (Azure#30072)

* Fix connection leak in rest proxy when return type is void or mono<void>

* changelog.
  • Loading branch information
kasobol-msft committed Jul 21, 2022
1 parent 3f0138c commit 92b7424
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void run() {

@Override
public Mono<Void> runAsync() {
return service.setBinaryData(endpoint, id, binaryDataSupplier.get(), length)
.then();
return Mono.fromSupplier(binaryDataSupplier)
.flatMap(data -> service.setBinaryData(endpoint, id, data, length));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,23 @@ public void run() {

@Override
public Mono<Void> runAsync() {
HttpHeaders headers = new HttpHeaders();
headers.set("Content-Length", contentLengthHeaderValue);
HttpRequest httpRequest = new HttpRequest(
HttpMethod.PUT, targetURL, headers, binaryDataSupplier.get().toFluxByteBuffer());
// Context with azure-eagerly-read-response=true makes sure
// that response is disposed to prevent connection leak.
// There's no response body in this scenario anyway.
return httpPipeline.send(httpRequest, context)
.map(httpResponse -> {
if (httpResponse.getStatusCode() / 100 != 2) {
throw new IllegalStateException("Endpoint didn't return 2xx http status code.");
}
return httpResponse;
})
.then();
return Mono.fromSupplier(binaryDataSupplier)
.flatMap(data -> {
HttpHeaders headers = new HttpHeaders();
headers.set("Content-Length", contentLengthHeaderValue);
HttpRequest httpRequest = new HttpRequest(
HttpMethod.PUT, targetURL, headers, data);
// Context with azure-eagerly-read-response=true makes sure
// that response is disposed to prevent connection leak.
// There's no response body in this scenario anyway.
return httpPipeline.send(httpRequest, context)
.map(httpResponse -> {
if (httpResponse.getStatusCode() / 100 != 2) {
throw new IllegalStateException("Endpoint didn't return 2xx http status code.");
}
return httpResponse;
})
.then();
});
}
}
2 changes: 2 additions & 0 deletions sdk/core/azure-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Bugs Fixed

- Fixed bug where `RestProxy` could leak connection if service method returned `Mono<Void>` or `void`.

### Other Changes

## 1.30.0 (2022-06-30)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ private Object handleRestReturnType(final Mono<HttpResponseDecoder.HttpDecodedRe
final Type monoTypeParam = TypeUtil.getTypeArgument(returnType);
if (TypeUtil.isTypeOrSubTypeOf(monoTypeParam, Void.class)) {
// ProxyMethod ReturnType: Mono<Void>
result = asyncExpectedResponse.then();
result = asyncExpectedResponse.doOnNext(HttpResponseDecoder.HttpDecodedResponse::close).then();
} else {
// ProxyMethod ReturnType: Mono<? extends RestResponseBase<?, ?>>
result = asyncExpectedResponse.flatMap(response ->
Expand All @@ -218,7 +218,7 @@ private Object handleRestReturnType(final Mono<HttpResponseDecoder.HttpDecodedRe
} else if (TypeUtil.isTypeOrSubTypeOf(returnType, void.class) || TypeUtil.isTypeOrSubTypeOf(returnType,
Void.class)) {
// ProxyMethod ReturnType: Void
asyncExpectedResponse.block();
asyncExpectedResponse.doOnNext(HttpResponseDecoder.HttpDecodedResponse::close).block();
result = null;
} else {
// ProxyMethod ReturnType: T where T != async (Mono, Flux) or sync Void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ private Object handleRestReturnType(final HttpResponseDecoder.HttpDecodedRespons
if (TypeUtil.isTypeOrSubTypeOf(returnType, void.class) || TypeUtil.isTypeOrSubTypeOf(returnType,
Void.class)) {
// ProxyMethod ReturnType: Void
result = expectedResponse;
expectedResponse.close();
result = null;
} else {
// ProxyMethod ReturnType: T where T != async (Mono, Flux) or sync Void
// Block the deserialization until a value T is received
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ Mono<Response<Void>> testMethod(
@HeaderParam("Content-Length") Long contentLength
);

@Get("my/url/path")
@ExpectedResponses({200})
Mono<Void> testMethodReturnsMonoVoid();

@Get("my/url/path")
@ExpectedResponses({200})
void testVoidMethod();

@Get("my/url/path")
@ExpectedResponses({200})
StreamResponse testDownload();
Expand Down Expand Up @@ -179,6 +187,35 @@ public void doesNotChangeBinaryDataContentType(BinaryData data, long contentLeng
assertEquals(expectedContentClazz, actualContentClazz);
}

@Test
public void monoVoidReturningApiClosesResponse() {
LocalHttpClient client = new LocalHttpClient();
HttpPipeline pipeline = new HttpPipelineBuilder()
.httpClient(client)
.build();

TestInterface testInterface = RestProxy.create(TestInterface.class, pipeline);
StepVerifier.create(
testInterface.testMethodReturnsMonoVoid())
.verifyComplete();

Mockito.verify(client.lastResponseSpy).close();
}

@Test
public void voidReturningApiClosesResponse() {
LocalHttpClient client = new LocalHttpClient();
HttpPipeline pipeline = new HttpPipelineBuilder()
.httpClient(client)
.build();

TestInterface testInterface = RestProxy.create(TestInterface.class, pipeline);

testInterface.testVoidMethod();

Mockito.verify(client.lastResponseSpy).close();
}

private static Stream<Arguments> doesNotChangeBinaryDataContentTypeDataProvider() throws Exception {
String string = "hello";
byte[] bytes = string.getBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,25 @@ Response<Void> testMethod(
@Get("my/url/path")
@ExpectedResponses({200})
StreamResponse testDownload(Context context);

@Get("my/url/path")
@ExpectedResponses({200})
void testVoidMethod(Context context);
}

@Test
public void voidReturningApiClosesResponse() {
LocalHttpClient client = new LocalHttpClient();
HttpPipeline pipeline = new HttpPipelineBuilder()
.httpClient(client)
.build();

TestInterface testInterface = RestProxy.create(TestInterface.class, pipeline);

Context context = new Context(HTTP_REST_PROXY_SYNC_PROXY_ENABLE, true);
testInterface.testVoidMethod(context);

Mockito.verify(client.lastResponseSpy).close();
}

@Test
Expand Down

0 comments on commit 92b7424

Please sign in to comment.