Skip to content

Commit

Permalink
Fixes #6250 - Lazily allocate HTTP2Stream data queue. (#6252)
Browse files Browse the repository at this point in the history
* Fixes #6250 - Lazily allocate HTTP2Stream data queue.

Now the data queue is lazily allocated.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet authored May 11, 2021
1 parent c3c51a1 commit 54e4761
Showing 1 changed file with 15 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
import java.io.IOException;
import java.nio.channels.WritePendingException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -55,7 +53,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private static final Logger LOG = LoggerFactory.getLogger(HTTP2Stream.class);

private final AutoLock lock = new AutoLock();
private final Queue<DataEntry> dataQueue = new ArrayDeque<>();
private Deque<DataEntry> dataQueue;
private final AtomicReference<Object> attachment = new AtomicReference<>();
private final AtomicReference<ConcurrentMap<String, Object>> attributes = new AtomicReference<>();
private final AtomicReference<CloseState> closeState = new AtomicReference<>(CloseState.NOT_CLOSED);
Expand Down Expand Up @@ -235,15 +233,19 @@ public boolean isRemotelyClosed()
@Override
public boolean failAllData(Throwable x)
{
List<DataEntry> copy;
Deque<DataEntry> copy;
try (AutoLock l = lock.lock())
{
dataDemand = 0;
copy = new ArrayList<>(dataQueue);
dataQueue.clear();
copy = dataQueue;
dataQueue = null;
}
DataEntry lastDataEntry = null;
if (copy != null)
{
copy.forEach(dataEntry -> dataEntry.callback.failed(x));
lastDataEntry = copy.isEmpty() ? null : copy.peekLast();
}
copy.forEach(dataEntry -> dataEntry.callback.failed(x));
DataEntry lastDataEntry = copy.isEmpty() ? null : copy.get(copy.size() - 1);
if (lastDataEntry == null)
return isRemotelyClosed();
return lastDataEntry.frame.isEndStream();
Expand Down Expand Up @@ -413,6 +415,8 @@ private void onData(DataFrame frame, Callback callback)
DataEntry entry = new DataEntry(frame, callback);
try (AutoLock l = lock.lock())
{
if (dataQueue == null)
dataQueue = new ArrayDeque<>();
dataQueue.offer(entry);
initial = dataInitial;
if (initial)
Expand Down Expand Up @@ -454,7 +458,7 @@ public void demand(long n)
{
demand = dataDemand = MathUtils.cappedAdd(dataDemand, n);
if (!dataProcess)
dataProcess = proceed = !dataQueue.isEmpty();
dataProcess = proceed = dataQueue != null && !dataQueue.isEmpty();
}
if (LOG.isDebugEnabled())
LOG.debug("Demand {}/{}, {} data processing for {}", n, demand, proceed ? "proceeding" : "stalling", this);
Expand All @@ -469,7 +473,7 @@ private void processData()
DataEntry dataEntry;
try (AutoLock l = lock.lock())
{
if (dataQueue.isEmpty() || dataDemand == 0)
if (dataQueue == null || dataQueue.isEmpty() || dataDemand == 0)
{
if (LOG.isDebugEnabled())
LOG.debug("Stalling data processing for {}", this);
Expand Down

0 comments on commit 54e4761

Please sign in to comment.