Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebClient request id added and logging messages updated #2257

Merged
merged 3 commits into from
Aug 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import static io.helidon.webclient.WebClientRequestBuilderImpl.IN_USE;
import static io.helidon.webclient.WebClientRequestBuilderImpl.RECEIVED;
import static io.helidon.webclient.WebClientRequestBuilderImpl.REQUEST;
import static io.helidon.webclient.WebClientRequestBuilderImpl.REQUEST_ID;
import static io.helidon.webclient.WebClientRequestBuilderImpl.RESPONSE;
import static io.helidon.webclient.WebClientRequestBuilderImpl.RESULT;

Expand All @@ -69,6 +70,7 @@ class NettyClientHandler extends SimpleChannelInboundHandler<HttpObject> {

private HttpResponsePublisher publisher;
private ResponseCloser responseCloser;
private long requestId;

/**
* Creates new instance.
Expand All @@ -87,10 +89,13 @@ public void channelReadComplete(ChannelHandlerContext ctx) {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws IOException {
if (msg instanceof HttpResponse) {
ctx.channel().config().setAutoRead(false);
Channel channel = ctx.channel();
channel.config().setAutoRead(false);
HttpResponse response = (HttpResponse) msg;
WebClientRequestImpl clientRequest = ctx.channel().attr(REQUEST).get();
this.requestId = channel.attr(REQUEST_ID).get();
WebClientRequestImpl clientRequest = channel.attr(REQUEST).get();
RequestConfiguration requestConfiguration = clientRequest.configuration();
LOGGER.finest(() -> "(client reqID: " + requestId + ") Initial http response message received.");

this.publisher = new HttpResponsePublisher(ctx);
this.responseCloser = new ResponseCloser(ctx);
Expand All @@ -111,15 +116,15 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws IO
// we got a response, we can safely complete the future
// all errors are now fed only to the publisher
WebClientResponse clientResponse = responseBuilder.build();
ctx.channel().attr(RESPONSE).set(clientResponse);
channel.attr(RESPONSE).set(clientResponse);

for (HttpInterceptor interceptor : HTTP_INTERCEPTORS) {
if (interceptor.shouldIntercept(response.status(), requestConfiguration)) {
boolean continueAfter = !interceptor.continueAfterInterception();
if (continueAfter) {
responseCloser.close().thenAccept(future -> LOGGER.finest(() -> "Response closed due to redirection"));
}
interceptor.handleInterception(response, clientRequest, ctx.channel().attr(RESULT).get());
interceptor.handleInterception(response, clientRequest, channel.attr(RESULT).get());
if (continueAfter) {
return;
}
Expand All @@ -134,7 +139,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws IO
clientResponse.headers(),
clientResponse.status());

ctx.channel().attr(SERVICE_RESPONSE).set(clientServiceResponse);
channel.attr(SERVICE_RESPONSE).set(clientServiceResponse);

List<WebClientService> services = requestConfiguration.services();
CompletionStage<WebClientServiceResponse> csr = CompletableFuture.completedFuture(clientServiceResponse);
Expand All @@ -143,8 +148,8 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws IO
csr = csr.thenCompose(clientSerResponse -> service.response(clientRequest, clientSerResponse));
}

CompletableFuture<WebClientServiceResponse> responseReceived = ctx.channel().attr(RECEIVED).get();
CompletableFuture<WebClientResponse> responseFuture = ctx.channel().attr(RESULT).get();
CompletableFuture<WebClientServiceResponse> responseReceived = channel.attr(RECEIVED).get();
CompletableFuture<WebClientResponse> responseFuture = channel.attr(RESULT).get();
csr.whenComplete((clientSerResponse, throwable) -> {
if (throwable != null) {
responseReceived.completeExceptionally(throwable);
Expand Down Expand Up @@ -176,6 +181,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws IO
}

if (msg instanceof LastHttpContent) {
LOGGER.finest(() -> "(client reqID: " + requestId + ") Last http content received.");
responseCloser.close();
}
}
Expand Down Expand Up @@ -286,6 +292,7 @@ boolean isClosed() {
*/
Single<Void> close() {
if (closed.compareAndSet(false, true)) {
LOGGER.finest(() -> "(client reqID: " + requestId + ") Closing the response from the server.");
Channel channel = ctx.channel();
WebClientServiceResponse clientServiceResponse = channel.attr(SERVICE_RESPONSE).get();
CompletableFuture<WebClientServiceResponse> requestComplete = channel.attr(COMPLETED).get();
Expand All @@ -297,7 +304,8 @@ Single<Void> close() {
ctx.close()
.addListener(future -> {
if (future.isSuccess()) {
LOGGER.finest(() -> "Response from the server has been closed.");
LOGGER.finest(() -> "(client reqID: " + requestId + ") "
+ "Response from the server has been closed.");
cf.complete(null);
} else {
LOGGER.log(Level.SEVERE,
Expand All @@ -307,6 +315,7 @@ Single<Void> close() {
}
});
} else {
LOGGER.finest(() -> "(client reqID: " + requestId + ") Returning channel to the cache.");
channel.attr(IN_USE).get().set(false);
cf.complete(null);
channel.read();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ public void handleInterception(HttpResponse httpResponse,
WebClientRequestImpl clientRequest,
CompletableFuture<WebClientResponse> responseFuture) {
if (httpResponse.headers().contains(Http.Header.LOCATION)) {
long requestId = clientRequest.configuration().requestId();
String newUri = httpResponse.headers().get(Http.Header.LOCATION);
LOGGER.fine(() -> "Redirecting to " + newUri);
LOGGER.finest(() -> "(client reqID: " + requestId + ") Redirecting to " + newUri);
WebClientRequestBuilder requestBuilder = WebClientRequestBuilderImpl
.create(clientRequest);
if (URI.create(newUri).getHost() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
class RequestConfiguration extends WebClientConfiguration {

private final URI requestURI;
private final long requestId;
private final WebClientServiceRequest clientServiceRequest;
private final List<WebClientService> services;

Expand All @@ -36,6 +37,7 @@ private RequestConfiguration(Builder builder) {
requestURI = builder.requestURI;
clientServiceRequest = builder.clientServiceRequest;
services = builder.services;
requestId = builder.requestId;
}

URI requestURI() {
Expand All @@ -50,13 +52,18 @@ List<WebClientService> services() {
return services;
}

long requestId() {
return requestId;
}

static Builder builder(URI requestURI) {
return new Builder(requestURI);
}

static final class Builder extends WebClientConfiguration.Builder<Builder, RequestConfiguration> {

private final URI requestURI;
private long requestId = -1;
private WebClientServiceRequest clientServiceRequest;
private List<WebClientService> services = new ArrayList<>();

Expand All @@ -74,6 +81,11 @@ Builder services(List<WebClientService> services) {
return this;
}

Builder requestId(long requestId) {
this.requestId = requestId;
return this;
}

@Override
public RequestConfiguration build() {
return new RequestConfiguration(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.netty.util.concurrent.GenericFutureListener;

import static io.helidon.webclient.WebClientRequestBuilderImpl.REQUEST;
import static io.helidon.webclient.WebClientRequestBuilderImpl.REQUEST_ID;

/**
* Subscriber which handles entity sending.
Expand All @@ -46,6 +47,7 @@ class RequestContentSubscriber implements Flow.Subscriber<DataChunk> {
private final CompletableFuture<WebClientServiceRequest> sent;
private final DefaultHttpRequest request;
private final Channel channel;
private final long requestId;

private volatile Flow.Subscription subscription;
private volatile DataChunk firstDataChunk;
Expand All @@ -59,13 +61,14 @@ class RequestContentSubscriber implements Flow.Subscriber<DataChunk> {
this.channel = channel;
this.responseFuture = responseFuture;
this.sent = sent;
this.requestId = channel.attr(REQUEST_ID).get();
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
LOGGER.finest(() -> "Writing sending request and its content to the server.");
LOGGER.finest(() -> "(client reqID: " + requestId + ") Writing sending request and its content to the server.");
}

@Override
Expand Down Expand Up @@ -103,7 +106,8 @@ public void onError(Throwable throwable) {
@Override
public void onComplete() {
if (lengthOptimization) {
LOGGER.finest(() -> "Message body contains only one data chunk. Setting chunked encoding to false.");
LOGGER.finest(() -> "(client reqID: " + requestId + ") "
+ "Message body contains only one data chunk. Setting chunked encoding to false.");
HttpUtil.setTransferEncodingChunked(request, false);
if (firstDataChunk != null) {
HttpUtil.setContentLength(request, firstDataChunk.remaining());
Expand All @@ -113,26 +117,28 @@ public void onComplete() {
sendData(firstDataChunk);
}
}
LOGGER.finest(() -> "Sending last http content");
LOGGER.finest(() -> "(client reqID: " + requestId + ") Sending last http content");
channel.writeAndFlush(LAST_HTTP_CONTENT)
.addListener(completeOnFailureListener("An exception occurred when writing last http content."))
.addListener(completeOnFailureListener("(client reqID: " + requestId + ") "
+ "An exception occurred when writing last http content."))
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

WebClientRequestImpl clientRequest = channel.attr(REQUEST).get();
WebClientServiceRequest serviceRequest = clientRequest.configuration().clientServiceRequest();
sent.complete(serviceRequest);
LOGGER.finest(() -> "(client reqID: " + requestId + ") Request sent");
}

private void sendData(DataChunk data) {
LOGGER.finest(() -> "Sending data chunk");
LOGGER.finest(() -> "(client reqID: " + requestId + ") Sending data chunk");
DefaultHttpContent httpContent = new DefaultHttpContent(Unpooled.wrappedBuffer(data.data()));
channel.writeAndFlush(httpContent)
.addListener(future -> {
data.release();
subscription.request(1);
LOGGER.finest(() -> "Data chunk sent with result: " + future.isSuccess());
LOGGER.finest(() -> "(client reqID: " + requestId + ") Data chunk sent with result: " + future.isSuccess());
})
.addListener(completeOnFailureListener("Failure when sending a content!"))
.addListener(completeOnFailureListener("(client reqID: " + requestId + ") Failure when sending a content!"))
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,14 @@ public interface WebClientRequestBuilder {
*/
WebClientRequestBuilder keepAlive(boolean keepAlive);

/**
* Set new request id. This id is used in logging messages.
*
* @param requestId new request id
* @return updated builder instance
*/
WebClientRequestBuilder requestId(long requestId);

/**
* Performs prepared request and transforms response to requested type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class WebClientRequestBuilderImpl implements WebClientRequestBuilder {
static final AttributeKey<AtomicBoolean> IN_USE = AttributeKey.valueOf("inUse");
static final AttributeKey<WebClientResponse> RESPONSE = AttributeKey.valueOf("response");
static final AttributeKey<ConnectionIdent> CONNECTION_IDENT = AttributeKey.valueOf("connectionIdent");
static final AttributeKey<Long> REQUEST_ID = AttributeKey.valueOf("requestID");

private static final AtomicLong REQUEST_NUMBER = new AtomicLong(0);
private static final String DEFAULT_TRANSPORT_PROTOCOL = "http";
Expand Down Expand Up @@ -128,6 +129,7 @@ class WebClientRequestBuilderImpl implements WebClientRequestBuilder {
private Duration readTimeout;
private Duration connectTimeout;
private boolean keepAlive;
private Long requestId;

private WebClientRequestBuilderImpl(LazyValue<NioEventLoopGroup> eventGroup,
WebClientConfiguration configuration,
Expand All @@ -147,7 +149,8 @@ private WebClientRequestBuilderImpl(LazyValue<NioEventLoopGroup> eventGroup,
this.services = configuration.clientServices();
this.readerContext = MessageBodyReaderContext.create(configuration.readerContext());
this.writerContext = MessageBodyWriterContext.create(configuration.writerContext(), headers);
Context.Builder contextBuilder = Context.builder().id("webclient-" + REQUEST_NUMBER.incrementAndGet());
this.requestId = null;
Context.Builder contextBuilder = Context.builder().id("webclient-" + requestId);
configuration.context().ifPresentOrElse(contextBuilder::parent,
() -> Contexts.context().ifPresent(contextBuilder::parent));
this.context = contextBuilder.build();
Expand Down Expand Up @@ -365,6 +368,12 @@ public WebClientRequestBuilder keepAlive(boolean keepAlive) {
return this;
}

@Override
public WebClientRequestBuilder requestId(long requestId) {
this.requestId = requestId;
return this;
}

@Override
public <T> Single<T> request(Class<T> responseType) {
return request(GenericType.create(responseType));
Expand Down Expand Up @@ -425,6 +434,10 @@ public MessageBodyWriterContext writerContext() {
return writerContext;
}

long requestId() {
return requestId;
}

Http.RequestMethod method() {
return method;
}
Expand Down Expand Up @@ -481,6 +494,10 @@ private <T> Single<T> invokeWithEntity(Flow.Publisher<DataChunk> requestEntity,

private Single<WebClientResponse> invoke(Flow.Publisher<DataChunk> requestEntity) {
this.uri = prepareFinalURI();
if (requestId == null) {
requestId = REQUEST_NUMBER.incrementAndGet();
}
// LOGGER.finest(() -> "(client reqID: " + requestId + ") Request final URI: " + uri);
CompletableFuture<WebClientServiceRequest> sent = new CompletableFuture<>();
CompletableFuture<WebClientServiceResponse> responseReceived = new CompletableFuture<>();
CompletableFuture<WebClientServiceResponse> complete = new CompletableFuture<>();
Expand All @@ -498,6 +515,7 @@ private Single<WebClientResponse> invoke(Flow.Publisher<DataChunk> requestEntity
}

return Single.create(rcs.thenCompose(serviceRequest -> {
requestId = serviceRequest.requestId();
HttpHeaders headers = toNettyHttpHeaders();
DefaultHttpRequest request = new DefaultHttpRequest(toNettyHttpVersion(httpVersion),
toNettyMethod(method),
Expand All @@ -517,6 +535,7 @@ private Single<WebClientResponse> invoke(Flow.Publisher<DataChunk> requestEntity
.context(context)
.proxy(proxy)
.keepAlive(keepAlive)
.requestId(requestId)
.build();
WebClientRequestImpl clientRequest = new WebClientRequestImpl(this);

Expand All @@ -535,12 +554,13 @@ private Single<WebClientResponse> invoke(Flow.Publisher<DataChunk> requestEntity
: bootstrap.connect(uri.getHost(), uri.getPort());

channelFuture.addListener((ChannelFutureListener) future -> {
LOGGER.finest(() -> "ChannelFuture hashcode -> " + channelFuture.hashCode());
LOGGER.finest(() -> "Channel hashcode -> " + channelFuture.channel().hashCode());
LOGGER.finest(() -> "(client reqID: " + requestId + ") "
+ "Channel hashcode -> " + channelFuture.channel().hashCode());
channelFuture.channel().attr(REQUEST).set(clientRequest);
channelFuture.channel().attr(RECEIVED).set(responseReceived);
channelFuture.channel().attr(COMPLETED).set(complete);
channelFuture.channel().attr(RESULT).set(result);
channelFuture.channel().attr(REQUEST_ID).set(requestId);
Throwable cause = future.cause();
if (null == cause) {
RequestContentSubscriber requestContentSubscriber = new RequestContentSubscriber(request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ public interface WebClientServiceRequest extends HttpRequest {
*/
Context context();

/**
* Request id which will be used in logging messages.
*
* @return current request id
*/
long requestId();

/**
* Set new request id. This id is used in logging messages.
*
* @param requestId new request id
*/
void requestId(long requestId);

/**
* Completes when the request part of this request is done (e.g. we have sent all headers and bytes).
*
Expand Down
Loading