diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index 8df7f805b5ce..756f8a11bd07 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -17,9 +17,7 @@ import java.io.IOException; import java.nio.channels.WritePendingException; import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.List; -import java.util.Queue; +import java.util.Deque; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -55,7 +53,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa private static final Logger LOG = LoggerFactory.getLogger(HTTP2Stream.class); private final AutoLock lock = new AutoLock(); - private final Queue dataQueue = new ArrayDeque<>(); + private Deque dataQueue; private final AtomicReference attachment = new AtomicReference<>(); private final AtomicReference> attributes = new AtomicReference<>(); private final AtomicReference closeState = new AtomicReference<>(CloseState.NOT_CLOSED); @@ -235,15 +233,19 @@ public boolean isRemotelyClosed() @Override public boolean failAllData(Throwable x) { - List copy; + Deque copy; try (AutoLock l = lock.lock()) { dataDemand = 0; - copy = new ArrayList<>(dataQueue); - dataQueue.clear(); + copy = dataQueue; + dataQueue = null; + } + DataEntry lastDataEntry = null; + if (copy != null) + { + copy.forEach(dataEntry -> dataEntry.callback.failed(x)); + lastDataEntry = copy.isEmpty() ? null : copy.peekLast(); } - copy.forEach(dataEntry -> dataEntry.callback.failed(x)); - DataEntry lastDataEntry = copy.isEmpty() ? null : copy.get(copy.size() - 1); if (lastDataEntry == null) return isRemotelyClosed(); return lastDataEntry.frame.isEndStream(); @@ -413,6 +415,8 @@ private void onData(DataFrame frame, Callback callback) DataEntry entry = new DataEntry(frame, callback); try (AutoLock l = lock.lock()) { + if (dataQueue == null) + dataQueue = new ArrayDeque<>(); dataQueue.offer(entry); initial = dataInitial; if (initial) @@ -454,7 +458,7 @@ public void demand(long n) { demand = dataDemand = MathUtils.cappedAdd(dataDemand, n); if (!dataProcess) - dataProcess = proceed = !dataQueue.isEmpty(); + dataProcess = proceed = dataQueue != null && !dataQueue.isEmpty(); } if (LOG.isDebugEnabled()) LOG.debug("Demand {}/{}, {} data processing for {}", n, demand, proceed ? "proceeding" : "stalling", this); @@ -469,7 +473,7 @@ private void processData() DataEntry dataEntry; try (AutoLock l = lock.lock()) { - if (dataQueue.isEmpty() || dataDemand == 0) + if (dataQueue == null || dataQueue.isEmpty() || dataDemand == 0) { if (LOG.isDebugEnabled()) LOG.debug("Stalling data processing for {}", this);