Reactive mechanism to consume a reactive stream of ByteBuffer and write to ByteChannel #2840

Closed
romain-grecourt opened this issue Mar 8, 2021 · 4 comments · Fixed by #2864
Labels
bug Something isn't working P2 reactive Reactive streams and related components

Comments

@romain-grecourt
Contributor

We do have IoMulti, which can be used to create a Publisher<ByteBuffer> from a ByteChannel; however, we do not have a way to subscribe to a Publisher<ByteBuffer> and write its content to a ByteChannel.

The former is used to serve files in the static content support. The latter would be used to implement file upload reactively.
We do have various examples that are not implemented correctly:

  • see FileService under examples/media/multipart
  • see ServerFileWriter under examples/webserver/streaming
@romain-grecourt romain-grecourt added the reactive Reactive streams and related components label Mar 8, 2021
@romain-grecourt
Contributor Author

CC @tomas-langer

@danielkec
Contributor

danielkec commented Mar 9, 2021

I believe we can achieve that with the current toolset; we can wrap it under IoMulti to make it simple to use:

        ExecutorService executor = Executors.newSingleThreadExecutor();
        Path filePath = Files.createTempFile("large-file", ".tmp");
        FileChannel fileChannel = FileChannel.open(filePath, StandardOpenOption.WRITE);
        Multi.range(1, 10)
                .map(i -> "line" + i + "\n")
                .map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
                .flatMap(bb -> Single.create(CompletableFuture.runAsync(() -> {
                    try {
                        fileChannel.write(bb);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }, executor), true).map(unused -> bb),
                        // I assume no parallelism is needed as ByteChannel write can be invoked only once at a time
                        1, false, 1)
                .onComplete(() -> {
                    try {
                        fileChannel.close();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                })
                .collectList()
                .await();
        
        executor.shutdown();
        
        System.out.println(Files.readString(filePath));

@romain-grecourt
Contributor Author

romain-grecourt commented Mar 9, 2021

It would be nice to have that provided as a utility. Also, you do need to loop on fileChannel.write until the buffer is drained, as a single call may or may not write all the bytes (or any bytes). @tomas-langer was suggesting that when the return value is 0 we may want to slow down the loop to avoid using too much CPU (hence the suggestion of Thread.onSpinWait()).
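
To make the point about looping concrete, here is a minimal sketch of a drain loop. The class name ChannelWrites and the method writeFully are hypothetical names chosen for illustration, not Helidon API; the sketch only assumes the standard java.nio WritableByteChannel contract, where write returns the number of bytes written, possibly zero.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical helper for illustration only; not part of Helidon.
public final class ChannelWrites {
    private ChannelWrites() {
    }

    /**
     * Writes all remaining bytes of the buffer to the channel.
     * A single write call may consume only part of the buffer, or nothing,
     * so the call is repeated until the buffer is empty.
     *
     * @return the total number of bytes written
     */
    public static int writeFully(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
        int total = 0;
        while (buffer.hasRemaining()) {
            int written = channel.write(buffer);
            if (written == 0) {
                // Nothing was accepted this round: hint that we are in a spin loop.
                Thread.onSpinWait();
            }
            total += written;
        }
        return total;
    }
}
```

In the snippet above, the body of the runAsync task could call such a helper in place of the single fileChannel.write(bb). The loop blocks its thread, so it probably belongs on a dedicated executor, as in that snippet.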

@danielkec
Contributor

OK, I would use exponential backoff for that to avoid burning CPU, as onSpinWait seems more suitable for short spins.
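
As a rough sketch of what exponential backoff could look like here: the class BackoffWrites, its methods, and the delay bounds (1 microsecond initial, 10 milliseconds cap) are all assumptions made for this example, not values or API taken from Helidon or from the eventual fix.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.locks.LockSupport;

// Hypothetical helper for illustration only; not part of Helidon.
public final class BackoffWrites {
    // Assumed bounds, chosen only for the example.
    private static final long INITIAL_DELAY_NANOS = 1_000L;   // 1 microsecond
    private static final long MAX_DELAY_NANOS = 10_000_000L;  // 10 milliseconds

    private BackoffWrites() {
    }

    /** Returns the delay to use after the given one: doubled, but never above the cap. */
    public static long nextDelay(long currentNanos) {
        return Math.min(currentNanos * 2, MAX_DELAY_NANOS);
    }

    /** Drains the buffer into the channel, pausing longer after each zero-byte write. */
    public static void writeWithBackoff(WritableByteChannel channel, ByteBuffer buffer) throws IOException {
        long delay = INITIAL_DELAY_NANOS;
        while (buffer.hasRemaining()) {
            if (channel.write(buffer) > 0) {
                // Progress was made: start again from the shortest delay.
                delay = INITIAL_DELAY_NANOS;
            } else {
                if (Thread.currentThread().isInterrupted()) {
                    // parkNanos returns immediately when interrupted, so bail out
                    // instead of degrading into a busy loop.
                    throw new InterruptedIOException("interrupted while waiting to write");
                }
                LockSupport.parkNanos(delay); // may wake early; the loop re-checks
                delay = nextDelay(delay);
            }
        }
    }
}
```

Resetting the delay after every successful write keeps throughput high when the channel only stalls occasionally, while the cap bounds the latency added once it becomes writable again.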

@m0mus m0mus added bug Something isn't working P2 labels Mar 11, 2021

3 participants