Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid collectList when sending a Flux of objects as JSON using WebClient #28398

Closed
micopiira opened this issue Apr 29, 2022 · 8 comments
Closed
Assignees
Labels
in: web Issues in web modules (web, webmvc, webflux, websocket) type: enhancement A general enhancement
Milestone

Comments

@micopiira
Copy link
Contributor

micopiira commented Apr 29, 2022

Would it be possible to send a flux of objects in the request body using WebClient with "Content-Type: application/json" but avoid having to collect the whole flux as a list in memory?

Let's say I have a Flux containing millions of elements and my target server only accepts non-streaming content-types, the Jackson2JsonEncoder would then call "collectList()" on the flux, possibly running out of memory.

Couldn't the Jackson2JsonEncoder somehow write the objects as they come available?

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged or decided on label Apr 29, 2022
@poutsma
Copy link
Contributor

poutsma commented May 9, 2022

It is possible, but not with application/json as the Content-Type; it needs to be application/x-ndjson instead.

There is significant overhead for writing JSON content individually instead of collectively. That is why the Jackson2JsonEncoder collects to a list by default, and serializes that. If you specify a streaming mime-type, set to application/x-ndjson by default–but changeable via the streamingMediaTypes property, then the Jackson2JsonEncoder will not collect the list but stream them as they arrive, with a newline in between the elements.

Does that answer your question?

@poutsma poutsma added the status: waiting-for-feedback We need additional information before we can continue label May 9, 2022
@micopiira
Copy link
Contributor Author

Hey, I'm trying to POST to a server that only accepts application/json . With something more low level than WebClient, like reactor-netty I could do something like this:

    Flux<String> flux = Flux.range(0, 1000000).map(Object::toString);
    final Mono<HttpClientResponse> response = httpClient.headers(headers -> {
                headers.set(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON.toString());
                headers.set(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON.toString());
            }).post()
            .uri("/url")
            .send(ByteBufFlux.fromString(flux
                    .map(o -> {
                        try {
                            return objectMapper.writeValueAsString(o);
                        } catch (JsonProcessingException e) {
                            throw new RuntimeException(e);
                        }
                    }).zipWith(Flux.just(",").repeat(), (a, b) -> a + b).startWith("[").concatWithValues("]")))
            .response();

Which would then, atleast for what I know stream the objects to the target as they come available without having to wait for all of them.

Would this kind of behaviour be possible to implement in WebClient or should I stick with more low level clients like reactor-netty?

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels May 9, 2022
@poutsma
Copy link
Contributor

poutsma commented May 10, 2022

I'm trying to POST to a server that only accepts application/json

You can change the streamingMediaTypes property of Jackson2JsonEncoder from the default to application/json, and trigger the streaming behavior that way. The reference docs explains how to change codec defaults.

@poutsma poutsma added status: waiting-for-feedback We need additional information before we can continue and removed status: feedback-provided Feedback has been provided labels May 10, 2022
@micopiira
Copy link
Contributor Author

I see, however it does not produce valid JSON "out of the box". Heres what I tried:

private final WebClient webClient = WebClient.builder()
   .baseUrl("http://localhost:8080/")
   .codecs(clientCodecConfigurer -> {
     final Jackson2JsonEncoder jackson2JsonEncoder = new Jackson2JsonEncoder();
     jackson2JsonEncoder.setStreamingMediaTypes(List.of(MediaType.APPLICATION_JSON));
     clientCodecConfigurer.defaultCodecs().jackson2JsonEncoder(jackson2JsonEncoder);
    })
   .build();

    Flux<MyEntity> flux = ...;
    webClient.post()
               .uri("/")
               .contentType(MediaType.APPLICATION_JSON)
               .accept(MediaType.APPLICATION_JSON)
               .body(flux, MyEntity.class)
               .retrieve()
               .bodyToMono(MyResponse.class)

This will not wrap the JSON in an array nor add commas between the items. Should I manually map my Flux<MyEntity> into a Flux<String> with the wrapping [ & ] and commas in between the items?

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels May 11, 2022
@rstoyanchev
Copy link
Contributor

rstoyanchev commented May 17, 2022

Looking at the Javadoc of setStreamingMediaTypes it's mainly about flushing per item (for event streams that emit periodically) vs a single flush at the end (for continuous streams).

Maybe, if we are in the streaming section and the media type is "application/json" (i.e. explicitly set as a streaming media type), we could simply add the opening and closing square brackets to ensure valid JSON is produced.

We could also switch to flushing at some regularity > 1 (or just leave it to the underlying server buffer) since we know it's a media type that implies continues writing and shouldn't require explicit flushes. In which case I'm even wondering about removing the non-streaming section entirely, and doing this by default, so that we always write Flux items as they come with flushing as the only difference between streaming and non-streaming media types.

@rstoyanchev rstoyanchev added this to the Triage Queue milestone May 17, 2022
@rstoyanchev rstoyanchev added the in: web Issues in web modules (web, webmvc, webflux, websocket) label May 17, 2022
@rstoyanchev rstoyanchev modified the milestones: Triage Queue, 6.0.0-M5 May 17, 2022
@rstoyanchev rstoyanchev added type: enhancement A general enhancement and removed status: waiting-for-triage An issue we've not yet triaged or decided on status: feedback-provided Feedback has been provided labels May 17, 2022
@rstoyanchev rstoyanchev self-assigned this May 17, 2022
@rstoyanchev rstoyanchev changed the title Avoid collectList when sending a Flux of objects in request body using WebClient Avoid collectList when sending a Flux of objects as JSON using WebClient May 23, 2022
@hu553in
Copy link

hu553in commented Jul 11, 2023

@rstoyanchev is there a way to override this behavior to old-way now?
I need to temporarily configure Spring WebFlux Kotlin Flow endpoints to have Content-Type: application/json and to be collected to list before writing...
Because my frontend app is not able to process stream at the moment

@bclozel
Copy link
Member

bclozel commented Jul 11, 2023

@hu553in this issue is about the other way around: avoiding to buffer all elements before sending them as a single JSON document from the client. If you're using the server side of WebFlux and an application/json content type, data should not be streamed to the client. Maybe create a new StackOverflow question explaining the problem and showing how you're using this?

@hu553in
Copy link

hu553in commented Jul 11, 2023

@bclozel done, thank you... :)
will be glad if you check it, maybe you have simple answer.. I don't think that this is very complex issue

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
in: web Issues in web modules (web, webmvc, webflux, websocket) type: enhancement A general enhancement
Projects
None yet
Development

No branches or pull requests

6 participants