From 21576f2312a0a4e16f79841160ec3b635e4f9c8a Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 17 Sep 2020 16:43:24 +1000 Subject: [PATCH] Issue #5287 - fix usages of new CompressionPool Signed-off-by: Lachlan Roberts --- .../jetty/http/GZIPContentDecoder.java | 14 +++---- .../server/handler/gzip/GzipFactory.java | 7 +--- .../server/handler/gzip/GzipHandler.java | 8 +--- .../gzip/GzipHttpOutputInterceptor.java | 42 ++++++++++--------- .../util/compression/CompressionPool.java | 37 ++++++++++++---- .../internal/PerMessageDeflateExtension.java | 26 ++++++------ .../server/jmh/DeflaterPoolBenchmark.java | 5 ++- 7 files changed, 76 insertions(+), 63 deletions(-) diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/GZIPContentDecoder.java b/jetty-http/src/main/java/org/eclipse/jetty/http/GZIPContentDecoder.java index ed4f50b99426..79518f821881 100644 --- a/jetty-http/src/main/java/org/eclipse/jetty/http/GZIPContentDecoder.java +++ b/jetty-http/src/main/java/org/eclipse/jetty/http/GZIPContentDecoder.java @@ -42,9 +42,9 @@ public class GZIPContentDecoder implements Destroyable private static final long UINT_MAX = 0xFFFFFFFFL; private final List _inflateds = new ArrayList<>(); - private final InflaterPool _inflaterPool; private final ByteBufferPool _pool; private final int _bufferSize; + private InflaterPool.Entry _inflaterEntry; private Inflater _inflater; private State _state; private int _size; @@ -64,13 +64,12 @@ public GZIPContentDecoder(int bufferSize) public GZIPContentDecoder(ByteBufferPool pool, int bufferSize) { - this(null, pool, bufferSize); + this(new InflaterPool(0, true), pool, bufferSize); } public GZIPContentDecoder(InflaterPool inflaterPool, ByteBufferPool pool, int bufferSize) { - _inflaterPool = inflaterPool; - _inflater = (inflaterPool == null) ? new Inflater(true) : inflaterPool.acquire(); + _inflaterEntry = inflaterPool.acquire(); _bufferSize = bufferSize; _pool = pool; reset(); @@ -416,11 +415,8 @@ private void reset() @Override public void destroy() { - if (_inflaterPool == null) - _inflater.end(); - else - _inflaterPool.release(_inflater); - + _inflaterEntry.release(); + _inflaterEntry = null; _inflater = null; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipFactory.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipFactory.java index 3c935e3e5363..81bb859e1ca8 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipFactory.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipFactory.java @@ -18,15 +18,12 @@ package org.eclipse.jetty.server.handler.gzip; -import java.util.zip.Deflater; - import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.util.compression.DeflaterPool; public interface GzipFactory { - Deflater getDeflater(Request request, long contentLength); + DeflaterPool.Entry getDeflaterEntry(Request request, long contentLength); boolean isMimeTypeGzipable(String mimetype); - - void recycle(Deflater deflater); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHandler.java index 670085dc71f5..33d219655ea9 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHandler.java @@ -418,7 +418,7 @@ public void addIncludedPaths(String... pathspecs) } @Override - public Deflater getDeflater(Request request, long contentLength) + public DeflaterPool.Entry getDeflaterEntry(Request request, long contentLength) { if (contentLength >= 0 && contentLength < _minGzipSize) { @@ -730,12 +730,6 @@ protected boolean isPathGzipable(String requestURI) return _paths.test(requestURI); } - @Override - public void recycle(Deflater deflater) - { - _deflaterPool.release(deflater); - } - /** * Set the excluded filter list of HTTP methods (replacing any previously set) * diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHttpOutputInterceptor.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHttpOutputInterceptor.java index 843e90474a62..86d73d1be346 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHttpOutputInterceptor.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/gzip/GzipHttpOutputInterceptor.java @@ -36,6 +36,7 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingNestedCallback; import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.compression.DeflaterPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +64,7 @@ private enum GZState private final int _bufferSize; private final boolean _syncFlush; - private Deflater _deflater; + private DeflaterPool.Entry _deflaterEntry; private ByteBuffer _buffer; public GzipHttpOutputInterceptor(GzipFactory factory, HttpChannel channel, HttpOutput.Interceptor next, boolean syncFlush) @@ -122,7 +123,7 @@ public void write(ByteBuffer content, boolean complete, Callback callback) private void addTrailer() { BufferUtil.putIntLittleEndian(_buffer, (int)_crc.getValue()); - BufferUtil.putIntLittleEndian(_buffer, _deflater.getTotalIn()); + BufferUtil.putIntLittleEndian(_buffer, _deflaterEntry.get().getTotalIn()); } private void gzip(ByteBuffer content, boolean complete, final Callback callback) @@ -195,8 +196,8 @@ protected void commit(ByteBuffer content, boolean complete, Callback callback) if (contentLength < 0 && complete) contentLength = content.remaining(); - _deflater = _factory.getDeflater(_channel.getRequest(), contentLength); - if (_deflater == null) + _deflaterEntry = _factory.getDeflaterEntry(_channel.getRequest(), contentLength); + if (_deflaterEntry == null) { LOG.debug("{} exclude no deflater", this); _state.set(GZState.NOT_COMPRESSING); @@ -213,7 +214,7 @@ protected void commit(ByteBuffer content, boolean complete, Callback callback) if (etag != null) fields.put(HttpHeader.ETAG, etagGzip(etag)); - LOG.debug("{} compressing {}", this, _deflater); + LOG.debug("{} compressing {}", this, _deflaterEntry); _state.set(GZState.COMPRESSING); if (BufferUtil.isEmpty(content)) @@ -277,8 +278,8 @@ public GzipBufferCB(ByteBuffer content, boolean complete, Callback callback) @Override protected void onCompleteFailure(Throwable x) { - _factory.recycle(_deflater); - _deflater = null; + _deflaterEntry.release(); + _deflaterEntry = null; super.onCompleteFailure(x); } @@ -286,7 +287,7 @@ protected void onCompleteFailure(Throwable x) protected Action process() throws Exception { // If we have no deflator - if (_deflater == null) + if (_deflaterEntry == null) { // then the trailer has been generated and written below. // we have finished compressing the entire content, so @@ -318,16 +319,17 @@ protected Action process() throws Exception } // If the deflator is not finished, then compress more data - if (!_deflater.finished()) + Deflater deflater = _deflaterEntry.get(); + if (!deflater.finished()) { - if (_deflater.needsInput()) + if (deflater.needsInput()) { // if there is no more content available to compress // then we are either finished all content or just the current write. if (BufferUtil.isEmpty(_content)) { if (_last) - _deflater.finish(); + deflater.finish(); else return Action.SUCCEEDED; } @@ -356,32 +358,32 @@ protected Action process() throws Exception _crc.update(array, off, len); // Ideally we would want to use the ByteBuffer API for Deflaters. However due the the ByteBuffer implementation // of the CRC32.update() it is less efficient for us to use this rather than to convert to array ourselves. - _deflater.setInput(array, off, len); + _deflaterEntry.get().setInput(array, off, len); slice.position(slice.position() + len); if (_last && BufferUtil.isEmpty(_content)) - _deflater.finish(); + deflater.finish(); } } // deflate the content into the available space in the buffer int off = _buffer.arrayOffset() + _buffer.limit(); int len = BufferUtil.space(_buffer); - int produced = _deflater.deflate(_buffer.array(), off, len, _syncFlush ? Deflater.SYNC_FLUSH : Deflater.NO_FLUSH); + int produced = deflater.deflate(_buffer.array(), off, len, _syncFlush ? Deflater.SYNC_FLUSH : Deflater.NO_FLUSH); _buffer.limit(_buffer.limit() + produced); } // If we have finished deflation and there is room for the trailer. - if (_deflater.finished() && BufferUtil.space(_buffer) >= 8) + if (deflater.finished() && BufferUtil.space(_buffer) >= 8) { // add the trailer and recycle the deflator to flag that we will have had completeSuccess when // the write below completes. addTrailer(); - _factory.recycle(_deflater); - _deflater = null; + _deflaterEntry.release(); + _deflaterEntry = null; } // write the compressed buffer. - _interceptor.write(_buffer, _deflater == null, this); + _interceptor.write(_buffer, _deflaterEntry == null, this); return Action.SCHEDULED; } @@ -394,8 +396,8 @@ public String toString() _last, BufferUtil.toDetailString(_copy), BufferUtil.toDetailString(_buffer), - _deflater, - _deflater != null && _deflater.finished() ? "(finished)" : ""); + _deflaterEntry, + _deflaterEntry != null && _deflaterEntry.get().finished() ? "(finished)" : ""); } } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/compression/CompressionPool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/compression/CompressionPool.java index 00d8e4de858c..e7bf4ce699c7 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/compression/CompressionPool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/compression/CompressionPool.java @@ -23,9 +23,10 @@ public abstract class CompressionPool extends AbstractLifeCycle { - public static final int INFINITE_CAPACITY = -1; + public static final int INFINITE_CAPACITY = Integer.MAX_VALUE; - private final Pool _pool; + private int _capacity; + private Pool _pool; /** * Create a Pool of {@link T} instances. @@ -38,7 +39,7 @@ public abstract class CompressionPool extends AbstractLifeCycle */ public CompressionPool(int capacity) { - _pool = new Pool(capacity, 1); + _capacity = capacity; } public int getCapacity() @@ -48,6 +49,8 @@ public int getCapacity() public void setCapacity(int capacity) { + if (isStarted()) + throw new IllegalStateException("Already Started"); _capacity = capacity; } @@ -62,7 +65,10 @@ public void setCapacity(int capacity) */ public Entry acquire() { - return new Entry(_pool.acquire(e -> newObject())); + if (_pool != null) + return new Entry(_pool.acquire(e -> newObject())); + else + return new Entry(); } /** @@ -74,10 +80,20 @@ public void release(Entry entry) } @Override - public void doStop() + protected void doStart() throws Exception + { + if (_capacity > 0) + _pool = new Pool<>(Pool.StrategyType.RANDOM, _capacity, true); + super.doStart(); + } + + @Override + public void doStop() throws Exception { - // TODO: We can't use this because it will not end the entries after it removes them. - // _pool.close(); + // TODO: Pool.close() will not end the entries after it removes them. + _pool.close(); + _pool = null; + super.doStop(); } public class Entry @@ -85,6 +101,11 @@ public class Entry private T _value; private Pool.Entry _entry; + Entry() + { + this(null); + } + Entry(Pool.Entry entry) { _entry = entry; @@ -125,6 +146,6 @@ public String toString() hashCode(), getState(), _pool.size(), - _capacity < 0 ? "UNLIMITED" : _capacity); + _capacity); } } diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java index a5df5908c34d..66193b5f3450 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java @@ -27,6 +27,8 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.compression.DeflaterPool; +import org.eclipse.jetty.util.compression.InflaterPool; import org.eclipse.jetty.websocket.core.AbstractExtension; import org.eclipse.jetty.websocket.core.ExtensionConfig; import org.eclipse.jetty.websocket.core.Frame; @@ -52,8 +54,8 @@ public class PerMessageDeflateExtension extends AbstractExtension private final TransformingFlusher outgoingFlusher; private final TransformingFlusher incomingFlusher; - private Deflater deflaterImpl; - private Inflater inflaterImpl; + private DeflaterPool.Entry deflaterEntry; + private InflaterPool.Entry inflaterEntry; private boolean incomingCompressed; private ExtensionConfig configRequested; @@ -178,28 +180,28 @@ public static boolean endsWithTail(ByteBuffer buf) public Deflater getDeflater() { - if (deflaterImpl == null) - deflaterImpl = getDeflaterPool().acquire(); - return deflaterImpl; + if (deflaterEntry == null) + deflaterEntry = getDeflaterPool().acquire(); + return deflaterEntry.get(); } public Inflater getInflater() { - if (inflaterImpl == null) - inflaterImpl = getInflaterPool().acquire(); - return inflaterImpl; + if (inflaterEntry == null) + inflaterEntry = getInflaterPool().acquire(); + return inflaterEntry.get(); } public void releaseInflater() { - getInflaterPool().release(inflaterImpl); - inflaterImpl = null; + inflaterEntry.release(); + inflaterEntry = null; } public void releaseDeflater() { - getDeflaterPool().release(deflaterImpl); - deflaterImpl = null; + deflaterEntry.release(); + deflaterEntry = null; } @Override diff --git a/tests/jetty-jmh/src/main/java/org/eclipse/jetty/server/jmh/DeflaterPoolBenchmark.java b/tests/jetty-jmh/src/main/java/org/eclipse/jetty/server/jmh/DeflaterPoolBenchmark.java index 72500ca72a09..0ab725050aab 100644 --- a/tests/jetty-jmh/src/main/java/org/eclipse/jetty/server/jmh/DeflaterPoolBenchmark.java +++ b/tests/jetty-jmh/src/main/java/org/eclipse/jetty/server/jmh/DeflaterPoolBenchmark.java @@ -92,13 +92,14 @@ public static void stopTrial() throws Exception @SuppressWarnings("deprecation") public long testPool() throws Exception { - Deflater deflater = _pool.acquire(); + DeflaterPool.Entry entry = _pool.acquire(); + Deflater deflater = entry.get(); deflater.setInput(COMPRESSION_STRING.getBytes()); deflater.finish(); byte[] output = new byte[COMPRESSION_STRING.length() + 1]; int compressedDataLength = deflater.deflate(output); - _pool.release(deflater); + _pool.release(entry); return compressedDataLength; }