diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index bf7c06af31e54..914fa2d138fc1 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -548,7 +548,11 @@ public synchronized int read() throws IOException { // reclaim them (see MonoSendMany). Additionally, that very same operator requests // 128 elements (that's hardcoded) once it's subscribed (later on, it requests // by 64 elements), that's why we provide 64kb buffers. - return Flux.range(0, (int) Math.ceil((double) length / (double) chunkSize)) + + // length is at most 100MB so it's safe to cast back to an integer in this case + final int parts = (int) length / chunkSize; + final long remaining = length % chunkSize; + return Flux.range(0, remaining == 0 ? parts : parts + 1) .map(i -> i * chunkSize) .concatMap(pos -> Mono.fromCallable(() -> { long count = pos + chunkSize > length ? 
length - pos : chunkSize; diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java index 0909fc7a5e237..f1a50150b0282 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java @@ -329,8 +329,11 @@ public void testWriteBlobWithRetries() throws Exception { public void testWriteLargeBlob() throws Exception { final int maxRetries = randomIntBetween(2, 5); - final byte[] data = randomBytes((int) ByteSizeUnit.MB.toBytes(10)); - int nbBlocks = (int) Math.ceil((double) data.length / (double) ByteSizeUnit.MB.toBytes(1)); + final byte[] data = randomBytes(ByteSizeUnit.MB.toIntBytes(10) + randomIntBetween(0, ByteSizeUnit.MB.toIntBytes(1))); + int nbBlocks = data.length / ByteSizeUnit.MB.toIntBytes(1); + if (data.length % ByteSizeUnit.MB.toIntBytes(1) != 0) { + nbBlocks += 1; + } final int nbErrors = 2; // we want all requests to fail at least once final AtomicInteger countDownUploads = new AtomicInteger(nbErrors * nbBlocks); @@ -378,6 +381,9 @@ public void testWriteLargeBlob() throws Exception { if (randomBoolean()) { Streams.readFully(exchange.getRequestBody()); AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE)); + } else { + long contentLength = Long.parseLong(exchange.getRequestHeaders().getFirst("Content-Length")); + readFromInputStream(exchange.getRequestBody(), randomLongBetween(0, contentLength)); } exchange.close(); }); @@ -621,4 +627,16 @@ private String getEndpointForServer(HttpServer server, String accountName) { InetSocketAddress address = server.getAddress(); return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + 
address.getPort() + "/" + accountName; } + + private void readFromInputStream(InputStream inputStream, long bytesToRead) { + try { + long totalBytesRead = 0; + while (totalBytesRead < bytesToRead && inputStream.read() != -1) { + totalBytesRead += 1; + } + assertThat(totalBytesRead, equalTo(bytesToRead)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } }