From 4585e5185563e633d81ea555bf96c8e6337535e4 Mon Sep 17 00:00:00 2001 From: John Niang Date: Wed, 4 Sep 2024 15:57:43 +0800 Subject: [PATCH] Support chunked response cache (#10) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit /kind feature ```release-note 支持缓存分段传输响应 ``` --- .../halo/cache/page/PageCacheWebFilter.java | 86 ++++++++++++------- 1 file changed, 57 insertions(+), 29 deletions(-) diff --git a/src/main/java/run/halo/cache/page/PageCacheWebFilter.java b/src/main/java/run/halo/cache/page/PageCacheWebFilter.java index 9d09a1a..2ee2a15 100644 --- a/src/main/java/run/halo/cache/page/PageCacheWebFilter.java +++ b/src/main/java/run/halo/cache/page/PageCacheWebFilter.java @@ -1,16 +1,18 @@ package run.halo.cache.page; -import static java.nio.ByteBuffer.allocateDirect; +import static java.nio.ByteBuffer.allocate; import static org.springframework.security.web.server.util.matcher.ServerWebExchangeMatcher.MatchResult; +import java.nio.ByteBuffer; import java.time.Instant; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import lombok.extern.slf4j.Slf4j; import org.reactivestreams.Publisher; import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; @@ -139,40 +141,66 @@ public CacheResponseDecorator(ServerWebExchange exchange, String cacheKey) { @Override @NonNull - public Mono writeWith(@NonNull Publisher body) { + public Mono writeAndFlushWith( + @NonNull Publisher> body) { if (responseCacheable(exchange)) { log.debug("Caching response for {}", cacheKey); - var response = getDelegate(); + var builder = new CachedResponse.CachedResponseBuilder(); + var headers = new HttpHeaders(); + headers.addAll(getHeaders()); + builder.statusCode(getStatusCode()); + builder.timestamp(Instant.now()); + builder.headers(headers); + var bodyCopies = new ArrayList(); + if (body instanceof Flux> fluxBody) { + body = fluxBody.concatMap(content -> copyBody(content, bodyCopies)) + .window(1) + .doOnComplete(putCache(builder, bodyCopies)); + } else if (body instanceof Mono> monoBody) { + body = monoBody.flatMapMany(content -> copyBody(content, bodyCopies)) + .window(1) + .doOnComplete(putCache(builder, bodyCopies)); + } + } + return super.writeAndFlushWith(body); + } + @Override + @NonNull + public Mono writeWith(@NonNull Publisher body) { + if (responseCacheable(exchange)) { + log.info("Caching response for {}", cacheKey); var builder = new CachedResponse.CachedResponseBuilder(); - response.beforeCommit( - () -> Mono.fromRunnable(() -> { - var statusCode = getStatusCode(); - if (statusCode != null && statusCode.is2xxSuccessful()) { - // we only cache response with 2xx status code - var headers = new HttpHeaders(); - headers.addAll(getHeaders()); - builder.statusCode(statusCode); - builder.timestamp(Instant.now()); - builder.headers(headers); - cache.put(cacheKey, builder.build()); - } - }) - ); - body = Flux.from(body) - .map(dataBuffer -> { - var byteBuffer = allocateDirect(dataBuffer.readableByteCount()); - dataBuffer.toByteBuffer(byteBuffer); - DataBufferUtils.release(dataBuffer); - return byteBuffer.asReadOnlyBuffer(); - }) - .collectSortedList() - .doOnNext(builder::body) - .flatMapMany(Flux::fromIterable) - .map(byteBuffer -> response.bufferFactory().wrap(byteBuffer)); + var headers = new HttpHeaders(); + headers.addAll(getHeaders()); + builder.statusCode(getStatusCode()); + builder.timestamp(Instant.now()); + builder.headers(headers); + var bodyCopies = new ArrayList(); + body = copyBody(body, bodyCopies) + .doOnComplete(putCache(builder, bodyCopies)); } // write the response return super.writeWith(body); } + + private Runnable putCache(CachedResponse.CachedResponseBuilder builder, + List bodyCopies) { + return () -> { + builder.body(List.copyOf(bodyCopies)); + cache.put(cacheKey, builder.build()); + log.info("Cached response for {}", cacheKey); + }; + } + + private static Flux copyBody(Publisher body, + List buffers) { + return Flux.from(body) + .doOnNext(dataBuffer -> { + var byteBuffer = allocate(dataBuffer.readableByteCount()); + dataBuffer.toByteBuffer(byteBuffer); + buffers.add(byteBuffer.asReadOnlyBuffer()); + }); + } } }