From 258593122e5fd17f0e19a5757a913152dcab3c62 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Thu, 16 Dec 2021 15:27:54 -0500 Subject: [PATCH 1/4] feat: allow limiting ReadChannel The ReadChannel returned from storage.reader and blob.reader now allow limiting the max number of bytes the channel will make available for read. Use ReadChannel#limit(long) to set the limit. This can be used in conjunction with seek to allow for range gets of objects independent of any buffer or chunk sizes. --- .../google/cloud/storage/BlobReadChannel.java | 36 +++++- .../cloud/storage/BlobReadChannelTest.java | 6 + .../storage/it/ITBlobReadChannelTest.java | 107 ++++++++++++++++++ 3 files changed, 145 insertions(+), 4 deletions(-) create mode 100644 google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobReadChannelTest.java diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannel.java index 8d51ab3b8..d0ed5277a 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannel.java @@ -18,6 +18,7 @@ import static com.google.cloud.RetryHelper.runWithRetries; +import com.google.api.client.util.Preconditions; import com.google.api.gax.retrying.ResultRetryAlgorithm; import com.google.api.services.storage.model.StorageObject; import com.google.cloud.ReadChannel; @@ -52,6 +53,7 @@ class BlobReadChannel implements ReadChannel { private final StorageObject storageObject; private int bufferPos; private byte[] buffer; + private long limit; BlobReadChannel( StorageOptions serviceOptions, BlobId blob, Map requestOptions) { @@ -62,6 +64,7 @@ class BlobReadChannel implements ReadChannel { isOpen = true; storageRpc = serviceOptions.getStorageRpcV1(); storageObject = blob.toPb(); + this.limit = Long.MAX_VALUE; } @Override @@ -71,7 +74,8 @@ public RestorableState capture() { .setPosition(position) .setIsOpen(isOpen) .setEndOfStream(endOfStream) - .setChunkSize(chunkSize); + .setChunkSize(chunkSize) + .setLimit(limit); if (buffer != null) { builder.setPosition(position + bufferPos); builder.setEndOfStream(false); @@ -119,7 +123,8 @@ public int read(ByteBuffer byteBuffer) throws IOException { if (endOfStream) { return -1; } - final int toRead = Math.max(byteBuffer.remaining(), chunkSize); + final int toRead = + Math.min(Math.toIntExact(limit - position), Math.max(byteBuffer.remaining(), chunkSize)); try { ResultRetryAlgorithm algorithm = retryAlgorithmManager.getForObjectsGet(storageObject, requestOptions); @@ -158,6 +163,18 @@ public int read(ByteBuffer byteBuffer) throws IOException { return toWrite; } + @Override + public ReadChannel limit(long limit) { + Preconditions.checkArgument(limit >= 0, "Limit must be >= 0"); + this.limit = limit; + return this; + } + + @Override + public long limit() { + return limit; + } + static class StateImpl implements RestorableState, Serializable { private static final long serialVersionUID = 3889420316004453706L; @@ -170,6 +187,7 @@ static class StateImpl implements RestorableState, Serializable { private final boolean isOpen; private final boolean endOfStream; private final int chunkSize; + private final long limit; StateImpl(Builder builder) { this.serviceOptions = builder.serviceOptions; @@ -180,6 +198,7 @@ static class StateImpl implements RestorableState, Serializable { this.isOpen = builder.isOpen; this.endOfStream = builder.endOfStream; this.chunkSize = builder.chunkSize; + this.limit = builder.limit; } static class Builder { @@ -191,6 +210,7 @@ static class Builder { private boolean isOpen; private boolean endOfStream; private int chunkSize; + private long limit; private Builder(StorageOptions options, BlobId blob, Map reqOptions) { this.serviceOptions = options; @@ -223,6 +243,11 @@ Builder setChunkSize(int chunkSize) { return this; } + Builder setLimit(long limit) { + this.limit = limit; + return this; + } + RestorableState build() { return new StateImpl(this); } @@ -241,13 +266,14 @@ public ReadChannel restore() { channel.isOpen = isOpen; channel.endOfStream = endOfStream; channel.chunkSize = chunkSize; + channel.limit = limit; return channel; } @Override public int hashCode() { return Objects.hash( - serviceOptions, blob, requestOptions, lastEtag, position, isOpen, endOfStream, chunkSize); + serviceOptions, blob, requestOptions, lastEtag, position, isOpen, endOfStream, chunkSize, limit); } @Override @@ -266,7 +292,8 @@ public boolean equals(Object obj) { && this.position == other.position && this.isOpen == other.isOpen && this.endOfStream == other.endOfStream - && this.chunkSize == other.chunkSize; + && this.chunkSize == other.chunkSize + && this.limit == other.limit; } @Override @@ -276,6 +303,7 @@ public String toString() { .add("position", position) .add("isOpen", isOpen) .add("endOfStream", endOfStream) + .add("limit", limit) .toString(); } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobReadChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobReadChannelTest.java index dafb0b2a5..6eb2b8aa1 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobReadChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobReadChannelTest.java @@ -223,13 +223,19 @@ public void testSaveAndRestore() throws IOException { public void testStateEquals() { replay(storageRpcMock); reader = new BlobReadChannel(options, BLOB_ID, EMPTY_RPC_OPTIONS); + int limit = 342; + reader.limit(limit); @SuppressWarnings("resource") // avoid closing when you don't want partial writes to GCS ReadChannel secondReader = new BlobReadChannel(options, BLOB_ID, EMPTY_RPC_OPTIONS); + secondReader.limit(limit); RestorableState state = reader.capture(); RestorableState secondState = secondReader.capture(); assertEquals(state, secondState); assertEquals(state.hashCode(), secondState.hashCode()); assertEquals(state.toString(), secondState.toString()); + + ReadChannel restore = secondState.restore(); + assertEquals(limit, restore.limit()); } private static byte[] randomByteArray(int size) { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobReadChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobReadChannelTest.java new file mode 100644 index 000000000..2b91ab070 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobReadChannelTest.java @@ -0,0 +1,107 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage.it; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.NoCredentials; +import com.google.cloud.ReadChannel; +import com.google.cloud.WriteChannel; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.Bucket; +import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.DataGeneration; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; +import com.google.cloud.storage.conformance.retry.TestBench; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +public final class ITBlobReadChannelTest { + + private static final int _16MiB = 16 * 1024 * 1024; + private static final int _256KiB = 256 * 1024; + + @ClassRule + public static final TestBench testBench = + TestBench.newBuilder().setContainerName("blob-read-channel-test").build(); + + @Rule public final TestName testName = new TestName(); + + @Rule public final DataGeneration dataGeneration = new DataGeneration(new Random(872364872)); + + @Test + public void testLimit_smallerThanOneChunk() throws IOException { + int srcContentSize = _256KiB; + int rangeBegin = 57; + int rangeEnd = 2384; + int chunkSize = _16MiB; + doLimitTest(srcContentSize, rangeBegin, rangeEnd, chunkSize); + } + + @Test + public void testLimit_largerThanOneChunk() throws IOException { + int srcContentSize = _16MiB + (_256KiB * 3); + int rangeBegin = 384; + int rangeEnd = rangeBegin + _16MiB; + int chunkSize = _16MiB; + + doLimitTest(srcContentSize, rangeBegin, rangeEnd, chunkSize); + } + + private void doLimitTest(int srcContentSize, int rangeBegin, int rangeEnd, int chunkSize) + throws IOException { + Storage s = + StorageOptions.newBuilder() + .setProjectId("blob-read-channel-test") + .setHost(testBench.getBaseUri()) + .setCredentials(NoCredentials.getInstance()) + .build() + .getService(); + + String testNameMethodName = testName.getMethodName(); + String bucketName = String.format("bucket-%s", testNameMethodName.toLowerCase()); + String blobName = String.format("%s/src", testNameMethodName); + + Bucket bucket = s.create(BucketInfo.of(bucketName)); + BlobInfo src = BlobInfo.newBuilder(bucket, blobName).build(); + ByteBuffer content = dataGeneration.randByteBuffer(srcContentSize); + ByteBuffer expectedSubContent = content.duplicate(); + expectedSubContent.position(rangeBegin); + expectedSubContent.limit(rangeEnd); + try (WriteChannel writer = s.writer(src)) { + writer.write(content); + } + + ByteBuffer actual = ByteBuffer.allocate(rangeEnd - rangeBegin); + + try (ReadChannel reader = s.reader(src.getBlobId())) { + reader.setChunkSize(chunkSize); + reader.seek(rangeBegin); + reader.limit(rangeEnd); + reader.read(actual); + actual.flip(); + } + + assertThat(actual).isEqualTo(expectedSubContent); + } +} From a118c91ff4d74486ac7b6501fdffe1b7998cc4a4 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Fri, 18 Mar 2022 12:53:24 -0400 Subject: [PATCH 2/4] chore: min before toIntExact --- .../main/java/com/google/cloud/storage/BlobReadChannel.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannel.java index d0ed5277a..de53237c0 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannel.java @@ -123,8 +123,8 @@ public int read(ByteBuffer byteBuffer) throws IOException { if (endOfStream) { return -1; } - final int toRead = - Math.min(Math.toIntExact(limit - position), Math.max(byteBuffer.remaining(), chunkSize)); + final int toRead = Math.toIntExact( + Math.min(limit - position, Math.max(byteBuffer.remaining(), chunkSize))); try { ResultRetryAlgorithm algorithm = retryAlgorithmManager.getForObjectsGet(storageObject, requestOptions); From e90d184fc773d41b0e2724a6b2d2a311371d28c5 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Fri, 18 Mar 2022 12:54:12 -0400 Subject: [PATCH 3/4] chore: 2022 --- .../java/com/google/cloud/storage/it/ITBlobReadChannelTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobReadChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobReadChannelTest.java index 2b91ab070..3f1470a02 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobReadChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobReadChannelTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 Google LLC + * Copyright 2022 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 6279d3d02b33823aa04bc9f7ffa5649177d506c5 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Fri, 18 Mar 2022 12:58:51 -0400 Subject: [PATCH 4/4] chore: fmt --- .../com/google/cloud/storage/BlobReadChannel.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannel.java index de53237c0..678f138ef 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannel.java @@ -123,8 +123,8 @@ public int read(ByteBuffer byteBuffer) throws IOException { if (endOfStream) { return -1; } - final int toRead = Math.toIntExact( - Math.min(limit - position, Math.max(byteBuffer.remaining(), chunkSize))); + final int toRead = + Math.toIntExact(Math.min(limit - position, Math.max(byteBuffer.remaining(), chunkSize))); try { ResultRetryAlgorithm algorithm = retryAlgorithmManager.getForObjectsGet(storageObject, requestOptions); @@ -273,7 +273,15 @@ public ReadChannel restore() { @Override public int hashCode() { return Objects.hash( - serviceOptions, blob, requestOptions, lastEtag, position, isOpen, endOfStream, chunkSize, limit); + serviceOptions, + blob, + requestOptions, + lastEtag, + position, + isOpen, + endOfStream, + chunkSize, + limit); } @Override