Skip to content

Commit

Permalink
[7.15] Fix AzureBlobStore#convertStreamToByteBuffer chunking in Windo…
Browse files Browse the repository at this point in the history
…ws (#78775)

Today we are using Math.ceil in order to calculate the number of
chunks in the request. Since we cast the values to a double...
there be dragons. This could cause issues depending on the platform.

This commit uses good old integers to compute the number of parts.

Backport of #78772
  • Loading branch information
fcofdez authored Oct 7, 2021
1 parent 7e58aa6 commit e9933ea
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,11 @@ public synchronized int read() throws IOException {
// reclaim them (see MonoSendMany). Additionally, that very same operator requests
// 128 elements (that's hardcoded) once it's subscribed (later on, it requests
// by 64 elements), that's why we provide 64kb buffers.
return Flux.range(0, (int) Math.ceil((double) length / (double) chunkSize))

// length is at most 100MB so it's safe to cast back to an integer in this case
final int parts = (int) length / chunkSize;
final long remaining = length % chunkSize;
return Flux.range(0, remaining == 0 ? parts : parts + 1)
.map(i -> i * chunkSize)
.concatMap(pos -> Mono.fromCallable(() -> {
long count = pos + chunkSize > length ? length - pos : chunkSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,11 @@ public void testWriteBlobWithRetries() throws Exception {
public void testWriteLargeBlob() throws Exception {
final int maxRetries = randomIntBetween(2, 5);

final byte[] data = randomBytes((int) ByteSizeUnit.MB.toBytes(10));
int nbBlocks = (int) Math.ceil((double) data.length / (double) ByteSizeUnit.MB.toBytes(1));
final byte[] data = randomBytes(ByteSizeUnit.MB.toIntBytes(10) + randomIntBetween(0, ByteSizeUnit.MB.toIntBytes(1)));
int nbBlocks = data.length / ByteSizeUnit.MB.toIntBytes(1);
if (data.length % ByteSizeUnit.MB.toIntBytes(1) != 0) {
nbBlocks += 1;
}

final int nbErrors = 2; // we want all requests to fail at least once
final AtomicInteger countDownUploads = new AtomicInteger(nbErrors * nbBlocks);
Expand Down Expand Up @@ -378,6 +381,9 @@ public void testWriteLargeBlob() throws Exception {
if (randomBoolean()) {
Streams.readFully(exchange.getRequestBody());
AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE));
} else {
long contentLength = Long.parseLong(exchange.getRequestHeaders().getFirst("Content-Length"));
readFromInputStream(exchange.getRequestBody(), randomLongBetween(0, contentLength));
}
exchange.close();
});
Expand Down Expand Up @@ -621,4 +627,16 @@ private String getEndpointForServer(HttpServer server, String accountName) {
InetSocketAddress address = server.getAddress();
return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort() + "/" + accountName;
}

private void readFromInputStream(InputStream inputStream, long bytesToRead) {
try {
long totalBytesRead = 0;
while (inputStream.read() != -1 && totalBytesRead < bytesToRead) {
totalBytesRead += 1;
}
assertThat(totalBytesRead, equalTo(bytesToRead));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

0 comments on commit e9933ea

Please sign in to comment.