Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore thread starvation tests #12395

Open
wants to merge 28 commits into
base: jetty-12.0.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
b8887fb
#12214 restore ee9 and ee10 thread starvation tests
lorban Oct 8, 2024
21fbc35
#12214 implement todos
lorban Oct 16, 2024
4b557e4
#12214 add read starvation test to ee* envs
lorban Oct 16, 2024
e9dc2eb
Fixed EE10 read starvation
gregw Oct 17, 2024
f95a3e4
#12214 fix Content.Source.asString() so its unblocking task is NON_BL…
lorban Oct 17, 2024
b61d6ad
#12214 fix ee9 testReadStarvation
lorban Oct 17, 2024
5df96ea
fix checkstyle
lorban Oct 17, 2024
2ff0e6e
Use Invocable.InvocableCompletableFuture for ContentSourceCompletable…
gregw Oct 18, 2024
24c180a
more demand invocables
gregw Oct 18, 2024
15a59ca
fix checkstyle
lorban Oct 18, 2024
d77ac47
cleanup ee9 fix
lorban Oct 18, 2024
0169d15
cleanup ee10 fix
lorban Oct 18, 2024
d269496
cleanup core fix
lorban Oct 18, 2024
8031d7c
remove redundant test
lorban Oct 18, 2024
f95b9cf
use inner classes for invocable tasks
lorban Oct 18, 2024
a0f1e39
use inner classes for invocable tasks
lorban Oct 18, 2024
bbda85d
review usages of CS.demand() w.r.t InvocationType
lorban Oct 18, 2024
9a0b4ba
Make the InvocableCompletableFuture non-dynamic
gregw Oct 21, 2024
49b592b
Merge remote-tracking branch 'origin/jetty-12.0.x' into fix/12.0.x/12…
gregw Oct 22, 2024
18ab56d
Fragile linking of InvocableCompletableFutures
gregw Oct 22, 2024
b1c8252
javadoc
gregw Oct 22, 2024
6b08435
Deprecated the CF APIs and replaced with explicit getXxx onXxx methods
gregw Oct 22, 2024
d9a54de
Deprecated the CF APIs and replaced with explicit getXxx onXxx methods
gregw Oct 22, 2024
f15e298
removed more CF code
gregw Oct 23, 2024
33b741f
replace FutureCallback and FuturePromise usages with Blocker.* instead
lorban Oct 23, 2024
d49d312
add missing getInvocationType() returning NON_BLOCKING
lorban Oct 23, 2024
bf7127b
Converted new API on FormFields to use Promise
gregw Oct 23, 2024
2875eeb
Converted new API on Multipart to use Promise
gregw Oct 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ interface AsyncContentListener extends ContentSourceListener
default void onContentSource(Response response, Content.Source contentSource)
{
Content.Chunk chunk = contentSource.read();
// demandCallback eventually calls onContent() which calls end-user code, so its InvocationType must be BLOCKING.
Runnable demandCallback = () -> onContentSource(response, contentSource);
if (chunk == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -543,7 +544,7 @@ protected Action process() throws Throwable
// No content after the headers, demand.
demanded = true;
assert content != null;
content.demand(this::succeeded);
content.demand(Invocable.from(getInvocationType(), this::succeeded));
return Action.SCHEDULED;
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.jetty.io.content.ByteBufferContentSource;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -231,7 +232,7 @@ private static void consume(Content.Source contentSource)
if (chunk != null)
chunk.release();
if (chunk == null || !chunk.isLast())
contentSource.demand(() -> consume(contentSource));
contentSource.demand(Invocable.from(Invocable.InvocationType.NON_BLOCKING, () -> consume(contentSource)));
}

private static void notifyContentSource(Response.ContentSourceListener listener, Response response, Content.Source contentSource)
Expand Down Expand Up @@ -490,7 +491,7 @@ private void onDemandCallback()
{
// Retry the demand on spurious wakeup to avoid passing
// a null chunk to the demultiplexer's ContentSources.
originalContentSource.demand(this::onDemandCallback);
originalContentSource.demand(Invocable.from(deriveOriginalContentSourcesInvocationType(), this::onDemandCallback));
return;
}
// Demultiplexer content sources are invoked sequentially to be consistent with other listeners,
Expand All @@ -502,6 +503,20 @@ private void onDemandCallback()
chunk.release();
}

private Invocable.InvocationType deriveOriginalContentSourcesInvocationType()
{
Invocable.InvocationType invocationType = null;
for (ContentSource contentSource : contentSources)
{
Invocable.InvocationType demandCallbackInvocationType = contentSource.getDemandCallbackInvocationType();
if (invocationType == null)
invocationType = demandCallbackInvocationType;
else
invocationType = Invocable.combine(invocationType, demandCallbackInvocationType);
}
return invocationType == null ? Invocable.InvocationType.NON_BLOCKING : invocationType;
}

private void registerFailure(ContentSource contentSource, Throwable failure)
{
boolean processFail = false;
Expand All @@ -524,7 +539,7 @@ else if (counters.total() == listeners.size())
if (processFail)
originalContentSource.fail(failure);
else if (processDemand)
originalContentSource.demand(this::onDemandCallback);
originalContentSource.demand(Invocable.from(deriveOriginalContentSourcesInvocationType(), this::onDemandCallback));

if (LOG.isDebugEnabled())
LOG.debug("Registered failure on {}; {}", contentSource, counters);
Expand All @@ -547,7 +562,7 @@ private void registerDemand(ContentSource contentSource)
}
}
if (processDemand)
originalContentSource.demand(this::onDemandCallback);
originalContentSource.demand(Invocable.from(deriveOriginalContentSourcesInvocationType(), this::onDemandCallback));

if (LOG.isDebugEnabled())
LOG.debug("Registered demand on {}; {}", contentSource, counters);
Expand Down Expand Up @@ -641,6 +656,11 @@ private void onDemandCallback()
}
}

private Invocable.InvocationType getDemandCallbackInvocationType()
{
return Invocable.getInvocationType(demandCallbackRef.get());
}

@Override
public Content.Chunk read()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.SerializedInvoker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -823,14 +824,8 @@ public void demand(Runnable demandCallback)
}
if (part != null)
{
part.getContentSource().demand(() ->
{
try (AutoLock ignoredAgain = lock.lock())
{
this.demand = null;
}
demandCallback.run();
});
// Inner class used instead of lambda for clarity in stack traces.
part.getContentSource().demand(new DemandInvocableTask(demandCallback));
}
else if (invoke)
{
Expand Down Expand Up @@ -887,6 +882,32 @@ private enum State
{
FIRST, MIDDLE, HEADERS, CONTENT, COMPLETE
}

private class DemandInvocableTask implements Invocable.Task
{
private final Runnable demandCallback;

private DemandInvocableTask(Runnable demandCallback)
{
this.demandCallback = demandCallback;
}

@Override
public void run()
{
try (AutoLock ignoredAgain = lock.lock())
{
AbstractContentSource.this.demand = null;
}
demandCallback.run();
}

@Override
public InvocationType getInvocationType()
{
return Invocable.getInvocationType(demandCallback);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,18 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.content.ContentSourceCompletableFuture;
import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Invocable.InvocationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -78,6 +82,82 @@ private MultiPartFormData()
{
}

/**
* Get {@code multipart/form-data} {@link Parts} from an {@link Attributes}, typically
* cached there by calls to {@link #getParts(Content.Source, Attributes, String, MultiPartConfig)}
* or {@link #onParts(Content.Source, Attributes, String, MultiPartConfig, Promise, Invocable.InvocablePromise)}
*
* @param attributes the attributes where the futureParts are cahced
* @return the parts or null
*/
public static Parts getParts(Attributes attributes)
{
Object attribute = attributes.getAttribute(MultiPartFormData.class.getName());
if (attribute instanceof Parts parts)
return parts;
if (attribute instanceof CompletableFuture<?> futureParts && futureParts.isDone())
return (Parts)futureParts.join();
return null;
}

/**
* Get {@code multipart/form-data} {@link Parts} from a {@link Content.Source}, caching the results in an
* {@link Attributes}. If not already available, the {@code Parts} are read and parsed, blocking if necessary.
* <p>
* Calls to {@code onParts} and {@code getParts} methods are idempotent, and
* can be called multiple times, with subsequent calls returning the results of the first call.
* @param content the source of the multipart content.
* @param attributes the attributes where the Parts are cached.
* @param contentType the value of the {@link HttpHeader#CONTENT_TYPE} header.
* @param config the multipart configuration.
* @return the parts
*/
public static MultiPartFormData.Parts getParts(Content.Source content, Attributes attributes, String contentType, MultiPartConfig config)
{
return from(content, InvocationType.NON_BLOCKING, attributes, contentType, config).join();
}

/**
* Asynchronously get {@code multipart/form-data} {@link Parts} from a {@link Content.Source}, caching the results in an
* {@link Attributes}. If not already available, the {@code Parts} are read and parsed.
* <p>
* Calls to {@code onParts} and {@code getParts} methods are idempotent, and
* can be called multiple times, with subsequent calls returning the results of the first call.
* @param content the source of the multipart content.
* @param attributes the attributes where the futureParts are tracked.
* @param contentType the value of the {@link HttpHeader#CONTENT_TYPE} header.
* @param config the multipart configuration.
* @param immediate The action to take if the FormFields are available immediately (from within the scope of the call to this method).
* @param future The action to take when the FormFields are available, if they are not available immediately. The {@link org.eclipse.jetty.util.thread.Invocable.InvocationType}
* of this parameter will be used as the type for any implementation calls to {@link Content.Source#demand(Runnable)}.
*/
public static void onParts(Content.Source content, Attributes attributes, String contentType, MultiPartConfig config, Promise<Parts> immediate, Invocable.InvocablePromise<Parts> future)
{
CompletableFuture<Parts> futureParts = from(content, future.getInvocationType(), attributes, contentType, config);

if (futureParts.isDone())
{
Parts parts = null;
Throwable error = null;
try
{
parts = futureParts.get();
}
catch (Throwable t)
{
error = t;
}
if (error != null)
immediate.failed(error);
else
immediate.succeeded(parts);
}
else
{
futureParts.whenComplete(future);
}
}

/**
* Returns {@code multipart/form-data} parts using the given {@link Content.Source} and {@link MultiPartConfig}.
*
Expand All @@ -86,8 +166,28 @@ private MultiPartFormData()
* @param contentType the value of the {@link HttpHeader#CONTENT_TYPE} header.
* @param config the multipart configuration.
* @return the future parts
* @deprecated use {@link #getParts(Content.Source, Attributes, String, MultiPartConfig)}
* and/or {@link #onParts(Content.Source, Attributes, String, MultiPartConfig, Promise, Invocable.InvocablePromise)}
*/
@Deprecated(forRemoval = true, since = "12.0.15")
public static CompletableFuture<MultiPartFormData.Parts> from(Content.Source content, Attributes attributes, String contentType, MultiPartConfig config)
{
return from(content, InvocationType.NON_BLOCKING, attributes, contentType, config);
}

/**
* Returns {@code multipart/form-data} parts using the given {@link Content.Source} and {@link MultiPartConfig}.
*
* @param content the source of the multipart content.
* @param attributes the attributes where the futureParts are tracked.
* @param contentType the value of the {@link HttpHeader#CONTENT_TYPE} header.
* @param config the multipart configuration.
* @return the future parts
* @deprecated use {@link #getParts(Content.Source, Attributes, String, MultiPartConfig)}
* and/or {@link #onParts(Content.Source, Attributes, String, MultiPartConfig, Promise, Invocable.InvocablePromise)}
*/
@Deprecated(forRemoval = true, since = "12.0.15")
private static CompletableFuture<MultiPartFormData.Parts> from(Content.Source content, InvocationType invocationType, Attributes attributes, String contentType, MultiPartConfig config)
{
// Look for an existing future (we use the future here rather than the parts as it can remember any failure).
CompletableFuture<MultiPartFormData.Parts> futureParts = MultiPartFormData.get(attributes);
Expand All @@ -106,7 +206,7 @@ public static CompletableFuture<MultiPartFormData.Parts> from(Content.Source con

Parser parser = new Parser(boundary);
parser.configure(config);
futureParts = parser.parse(content);
futureParts = parser.parse(content, invocationType);
attributes.setAttribute(MultiPartFormData.class.getName(), futureParts);
return futureParts;
}
Expand All @@ -115,9 +215,10 @@ public static CompletableFuture<MultiPartFormData.Parts> from(Content.Source con

/**
* Returns {@code multipart/form-data} parts using {@link MultiPartCompliance#RFC7578}.
* @deprecated use {@link #from(Content.Source, Attributes, String, MultiPartConfig)}.
* @deprecated use {@link #getParts(Content.Source, Attributes, String, MultiPartConfig)}
* and/or {@link #onParts(Content.Source, Attributes, String, MultiPartConfig, Promise, Invocable.InvocablePromise)}
*/
@Deprecated
@Deprecated(forRemoval = true, since = "12.0.15")
public static CompletableFuture<Parts> from(Attributes attributes, String boundary, Function<Parser, CompletableFuture<Parts>> parse)
{
return from(attributes, MultiPartCompliance.RFC7578, ComplianceViolation.Listener.NOOP, boundary, parse);
Expand All @@ -132,9 +233,10 @@ public static CompletableFuture<Parts> from(Attributes attributes, String bounda
* @param boundary the boundary for the {@code multipart/form-data} parts
* @param parse the parser completable future
* @return the future parts
* @deprecated use {@link #from(Content.Source, Attributes, String, MultiPartConfig)}.
* @deprecated use {@link #getParts(Content.Source, Attributes, String, MultiPartConfig)}
* and/or {@link #onParts(Content.Source, Attributes, String, MultiPartConfig, Promise, Invocable.InvocablePromise)}
*/
@Deprecated
@Deprecated(forRemoval = true, since = "12.0.15")
public static CompletableFuture<Parts> from(Attributes attributes, MultiPartCompliance compliance, ComplianceViolation.Listener listener, String boundary, Function<Parser, CompletableFuture<Parts>> parse)
{
CompletableFuture<Parts> futureParts = get(attributes);
Expand All @@ -153,9 +255,15 @@ public static CompletableFuture<Parts> from(Attributes attributes, MultiPartComp
* @return the future parts
*/
@SuppressWarnings("unchecked")
@Deprecated(forRemoval = true, since = "12.0.15")
public static CompletableFuture<Parts> get(Attributes attributes)
{
return (CompletableFuture<Parts>)attributes.getAttribute(MultiPartFormData.class.getName());
Object value = attributes.getAttribute(MultiPartFormData.class.getName());
if (value instanceof CompletableFuture<?> cfp)
return (CompletableFuture<Parts>)cfp;
if (value instanceof Parts parts)
return CompletableFuture.completedFuture(parts);
return null;
}

/**
Expand Down Expand Up @@ -295,9 +403,47 @@ public Parser(String boundary, MultiPartCompliance multiPartCompliance, Complian
parser = new MultiPart.Parser(Objects.requireNonNull(boundary), compliance, listener);
}

public void parse(Content.Source content, Promise<Parts> immediate, Invocable.InvocablePromise<Parts> future)
{
// TODO implement without CF
CompletableFuture<Parts> cf = parse(content, future.getInvocationType());
if (cf.isDone())
{
Parts parts = null;
Throwable failure = null;
try
{
parts = cf.get();
}
catch (ExecutionException e)
{
failure = e.getCause();
}
catch (Throwable t)
{
failure = t;
}
if (failure == null)
immediate.succeeded(parts);
else
immediate.failed(failure);
}
else
{
cf.whenComplete(future);
}
}

@Deprecated(forRemoval = true, since = "12.0.15")
public CompletableFuture<Parts> parse(Content.Source content)
{
ContentSourceCompletableFuture<Parts> futureParts = new ContentSourceCompletableFuture<>(content)
return parse(content, InvocationType.NON_BLOCKING);
}

@Deprecated(forRemoval = true, since = "12.0.15")
public CompletableFuture<Parts> parse(Content.Source content, InvocationType invocationType)
{
ContentSourceCompletableFuture<Parts> futureParts = new ContentSourceCompletableFuture<>(content, invocationType)
{
@Override
protected Parts parse(Content.Chunk chunk) throws Throwable
Expand Down
Loading