
WebFlux non-streaming JSON array response is flushed after every value #33224

Closed

0xabadea opened this issue Jul 17, 2024 · 15 comments
Labels
in: web Issues in web modules (web, webmvc, webflux, websocket) status: invalid An issue that we don't feel is valid

Comments

@0xabadea

We have an endpoint like this:

@GetMapping(path = "/api/v1/stuff", produces = { APPLICATION_JSON_VALUE })
public Flux<Stuff> getStuff() {
    // ...
}
In our use case this endpoint can return tens of thousands of values. In production we see that responses are flushed frequently, most of the time after each value is written. The values can be quite small, so flushing after each one produces many small packets, which is wasteful due to TCP/IP bookkeeping overhead and makes poor use of the connection bandwidth.

This is not a streaming use case. The way the code is written implies that flushing after each value is not intentional: ReactorServerHttpResponse.writeWithInternal is called, as opposed to writeAndFlushWithInternal.
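For context, the two write paths have different shapes (paraphrased here from AbstractServerHttpResponse in spring-web); in the flushing variant, each inner Publisher marks a flush boundary:

// Paraphrased declarations: the non-flushing path takes one stream of buffers,
// while writeAndFlushWithInternal takes a stream of streams and flushes after
// each inner Publisher completes.
protected abstract Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> body);

protected abstract Mono<Void> writeAndFlushWithInternal(
        Publisher<? extends Publisher<? extends DataBuffer>> body);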

I was not able to reproduce the behavior by simply returning the values from memory; I could only reproduce it when streaming data from a database, in our case PostgreSQL. I'm attaching a simple demo project, which on my machine reproduces the flushing consistently. Start the application and run

curl http://localhost:8080/api/v1/stuff

demo-flush.zip

Spring Boot version: 3.3.1

@snicoll transferred this issue from spring-projects/spring-boot on Jul 17, 2024
@sdeleuze (Contributor) commented Jul 17, 2024

Thanks for the repro. In your use case, ReactorServerHttpResponse.writeWithInternal is indeed invoked as expected, which invokes NettyOutbound#send with a flushing predicate set to false.

Could you please share more on how you identified that flushing happens after each element returned by the Flux? Is it by debugging at the Netty level, at the network level, etc.?

@0xabadea (Author)

By looking at the network traffic. Attaching two capture files:

  • flush-spring-3.3.1.pcapng.gz shows the behavior in Spring Boot 3.3.1 -- many small packets.
  • flush-spring-2.7.3.pcapng.gz shows the behavior in Spring Boot 2.7.3 -- packets ~32K in size. I'm adding this just to show what I consider to be the right behavior. The implementation in 2.7.3 was different: it collected the flux into a list and Jackson-encoded the whole list.

@sdeleuze (Contributor) commented Jul 17, 2024

@violetagg @rstoyanchev Is it expected that Reactor Netty flushes on every Flux element for the following use case?

ReactorServerHttpResponse.writeWithInternal is indeed invoked as expected which invokes NettyOutbound#send with a flushing predicate set to false.

@violetagg (Member)

@0xabadea Please enable wiretap and provide the logs (see how to do it below). We do try to combine the messages and to flush in batches, but it depends on the Publisher. We work with a prefetch value of 128, which means we will try to request 128 items and flush them as one batch. However, if your Publisher produces items only one by one, we cannot do anything. But let's see what the wiretap shows.

import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
import org.springframework.stereotype.Component;

@Component
public class MyNettyWebServerCustomizer
        implements WebServerFactoryCustomizer<NettyReactiveWebServerFactory> {

    @Override
    public void customize(NettyReactiveWebServerFactory factory) {
        // enable Reactor Netty's wiretap logging on the server
        factory.addServerCustomizers(httpServer -> httpServer.wiretap(true));
    }
}

@0xabadea (Author)

Attaching the wiretap logs.

@violetagg, could you please advise? How do you suggest I change the publisher so that it publishes more efficiently than one by one? Ultimately, onNext is still going to be called for each individual value, so my understanding is that MonoSendMany should buffer several values before flushing.

@0xabadea (Author)

wiretap-logs.txt.gz

@rstoyanchev (Contributor)

This change was made in #28398. In your case with lots of small items, it makes sense to aggregate, but in other cases it is better to stream. After the change we don't aggregate by default, but you can still choose to aggregate by using flux.collectList() and returning Mono<List<Stuff>>.
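A minimal sketch of that aggregating variant, assuming the endpoint from the original description and a hypothetical stuffRepository as the reactive source:

// Collect the Flux into a single List so the whole JSON array is encoded and
// written in one pass, similar to the pre-#28398 behavior; the trade-off is
// that the full result set is held in memory before anything is sent.
@GetMapping(path = "/api/v1/stuff", produces = { APPLICATION_JSON_VALUE })
public Mono<List<Stuff>> getStuff() {
    return stuffRepository.findAll()  // stuffRepository is a hypothetical name
            .collectList();
}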

@violetagg (Member)

@0xabadea In the provided logs I can see that sometimes we flush many writes, sometimes only one, and that is exactly how the implementation in Reactor Netty works (asynchronous flushes that flush whatever has been collected in the outbound buffer).
If this is not enough, then either apply @rstoyanchev's recommendation or consider adding https://netty.io/4.1/api/io/netty/handler/flush/FlushConsolidationHandler.html to the Reactor Netty pipeline.
We cannot add this handler by default because it requires configuration that only the owner of the solution can decide on.

@0xabadea (Author)

Thank you for the pointer to FlushConsolidationHandler. I gave it a try, but it made no difference. This is how I'm adding it to the pipeline:

import io.netty.handler.flush.FlushConsolidationHandler;
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
import org.springframework.stereotype.Component;

@Component
public class MyNettyWebServerCustomizer implements WebServerFactoryCustomizer<NettyReactiveWebServerFactory> {

    @Override
    public void customize(NettyReactiveWebServerFactory factory) {
        factory.addServerCustomizers(httpServer ->
                // consolidate up to 500 flushes into one
                httpServer.doOnConnection(conn -> conn.addHandlerFirst(new FlushConsolidationHandler(500, true)))
        );
    }
}

I tried both true and false for the consolidateWhenNoReadInProgress parameter, but that didn't make a difference either.

@violetagg (Member)

@0xabadea I assume you are using HTTP/1.1 (if that's not the case, please tell me). Try the following:
httpServer.doOnConnection(conn -> conn.channel().pipeline().addBefore(NettyPipeline.HttpCodec, "some-name", new FlushConsolidationHandler(500, true)))

@0xabadea (Author)

@violetagg Indeed, at least the reproducer uses HTTP/1.1. I followed your suggestion from yesterday, but unfortunately I got the same behavior of many small flushes.

@violetagg (Member)

@0xabadea Then I will need an example project to play with.

@0xabadea (Author)

An example project is attached to the original description. The only change I made to it was adding:

import io.netty.handler.flush.FlushConsolidationHandler;
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
import org.springframework.stereotype.Component;
import reactor.netty.NettyPipeline;

@Component
public class MyNettyWebServerCustomizer implements WebServerFactoryCustomizer<NettyReactiveWebServerFactory> {

    @Override
    public void customize(NettyReactiveWebServerFactory factory) {
        factory.addServerCustomizers(httpServer ->
                // add the consolidator just before the HTTP codec, as suggested above
                httpServer.doOnConnection(conn -> conn.channel().pipeline()
                        .addBefore(NettyPipeline.HttpCodec, "some-name", new FlushConsolidationHandler(500, true)))
        );
    }
}

@violetagg (Member) commented Jul 25, 2024

@0xabadea Yeah, you are right: in your case FlushConsolidationHandler uses the same strategy as Reactor Netty (scheduled flushes), but you can take a look at it and adapt it into your own custom flush-consolidation handler.
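A hypothetical sketch of such a custom handler, assuming plain Netty 4.1 APIs: unlike FlushConsolidationHandler's scheduled strategy, it counts flush attempts and only passes every Nth one through, deferring the rest to an event-loop task so the response tail is not stalled. CountingFlushHandler and its threshold are illustrative names, not part of any library.

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;

// Illustrative only: suppresses flushes until `threshold` of them have been
// attempted, then flushes once; a deferred event-loop task flushes whatever
// remains so data is never left sitting in the outbound buffer.
public class CountingFlushHandler extends ChannelDuplexHandler {

    private final int threshold;
    private int pendingFlushes;
    private boolean flushScheduled;

    public CountingFlushHandler(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void flush(ChannelHandlerContext ctx) {
        if (++pendingFlushes >= threshold) {
            pendingFlushes = 0;
            ctx.flush();
        }
        else if (!flushScheduled) {
            flushScheduled = true;
            // runs after the currently queued writes, flushing the remainder
            ctx.channel().eventLoop().execute(() -> {
                flushScheduled = false;
                if (pendingFlushes > 0) {
                    pendingFlushes = 0;
                    ctx.flush();
                }
            });
        }
    }
}

It could be installed with the same pipeline().addBefore(NettyPipeline.HttpCodec, ...) call shown earlier.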

@snicoll (Member) commented Jul 30, 2024

I am going to close this now as it is out of scope for this project. Thanks @violetagg for following up.

@snicoll closed this as not planned on Jul 30, 2024