Skip to content

Commit

Permalink
Merge pull request #4 from jianghaolu/no-core-netty-fluxutil
Browse files Browse the repository at this point in the history
Add file read/write to FluxUtil
  • Loading branch information
JonathanGiles authored Aug 12, 2019
2 parents 92c7977 + d6276ab commit 6c69d62
Show file tree
Hide file tree
Showing 5 changed files with 378 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,65 +106,60 @@ public void testCanReadEmptyFile() throws IOException {

@Test
public void testAsynchronyShortInput() throws IOException {
// File file = createFileIfNotExist("target/test3");
// FileOutputStream stream = new FileOutputStream(file);
// stream.write("hello there".getBytes(StandardCharsets.UTF_8));
// stream.close();
// try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ)) {
// byte[] bytes = FluxUtil.byteBufStreamFromFile(channel)
// .map(bb -> {
// byte[] bt = bb.array();
// ReferenceCountUtil.release(bb);
// return bt;
// })
// .limitRequest(1)
// .subscribeOn(reactor.core.scheduler.Schedulers.newElastic("io", 30))
// .publishOn(reactor.core.scheduler.Schedulers.newElastic("io", 30))
// .collect(() -> new ByteArrayOutputStream(),
// (bos, b) -> {
// try {
// bos.write(b);
// } catch (IOException ioe) {
// throw Exceptions.propagate(ioe);
// }
// })
// .block()
// .toByteArray();
// assertEquals("hello there", new String(bytes, StandardCharsets.UTF_8));
// }
// assertTrue(file.delete());
Assert.fail("Need to implement this test again");
File file = createFileIfNotExist("target/test3");
FileOutputStream stream = new FileOutputStream(file);
stream.write("hello there".getBytes(StandardCharsets.UTF_8));
stream.close();
try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ)) {
byte[] bytes = FluxUtil.readFile(channel)
.map(bb -> {
byte[] bt = new byte[bb.remaining()];
bb.get(bt);
return bt;
})
.limitRequest(1)
.subscribeOn(reactor.core.scheduler.Schedulers.newElastic("io", 30))
.publishOn(reactor.core.scheduler.Schedulers.newElastic("io", 30))
.collect(() -> new ByteArrayOutputStream(),
(bos, b) -> {
try {
bos.write(b);
} catch (IOException ioe) {
throw Exceptions.propagate(ioe);
}
})
.block()
.toByteArray();
assertEquals("hello there", new String(bytes, StandardCharsets.UTF_8));
}
assertTrue(file.delete());
}

private static final int NUM_CHUNKS_IN_LONG_INPUT = 10_000_000;

@Test
public void testAsynchronyLongInput() throws IOException, NoSuchAlgorithmException {
// File file = createFileIfNotExist("target/test4");
// byte[] array = "1234567690".getBytes(StandardCharsets.UTF_8);
// MessageDigest digest = MessageDigest.getInstance("MD5");
// try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(file))) {
// for (int i = 0; i < NUM_CHUNKS_IN_LONG_INPUT; i++) {
// out.write(array);
// digest.update(array);
// }
// }
// System.out.println("long input file size=" + file.length() / (1024 * 1024) + "MB");
// byte[] expected = digest.digest();
// digest.reset();
// try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ)) {
// FluxUtil.byteBufStreamFromFile(channel)
// .subscribeOn(reactor.core.scheduler.Schedulers.newElastic("io", 30))
// .publishOn(reactor.core.scheduler.Schedulers.newElastic("io", 30))
// .toIterable().forEach(bb -> {
// digest.update(bb);
// ReferenceCountUtil.release(bb);
// });
//
// assertArrayEquals(expected, digest.digest());
// }
// assertTrue(file.delete());
Assert.fail("Need to implement this test again");
File file = createFileIfNotExist("target/test4");
byte[] array = "1234567690".getBytes(StandardCharsets.UTF_8);
MessageDigest digest = MessageDigest.getInstance("MD5");
try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(file))) {
for (int i = 0; i < NUM_CHUNKS_IN_LONG_INPUT; i++) {
out.write(array);
digest.update(array);
}
}
System.out.println("long input file size=" + file.length() / (1024 * 1024) + "MB");
byte[] expected = digest.digest();
digest.reset();
try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ)) {
FluxUtil.readFile(channel)
.subscribeOn(reactor.core.scheduler.Schedulers.newElastic("io", 30))
.publishOn(reactor.core.scheduler.Schedulers.newElastic("io", 30))
.toIterable().forEach(digest::update);

assertArrayEquals(expected, digest.digest());
}
assertTrue(file.delete());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private Mono<Void> logRequest(final ClientLogger logger, final HttpRequest reque

if (contentLength < MAX_BODY_LOG_SIZE && isHumanReadableContentType) {
try {
Mono<byte[]> collectedBytes = FluxUtil.collectBytesInByteBufferStream(request.body(), true);
Mono<byte[]> collectedBytes = FluxUtil.collectBytesInByteBufferStream(request.body());
reqBodyLoggingMono = collectedBytes.flatMap(bytes -> {
String bodyString = new String(bytes, StandardCharsets.UTF_8);
bodyString = prettyPrintIfNeeded(logger, request.headers().value("Content-Type"), bodyString);
Expand Down
Loading

0 comments on commit 6c69d62

Please sign in to comment.