Skip to content

Commit

Permalink
Issue #12266 - InvocationType improvements and cleanups.
Browse files Browse the repository at this point in the history
This is the work for the client-side.

Now the `InvocationType` can be specified at the `HttpClientTransport` level, or at the `ClientConnectionFactory.Info` level (for the dynamic transport).

Wired the `InvocationType` also for `Response.ContentSourceListener`, so that for applications that read response content using `Content.Source` and specify an `InvocationType` to `demand(Runnable)`, the implementation will honor it.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Nov 29, 2024
1 parent d1fc707 commit 1345a6f
Show file tree
Hide file tree
Showing 26 changed files with 557 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@

/**
* {@link HttpClientTransport} represents what transport implementations should provide
* in order to plug-in a different transport for {@link HttpClient}.
* in order to plug in a different transport for {@link HttpClient}.
* <p>
* While the {@link HttpClient} APIs define the HTTP semantic (request, response, headers, etc.)
* <em>how</em> an HTTP exchange is carried over the network depends on implementations of this class.
* <p>
* The default implementation uses the HTTP protocol to carry over the network the HTTP exchange,
* but the HTTP exchange may also be carried using the FCGI protocol, the HTTP/2 protocol or,
* in future, other protocols.
* in the future, other protocols.
*/
public interface HttpClientTransport extends ClientConnectionFactory, HttpClient.Aware
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ interface AsyncContentListener extends ContentSourceListener
@Override
default void onContentSource(Response response, Content.Source contentSource)
{
Runnable demandCallback = Invocable.from(Invocable.InvocationType.NON_BLOCKING, () -> onContentSource(response, contentSource));
Runnable demandCallback = Invocable.from(Invocable.InvocationType.BLOCKING, () -> onContentSource(response, contentSource));
Content.Chunk chunk = contentSource.read();
if (chunk == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,23 @@

import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.eclipse.jetty.client.transport.internal.HttpConnectionOverHTTP;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.Transport;
import org.eclipse.jetty.util.thread.Invocable;

public class HttpClientConnectionFactory implements ClientConnectionFactory
public class HttpClientConnectionFactory implements ClientConnectionFactory, Invocable
{
/**
* <p>Representation of the {@code HTTP/1.1} application protocol used by {@link HttpClientTransportDynamic}.</p>
*/
public static final Info HTTP11 = new HTTP11();

private boolean initializeConnections;
private InvocationType invocationType = InvocationType.BLOCKING;

/**
* @return whether newly created connections should be initialized with an {@code OPTIONS * HTTP/1.1} request
Expand All @@ -46,11 +49,23 @@ public void setInitializeConnections(boolean initialize)
this.initializeConnections = initialize;
}

@Override
public InvocationType getInvocationType()
{
return invocationType;
}

public void setInvocationType(InvocationType invocationType)
{
this.invocationType = Objects.requireNonNull(invocationType);
}

@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context)
{
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, context);
connection.setInitialize(isInitializeConnections());
connection.setInvocationType(getInvocationType());
return customize(connection, context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ManagedObject("The HTTP/1.1 client transport")
public class HttpClientTransportOverHTTP extends AbstractConnectorHttpClientTransport
public class HttpClientTransportOverHTTP extends AbstractConnectorHttpClientTransport implements Invocable
{
public static final Origin.Protocol HTTP11 = new Origin.Protocol(List.of("http/1.1"), false);
private static final Logger LOG = LoggerFactory.getLogger(HttpClientTransportOverHTTP.class);
Expand Down Expand Up @@ -127,4 +128,15 @@ public void setInitializeConnections(boolean initialize)
{
factory.setInitializeConnections(initialize);
}

@Override
public InvocationType getInvocationType()
{
return factory.getInvocationType();
}

public void setInvocationType(InvocationType invocationType)
{
factory.setInvocationType(invocationType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.Destroyable;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.SerializedInvoker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -64,7 +65,7 @@
*
* @see HttpSender
*/
public abstract class HttpReceiver
public abstract class HttpReceiver implements Invocable
{
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiver.class);

Expand Down Expand Up @@ -442,6 +443,12 @@ private void terminateResponse(HttpExchange exchange, Result result)
}
}

@Override
public InvocationType getInvocationType()
{
return contentSource.getInvocationType();
}

/**
* Resets the state of this HttpReceiver.
* <p>
Expand Down Expand Up @@ -499,7 +506,7 @@ public void abort(HttpExchange exchange, Throwable failure, Promise<Boolean> pro
responseState = ResponseState.FAILURE;
this.failure = failure;
if (contentSource != null)
contentSource.error(failure);
contentSource.onError(failure);
dispose();

HttpResponse response = exchange.getResponse();
Expand Down Expand Up @@ -556,12 +563,12 @@ private enum ResponseState
FAILURE
}

private interface NotifiableContentSource extends Content.Source, Destroyable
private interface NotifiableContentSource extends Content.Source, Invocable, Destroyable
{
boolean error(Throwable failure);

void onDataAvailable();

boolean onError(Throwable failure);

@Override
default void destroy()
{
Expand Down Expand Up @@ -595,6 +602,12 @@ public void onDataAvailable()
getContentSource().onDataAvailable();
}

@Override
public InvocationType getInvocationType()
{
return getContentSource().getInvocationType();
}

@Override
protected Content.Chunk transform(Content.Chunk inputChunk)
{
Expand Down Expand Up @@ -666,12 +679,12 @@ protected Content.Chunk transform(Content.Chunk inputChunk)
}

@Override
public boolean error(Throwable failure)
public boolean onError(Throwable failure)
{
if (_chunk != null)
_chunk.release();
_chunk = null;
return getContentSource().error(failure);
return getContentSource().onError(failure);
}

@Override
Expand Down Expand Up @@ -740,6 +753,12 @@ public void onDataAvailable()
invokeDemandCallback(false);
}

@Override
public InvocationType getInvocationType()
{
return Invocable.getInvocationType(demandCallbackRef.get());
}

@Override
public void demand(Runnable demandCallback)
{
Expand Down Expand Up @@ -820,14 +839,14 @@ public void fail(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("Failing {}", this);
boolean failed = error(failure);
boolean failed = onError(failure);
if (failed)
HttpReceiver.this.failAndClose(failure);
invokeDemandCallback(true);
}

@Override
public boolean error(Throwable failure)
public boolean onError(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("Erroring {}", this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,18 @@
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Sweeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpConnectionOverHTTP extends AbstractConnection implements IConnection, org.eclipse.jetty.io.Connection.UpgradeFrom, Sweeper.Sweepable, Attachable
public class HttpConnectionOverHTTP extends AbstractConnection implements IConnection, org.eclipse.jetty.io.Connection.UpgradeFrom, Sweeper.Sweepable, Attachable, Invocable
{
private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverHTTP.class);

private final Callback fillableCallback = new FillableCallback();
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicInteger sweeps = new AtomicInteger();
private final Promise<Connection> promise;
Expand All @@ -64,6 +67,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne
private final LongAdder bytesOut = new LongAdder();
private long idleTimeout;
private boolean initialize;
private InvocationType invocationType = InvocationType.BLOCKING;

public HttpConnectionOverHTTP(EndPoint endPoint, Map<String, Object> context)
{
Expand Down Expand Up @@ -184,6 +188,23 @@ public void setInitialize(boolean initialize)
this.initialize = initialize;
}

@Override
public InvocationType getInvocationType()
{
return invocationType;
}

public void setInvocationType(InvocationType invocationType)
{
this.invocationType = invocationType;
}

@Override
public void fillInterested()
{
fillInterested(fillableCallback);
}

@Override
public void onOpen()
{
Expand Down Expand Up @@ -432,4 +453,25 @@ public String toString()
return HttpConnectionOverHTTP.this.toString();
}
}

private class FillableCallback implements Callback
{
@Override
public void succeeded()
{
onFillable();
}

@Override
public void failed(Throwable x)
{
onFillInterestedFailed(x);
}

@Override
public InvocationType getInvocationType()
{
return HttpConnectionOverHTTP.this.getInvocationType();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -43,6 +44,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
{
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP.class);

private final Callback demandContentCallback = new DemandContentCallback();
private final Runnable receiveNext = this::receiveNext;
private final LongAdder inMessages = new LongAdder();
private final HttpParser parser;
Expand Down Expand Up @@ -72,18 +74,18 @@ public HttpReceiverOverHTTP(HttpChannelOverHTTP channel)

void receive()
{
if (!hasContent())
{
boolean setFillInterest = parseAndFill(true);
if (!hasContent() && setFillInterest)
fillInterested();
}
else
if (hasContent())
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
responseContentAvailable(exchange);
}
else
{
boolean setFillInterest = parseAndFill(true);
if (!hasContent() && setFillInterest)
getHttpConnection().fillInterested();
}
}

@Override
Expand Down Expand Up @@ -386,7 +388,7 @@ protected void fillInterested()
{
if (LOG.isDebugEnabled())
LOG.debug("Registering as fill interested in {}", this);
getHttpConnection().fillInterested();
getHttpConnection().fillInterested(demandContentCallback);
}

private void shutdown()
Expand Down Expand Up @@ -527,7 +529,7 @@ private void receiveNext()
LOG.debug("Receiving next request in {}", this);
boolean setFillInterest = parseAndFill(true);
if (!hasContent() && setFillInterest)
fillInterested();
getHttpConnection().fillInterested();
}

@Override
Expand Down Expand Up @@ -572,4 +574,25 @@ private enum State
{
STATUS, HEADERS, CONTENT, COMPLETE
}

private class DemandContentCallback implements Callback
{
@Override
public void succeeded()
{
getHttpConnection().onFillable();
}

@Override
public void failed(Throwable failure)
{
getHttpConnection().onFillInterestedFailed(failure);
}

@Override
public InvocationType getInvocationType()
{
return HttpReceiverOverHTTP.this.getInvocationType();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ protected void fillInterested()
// Verify that the buffer has been released
// before fillInterested() is called.
assertNull(getResponseBuffer());
// Fill the endpoint so receive is called again.
// Fill the endpoint so receive() is called again.
endPoint.addInput("X");
}
super.fillInterested();
Expand Down
Loading

0 comments on commit 1345a6f

Please sign in to comment.