Skip to content

Commit

Permalink
Support chunked response cache (#10)
Browse files Browse the repository at this point in the history
/kind feature

```release-note
支持缓存分段传输响应
```
  • Loading branch information
JohnNiang authored Sep 4, 2024
1 parent 3271237 commit 4585e51
Showing 1 changed file with 57 additions and 29 deletions.
86 changes: 57 additions & 29 deletions src/main/java/run/halo/cache/page/PageCacheWebFilter.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package run.halo.cache.page;

import static java.nio.ByteBuffer.allocateDirect;
import static java.nio.ByteBuffer.allocate;
import static org.springframework.security.web.server.util.matcher.ServerWebExchangeMatcher.MatchResult;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
Expand Down Expand Up @@ -139,40 +141,66 @@ public CacheResponseDecorator(ServerWebExchange exchange, String cacheKey) {

@Override
@NonNull
public Mono<Void> writeWith(@NonNull Publisher<? extends DataBuffer> body) {
public Mono<Void> writeAndFlushWith(
@NonNull Publisher<? extends Publisher<? extends DataBuffer>> body) {
if (responseCacheable(exchange)) {
log.debug("Caching response for {}", cacheKey);
var response = getDelegate();
var builder = new CachedResponse.CachedResponseBuilder();
var headers = new HttpHeaders();
headers.addAll(getHeaders());
builder.statusCode(getStatusCode());
builder.timestamp(Instant.now());
builder.headers(headers);
var bodyCopies = new ArrayList<ByteBuffer>();
if (body instanceof Flux<? extends Publisher<? extends DataBuffer>> fluxBody) {
body = fluxBody.concatMap(content -> copyBody(content, bodyCopies))
.window(1)
.doOnComplete(putCache(builder, bodyCopies));
} else if (body instanceof Mono<? extends Publisher<? extends DataBuffer>> monoBody) {
body = monoBody.flatMapMany(content -> copyBody(content, bodyCopies))
.window(1)
.doOnComplete(putCache(builder, bodyCopies));
}
}
return super.writeAndFlushWith(body);
}

@Override
@NonNull
public Mono<Void> writeWith(@NonNull Publisher<? extends DataBuffer> body) {
if (responseCacheable(exchange)) {
log.info("Caching response for {}", cacheKey);
var builder = new CachedResponse.CachedResponseBuilder();
response.beforeCommit(
() -> Mono.fromRunnable(() -> {
var statusCode = getStatusCode();
if (statusCode != null && statusCode.is2xxSuccessful()) {
// we only cache response with 2xx status code
var headers = new HttpHeaders();
headers.addAll(getHeaders());
builder.statusCode(statusCode);
builder.timestamp(Instant.now());
builder.headers(headers);
cache.put(cacheKey, builder.build());
}
})
);
body = Flux.from(body)
.map(dataBuffer -> {
var byteBuffer = allocateDirect(dataBuffer.readableByteCount());
dataBuffer.toByteBuffer(byteBuffer);
DataBufferUtils.release(dataBuffer);
return byteBuffer.asReadOnlyBuffer();
})
.collectSortedList()
.doOnNext(builder::body)
.flatMapMany(Flux::fromIterable)
.map(byteBuffer -> response.bufferFactory().wrap(byteBuffer));
var headers = new HttpHeaders();
headers.addAll(getHeaders());
builder.statusCode(getStatusCode());
builder.timestamp(Instant.now());
builder.headers(headers);
var bodyCopies = new ArrayList<ByteBuffer>();
body = copyBody(body, bodyCopies)
.doOnComplete(putCache(builder, bodyCopies));
}
// write the response
return super.writeWith(body);
}

private Runnable putCache(CachedResponse.CachedResponseBuilder builder,
List<ByteBuffer> bodyCopies) {
return () -> {
builder.body(List.copyOf(bodyCopies));
cache.put(cacheKey, builder.build());
log.info("Cached response for {}", cacheKey);
};
}

private static Flux<? extends DataBuffer> copyBody(Publisher<? extends DataBuffer> body,
List<ByteBuffer> buffers) {
return Flux.from(body)
.doOnNext(dataBuffer -> {
var byteBuffer = allocate(dataBuffer.readableByteCount());
dataBuffer.toByteBuffer(byteBuffer);
buffers.add(byteBuffer.asReadOnlyBuffer());
});
}
}
}

0 comments on commit 4585e51

Please sign in to comment.