Skip to content

Commit

Permalink
Update streaming example to use IoMulti to demonstrate proper NIO usa…
Browse files Browse the repository at this point in the history
…ge. (#6604)

Fixes #2844
  • Loading branch information
romain-grecourt authored Apr 14, 2023
1 parent 1381dd5 commit 72a9299
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 178 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2021 Oracle and/or its affiliates.
* Copyright (c) 2018, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,11 +16,21 @@

package io.helidon.webserver.examples.streaming;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Logger;

import io.helidon.common.configurable.ScheduledThreadPoolSupplier;
import io.helidon.common.http.DataChunk;
import io.helidon.common.reactive.IoMulti;
import io.helidon.webserver.Routing;
import io.helidon.webserver.ServerRequest;
import io.helidon.webserver.ServerResponse;
Expand All @@ -34,12 +44,16 @@
*/
public class StreamingService implements Service {
private static final Logger LOGGER = Logger.getLogger(StreamingService.class.getName());

private final ScheduledExecutorService executor = ScheduledThreadPoolSupplier.create().get();
private final Path filePath;

StreamingService() {
URL resource = getClass().getResource(LARGE_FILE_RESOURCE);
if (resource == null) {
throw new IllegalStateException("Resource not found: " + LARGE_FILE_RESOURCE);
}
try {
filePath = Paths.get(getClass().getResource(LARGE_FILE_RESOURCE).toURI());
filePath = Paths.get(resource.toURI());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
Expand All @@ -48,20 +62,45 @@ public class StreamingService implements Service {
@Override
public void update(Routing.Rules routingRules) {
routingRules.get("/download", this::download)
.post("/upload", this::upload);
.post("/upload", this::upload);
}

private void upload(ServerRequest request, ServerResponse response) {
LOGGER.info("Entering upload ... " + Thread.currentThread());
request.content().subscribe(new ServerFileWriter(response));
Path tempFilePath = createTempFile("large-file", ".tmp");
request.content()
.map(DataChunk::data)
.flatMapIterable(Arrays::asList)
.to(IoMulti.writeToFile(tempFilePath)
.executor(executor)
.build());
LOGGER.info("Exiting upload ...");
}

private void download(ServerRequest request, ServerResponse response) {
LOGGER.info("Entering download ..." + Thread.currentThread());
long length = filePath.toFile().length();
response.headers().add("Content-Length", String.valueOf(length));
response.send(new ServerFileReader(filePath));
response.headers().contentLength(length);
response.send(IoMulti.multiFromByteChannelBuilder(newByteChannel(filePath))
.executor(executor)
.build());
LOGGER.info("Exiting download ...");
}

@SuppressWarnings("SameParameterValue")
private static Path createTempFile(String prefix, String suffix) {
try {
return Files.createTempFile(prefix, suffix);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}

private static ReadableByteChannel newByteChannel(Path path) {
try {
return Files.newByteChannel(path);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
}

0 comments on commit 72a9299

Please sign in to comment.