Skip to content

Commit

Permalink
Issue #12266 - InvocationType improvements and cleanups.
Browse files Browse the repository at this point in the history
This is the work for the server-side only, the client side will be done in another pull request.

Previously, AbstractConnection.getInvocationType() was called by AbstractConnection.ReadCallback, but it was deprecated and is now removed, along with all its overrides.

This mechanism is now replaced by using a specific Callback implementation for each AbstractConnection subclass.
For example, HttpConnection uses HttpConnection.FillableCallback that in turn asks the InvocationType to the Server, and therefore the `Handler` tree.

Restored synchronous code for ServerFCGIConnection.close(), ensuring super.close() is always called.
Ensuring that in HttpConnection.close(), super.close() is always called.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Nov 19, 2024
1 parent b115f12 commit 8e7ce53
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -508,12 +508,6 @@ public void failed(Throwable x)
promise.failed(x);
}

@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}

@Override
public void onFillable()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
Expand All @@ -43,6 +44,7 @@ public class ServerFCGIConnection extends AbstractMetaDataConnection implements
{
private static final Logger LOG = LoggerFactory.getLogger(ServerFCGIConnection.class);

private final Callback fillableCallback = new FillableCallback();
private final HttpChannel.Factory httpChannelFactory = new HttpChannel.DefaultFactory();
private final Attributes attributes = new Lazy();
private final Connector connector;
Expand Down Expand Up @@ -161,7 +163,7 @@ public void clearAttributes()
public void onOpen()
{
super.onOpen();
fillInterested();
fillInterested(fillableCallback);
}

@Override
Expand Down Expand Up @@ -189,7 +191,7 @@ public void onFillable()
else if (read == 0)
{
releaseInputBuffer();
fillInterested();
fillInterested(fillableCallback);
return;
}
else
Expand Down Expand Up @@ -305,7 +307,7 @@ void onCompleted(Throwable failure)
{
releaseInputBuffer();
if (failure == null)
fillInterested();
fillInterested(fillableCallback);
else
getFlusher().shutdown();
}
Expand Down Expand Up @@ -407,25 +409,46 @@ public void onFailure(int request, Throwable failure)
@Override
public void close()
{
if (stream != null)
try
{
Runnable task = stream.getHttpChannel().onClose();
if (task != null)
if (stream != null)
{
ThreadPool.executeImmediately(getExecutor(), () ->
{
try
{
task.run();
}
finally
{
super.close();
}
});
return;
Runnable task = stream.getHttpChannel().onClose();
if (task != null)
task.run();
}
}
super.close();
finally
{
super.close();
}
}

private class FillableCallback implements Callback
{
private final InvocationType invocationType;

public FillableCallback()
{
invocationType = getConnector().getServer().getInvocationType();
}

@Override
public void succeeded()
{
onFillable();
}

@Override
public void failed(Throwable x)
{
onFillInterestedFailed(x);
}

@Override
public InvocationType getInvocationType()
{
return invocationType;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,6 @@ public void failed(Throwable x)
close();
promise.failed(x);
}

@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
}

private static class ConnectionListener implements Connection.Listener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ else if (filled == 0)
}
networkBuffer = null;
if (interested)
getEndPoint().fillInterested(fillableCallback);
fillInterested(fillableCallback);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,6 @@ protected AbstractConnection(EndPoint endPoint, Executor executor)
_readCallback = new ReadCallback();
}

@Deprecated
@Override
public InvocationType getInvocationType()
{
// TODO consider removing the #fillInterested method from the connection and only use #fillInterestedCallback
// so a connection need not be Invocable
return Invocable.super.getInvocationType();
}

@Override
public void addEventListener(EventListener listener)
{
Expand Down Expand Up @@ -90,25 +81,27 @@ protected Executor getExecutor()
}

/**
* <p>Utility method to be called to register read interest.</p>
* <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
* will be called back as appropriate.</p>
* <p>Registers read interest using the default {@link Callback} with {@link Invocable.InvocationType#BLOCKING}.</p>
* <p>When read readiness is signaled, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
* will be invoked.</p>
* <p>This method should be used sparingly, mainly from {@link #onOpen()}, and {@link #fillInterested(Callback)}
* should be preferred instead, passing a {@link Callback} that specifies the {@link Invocable.InvocationType}
* for each specific case where read interest needs to be registered.</p>
*
* @see #fillInterested(Callback)
* @see #onFillable()
* @see #onFillInterestedFailed(Throwable)
*/
public void fillInterested()
{
if (LOG.isDebugEnabled())
LOG.debug("fillInterested {}", this);
getEndPoint().fillInterested(_readCallback);
fillInterested(_readCallback);
}

/**
* <p>Utility method to be called to register read interest.</p>
* <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
* will be called back as appropriate.</p>
* <p>Registers read interest with the given callback.</p>
* <p>When read readiness is signaled, the callback will be completed.</p>
*
* @see #onFillable()
* @param callback the callback to complete when read readiness is signaled
*/
public void fillInterested(Callback callback)
{
Expand All @@ -130,7 +123,7 @@ public boolean isFillInterested()
/**
* <p>Callback method invoked when the endpoint is ready to be read.</p>
*
* @see #fillInterested()
* @see #fillInterested(Callback)
*/
public abstract void onFillable();

Expand All @@ -139,7 +132,7 @@ public boolean isFillInterested()
*
* @param cause the exception that caused the failure
*/
protected void onFillInterestedFailed(Throwable cause)
public void onFillInterestedFailed(Throwable cause)
{
if (LOG.isDebugEnabled())
LOG.debug("onFillInterestedFailed {}", this, cause);
Expand Down Expand Up @@ -286,7 +279,12 @@ public String toConnectionString()
return String.format("%s@%x", getClass().getSimpleName(), hashCode());
}

private class ReadCallback implements Callback, Invocable
/**
* <p>The default {@link Callback} for read interest, typically used from {@link #onOpen()}.</p>
* <p>In other cases, use {@link #fillInterested(Callback)} with a {@link Callback} that
* reports a more specific {@link Invocable.InvocationType}.</p>
*/
private class ReadCallback implements Callback
{
@Override
public void succeeded()
Expand All @@ -305,11 +303,5 @@ public String toString()
{
return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), AbstractConnection.this);
}

@Override
public InvocationType getInvocationType()
{
return AbstractConnection.this.getInvocationType();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
private static final AtomicLong __connectionIdGenerator = new AtomicLong();

private final Callback _fillableCallback = new FillableCallback();
private final TunnelSupport _tunnelSupport = new TunnelSupportOverHTTP1();
private final AtomicLong _streamIdGenerator = new AtomicLong();
private final long _id;
Expand Down Expand Up @@ -146,12 +147,6 @@ public HttpConnection(HttpConfiguration configuration, Connector connector, EndP
LOG.debug("New HTTP Connection {}", this);
}

@Override
public InvocationType getInvocationType()
{
return getServer().getInvocationType();
}

protected HttpGenerator newHttpGenerator()
{
HttpGenerator generator = new HttpGenerator();
Expand Down Expand Up @@ -451,7 +446,7 @@ else if (filled == 0)
{
assert isRequestBufferEmpty();
releaseRequestBuffer();
fillInterested();
fillInterested(_fillableCallback);
break;
}
else if (filled < 0)
Expand Down Expand Up @@ -623,7 +618,7 @@ private boolean upgrade(HttpStreamOverHTTP1 stream)
}

@Override
protected void onFillInterestedFailed(Throwable cause)
public void onFillInterestedFailed(Throwable cause)
{
_parser.close();
super.onFillInterestedFailed(cause);
Expand All @@ -641,18 +636,24 @@ public boolean onIdleExpired(TimeoutException timeout)
@Override
public void close()
{
Runnable task = _httpChannel.onClose();
if (task != null)
task.run();
super.close();
try
{
Runnable task = _httpChannel.onClose();
if (task != null)
task.run();
}
finally
{
super.close();
}
}

@Override
public void onOpen()
{
super.onOpen();
if (isRequestBufferEmpty())
fillInterested();
fillInterested(_fillableCallback);
else
getExecutor().execute(this);
}
Expand Down Expand Up @@ -1678,4 +1679,32 @@ public String getReason()
return getMessage();
}
}

private class FillableCallback implements Callback
{
private final InvocationType _invocationType;

public FillableCallback()
{
_invocationType = getServer().getInvocationType();
}

@Override
public void succeeded()
{
onFillable();
}

@Override
public void failed(Throwable x)
{
onFillInterestedFailed(x);
}

@Override
public InvocationType getInvocationType()
{
return _invocationType;
}
}
}
Loading

0 comments on commit 8e7ce53

Please sign in to comment.