From e0191b518e50a49fae0691894b50f0c5f33fc6af Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Tue, 25 Jul 2023 15:01:12 -0400 Subject: [PATCH] feat: introduce new BlobWriteSession (#2123) When writing a new Blob to GCS, there are secondary session related state and actions which can't be represented by WriteChannel. BlobWriteSession provides a new construct to allow retrieving the resultant object which is created after the WritableByteChannel is closed. Along with the new session, configuration for this is now performed at the StorageOptions level where cross session considerations can influence the implementation of the returned session. The configurable option present for this new StorageWriterConfig is chunkSize. In the future new configurations will be added with their corresponding options. For example, in a future release it will be possible to change from in memory buffering to instead buffer to disk thereby reducing heap usage. --- .../clirr-ignored-differences.xml | 24 +-- .../cloud/storage/BlobWriteSession.java | 73 ++++++++ .../cloud/storage/BlobWriteSessionConfig.java | 59 +++++++ .../storage/BlobWriteSessionConfigs.java | 49 ++++++ .../cloud/storage/BlobWriteSessions.java | 48 ++++++ .../cloud/storage/CrossTransportUtils.java | 67 +++++++ .../DefaultBlobWriteSessionConfig.java | 163 ++++++++++++++++++ .../google/cloud/storage/GapicCopyWriter.java | 2 +- .../cloud/storage/GrpcBlobReadChannel.java | 2 +- .../cloud/storage/GrpcBlobWriteChannel.java | 2 +- .../google/cloud/storage/GrpcStorageImpl.java | 62 ++++--- .../cloud/storage/GrpcStorageOptions.java | 32 +++- .../com/google/cloud/storage/Storage.java | 50 ++++++ .../google/cloud/storage/StorageInternal.java | 29 ++++ .../com/google/cloud/storage/UnifiedOpts.java | 2 +- .../storage/TransportCompatibilityTest.java | 2 +- .../storage/it/ITBlobWriteSessionTest.java | 121 +++++++++++++ .../runner/registry/AbstractStorageProxy.java | 6 + 18 files changed, 740 insertions(+), 53 deletions(-) create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSession.java create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfig.java create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessions.java create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/CrossTransportUtils.java create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java create mode 100644 google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java diff --git a/google-cloud-storage/clirr-ignored-differences.xml b/google-cloud-storage/clirr-ignored-differences.xml index ad681c4be3..84d9907047 100644 --- a/google-cloud-storage/clirr-ignored-differences.xml +++ b/google-cloud-storage/clirr-ignored-differences.xml @@ -1,29 +1,11 @@ - + 7012 - com/google/cloud/storage/UnbufferedWritableByteChannelSession$UnbufferedWritableByteChannel - * write(*) - - - - 7012 - com/google/cloud/storage/spi/v1/StorageRpc - * getStorage() - - - - 8001 - com/google/cloud/storage/Hasher$ConstantConcatValueHasher - - - - - 7002 - com/google/cloud/storage/HttpDownloadSessionBuilder$ReadableByteChannelSessionBuilder - com.google.cloud.storage.HttpDownloadSessionBuilder$ReadableByteChannelSessionBuilder setCallback(java.util.function.Consumer) + com/google/cloud/storage/Storage + com.google.cloud.storage.BlobWriteSession blobWriteSession(com.google.cloud.storage.BlobInfo, com.google.cloud.storage.Storage$BlobWriteOption[]) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSession.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSession.java new file mode 100644 index 0000000000..02ea23a6a7 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSession.java @@ -0,0 +1,73 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.api.core.ApiFuture; +import com.google.api.core.BetaApi; +import java.io.IOException; +import java.nio.channels.WritableByteChannel; + +/** + * A session to write an object to Google Cloud Storage. + * + *

A session can only write a single version of an object. If writing multiple versions of an + * object a new session must be created each time. + * + *

Provides an api that allows writing to and retrieving the resulting {@link BlobInfo} after + * write finalization. + * + *

The underlying implementation is dictated based upon the specified {@link + * BlobWriteSessionConfig} provided at {@link StorageOptions} creation time. + * + * @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig) + * @see BlobWriteSessionConfig + * @see BlobWriteSessionConfigs + * @since 2.26.0 This new api is in preview and is subject to breaking changes. + */ +@BetaApi +public interface BlobWriteSession { + + /** + * Open the {@link WritableByteChannel} for this session. + * + *

A session may only be {@code open}ed once. If multiple calls to open are made, an illegal + * state exception will be thrown + * + *

Upon calling {@link WritableByteChannel#close()} the object creation will be finalized, and + * {@link #getResult()}s future should resolve. + * + * @throws IOException When creating the {@link WritableByteChannel} if an unrecoverable + * underlying IOException occurs it can be rethrown + * @throws IllegalStateException if open is called more than once + * @since 2.26.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + WritableByteChannel open() throws IOException; + + /** + * Return an {@link ApiFuture}{@code } which will represent the state of the object upon + * finalization and success response from Google Cloud Storage. + * + *

This future will not resolve until: 1. The object is successfully finalized and created in + * Google Cloud Storage 2. A terminal failure occurs, the terminal failure will become the + * exception result + * + * @since 2.26.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + ApiFuture getResult(); +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfig.java new file mode 100644 index 0000000000..de8622c754 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfig.java @@ -0,0 +1,59 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.api.core.InternalApi; +import com.google.cloud.storage.Conversions.Decoder; +import com.google.cloud.storage.Storage.BlobWriteOption; +import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; +import com.google.cloud.storage.UnifiedOpts.Opts; +import com.google.storage.v2.WriteObjectResponse; +import java.io.IOException; +import java.time.Clock; + +/** + * A sealed internal implementation only class which provides the means of configuring a {@link + * BlobWriteSession}. + * + *

A {@code BlobWriteSessionConfig} will be used to configure all {@link BlobWriteSession}s + * produced by an instance of {@link Storage}. + * + * @see BlobWriteSessionConfigs + * @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig) + * @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...) + * @since 2.26.0 This new api is in preview and is subject to breaking changes. + */ +// When we have java modules, actually seal this to internal extension only +@InternalApi +public abstract class BlobWriteSessionConfig { + + @InternalApi + BlobWriteSessionConfig() {} + + @InternalApi + abstract WriterFactory createFactory(Clock clock) throws IOException; + + @InternalApi + interface WriterFactory { + @InternalApi + WritableByteChannelSession writeSession( + StorageInternal s, + BlobInfo info, + Opts opts, + Decoder d); + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java new file mode 100644 index 0000000000..cc5e691e6b --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java @@ -0,0 +1,49 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.api.core.BetaApi; +import com.google.cloud.storage.GrpcStorageOptions.GrpcStorageDefaults; +import com.google.cloud.storage.Storage.BlobWriteOption; + +/** + * Factory class to select and construct {@link BlobWriteSessionConfig}s. + * + * @see BlobWriteSessionConfig + * @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig) + * @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...) + * @since 2.26.0 This new api is in preview and is subject to breaking changes. + */ +@BetaApi +public final class BlobWriteSessionConfigs { + + private BlobWriteSessionConfigs() {} + + /** + * Factory to produce the default configuration for uploading an object to Cloud Storage. + * + *

Configuration of the chunk size can be performed via {@link + * DefaultBlobWriteSessionConfig#withChunkSize(int)}. + * + * @see GrpcStorageDefaults#getDefaultStorageWriterConfig() + * @since 2.26.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + public static DefaultBlobWriteSessionConfig getDefault() { + return new DefaultBlobWriteSessionConfig(ByteSizeConstants._16MiB); + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessions.java new file mode 100644 index 0000000000..878552a125 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessions.java @@ -0,0 +1,48 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.api.core.ApiFuture; +import java.io.IOException; +import java.nio.channels.WritableByteChannel; + +final class BlobWriteSessions { + + private BlobWriteSessions() {} + + static BlobWriteSession of(WritableByteChannelSession s) { + return new WritableByteChannelSessionAdapter(s); + } + + static final class WritableByteChannelSessionAdapter implements BlobWriteSession { + private final WritableByteChannelSession delegate; + + private WritableByteChannelSessionAdapter(WritableByteChannelSession delegate) { + this.delegate = delegate; + } + + @Override + public WritableByteChannel open() throws IOException { + return delegate.open(); + } + + @Override + public ApiFuture getResult() { + return delegate.getResult(); + } + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/CrossTransportUtils.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/CrossTransportUtils.java new file mode 100644 index 0000000000..1c5aa1d97d --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/CrossTransportUtils.java @@ -0,0 +1,67 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.cloud.storage.TransportCompatibility.Transport; +import java.util.Arrays; +import java.util.stream.Collectors; + +final class CrossTransportUtils { + + static T throwHttpJsonOnly(String methodName) { + return throwHttpJsonOnly(Storage.class, methodName); + } + + static T throwHttpJsonOnly(Class clazz, String methodName) { + return throwTransportOnly(clazz, methodName, Transport.HTTP); + } + + static T throwGrpcOnly(String methodName) { + return throwGrpcOnly(Storage.class, methodName); + } + + static T throwGrpcOnly(Class clazz, String methodName) { + return throwTransportOnly(clazz, methodName, Transport.GRPC); + } + + static T throwTransportOnly(Class clazz, String methodName, Transport transport) { + String builder; + switch (transport) { + case HTTP: + builder = "StorageOptions.http()"; + break; + case GRPC: + builder = "StorageOptions.grpc()"; + break; + default: + throw new IllegalStateException( + String.format("Broken Java Enum: %s received value: '%s'", Transport.class, transport)); + } + String message = + String.format( + "%s#%s is only supported for %s transport. Please use %s to construct a compatible instance.", + clazz.getName(), methodName, transport, builder); + throw new UnsupportedOperationException(message); + } + + static String fmtMethodName(String name, Class... args) { + return name + + "(" + + Arrays.stream(args).map(Class::getName).collect(Collectors.joining(", ")) + + ")"; + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java new file mode 100644 index 0000000000..dfea0d4190 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java @@ -0,0 +1,163 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.core.BetaApi; +import com.google.api.core.InternalApi; +import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel; +import com.google.cloud.storage.Conversions.Decoder; +import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; +import com.google.cloud.storage.UnifiedOpts.Opts; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.storage.v2.WriteObjectResponse; +import java.nio.channels.WritableByteChannel; +import java.time.Clock; +import javax.annotation.concurrent.Immutable; + +/** + * Default Configuration to represent uploading to Google Cloud Storage in a chunked manner. + * + *

Perform a resumable upload, uploading at most {@code chunkSize} bytes each PUT. + * + *

Configuration of chunk size can be performed via {@link + * DefaultBlobWriteSessionConfig#withChunkSize(int)}. + * + *

An instance of this class will provide a {@link BlobWriteSession} is logically equivalent to + * the following: + * + *

{@code
+ * Storage storage = ...;
+ * WriteChannel writeChannel = storage.writer(BlobInfo, BlobWriteOption);
+ * writeChannel.setChunkSize(chunkSize);
+ * }
+ * + * @since 2.26.0 This new api is in preview and is subject to breaking changes. + */ +@Immutable +@BetaApi +public final class DefaultBlobWriteSessionConfig extends BlobWriteSessionConfig { + + private final int chunkSize; + + @InternalApi + DefaultBlobWriteSessionConfig(int chunkSize) { + this.chunkSize = chunkSize; + } + + /** + * The number of bytes each chunk can be. + * + *

Default: {@code 16777216 (16 MiB)} + * + * @see #withChunkSize(int) + * @since 2.26.0 This new api is in preview and is subject to breaking changes. + */ + public int getChunkSize() { + return chunkSize; + } + + /** + * Create a new instance with the {@code chunkSize} set to the specified value. + * + *

Default: {@code 16777216 (16 MiB)} + * + * @param chunkSize The number of bytes each chunk should be. Must be >= {@code 262144 (256 KiB)} + * @return The new instance + * @see #getChunkSize() + * @since 2.26.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + public DefaultBlobWriteSessionConfig withChunkSize(int chunkSize) { + Preconditions.checkArgument( + chunkSize >= ByteSizeConstants._256KiB, + "chunkSize must be >= %d", + ByteSizeConstants._256KiB); + return new DefaultBlobWriteSessionConfig(chunkSize); + } + + @Override + @InternalApi + WriterFactory createFactory(Clock clock) { + return new Factory(chunkSize); + } + + @InternalApi + private static final class Factory implements WriterFactory { + + private final int chunkSize; + + private Factory(int chunkSize) { + this.chunkSize = chunkSize; + } + + @InternalApi + @Override + public WritableByteChannelSession writeSession( + StorageInternal s, + BlobInfo info, + Opts opts, + Decoder d) { + // todo: invert this + // make GrpcBlobWriteChannel use this factory to produce its WriteSession + if (s instanceof GrpcStorageImpl) { + GrpcStorageImpl g = (GrpcStorageImpl) s; + GrpcBlobWriteChannel writer = g.internalWriter(info, opts); + writer.setChunkSize(chunkSize); + WritableByteChannelSession session = + writer.newLazyWriteChannel().getSession(); + return new DecoratedWritableByteChannelSession<>(session, d); + } + return CrossTransportUtils.throwGrpcOnly(DefaultBlobWriteSessionConfig.class, ""); + } + } + + private static final class DecoratedWritableByteChannelSession + implements WritableByteChannelSession { + + private final WritableByteChannelSession delegate; + private final Decoder decoder; + + private DecoratedWritableByteChannelSession( + WritableByteChannelSession delegate, Decoder decoder) { + this.delegate = delegate; + this.decoder = decoder; + } + + @Override + public WBC open() { + try { + return WritableByteChannelSession.super.open(); + } catch (Exception e) { + throw StorageException.coalesce(e); + } + } + + @Override + public ApiFuture openAsync() { + return delegate.openAsync(); + } + + @Override + public ApiFuture getResult() { + return ApiFutures.transform( + delegate.getResult(), decoder::decode, MoreExecutors.directExecutor()); + } + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicCopyWriter.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicCopyWriter.java index cae70d6767..038ff46672 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicCopyWriter.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicCopyWriter.java @@ -87,6 +87,6 @@ public void copyChunk() { @Override public RestorableState capture() { - return GrpcStorageImpl.throwHttpJsonOnly(CopyWriter.class, "capture"); + return CrossTransportUtils.throwHttpJsonOnly(CopyWriter.class, "capture"); } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java index b58b9663f7..4ae3f24466 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java @@ -43,7 +43,7 @@ final class GrpcBlobReadChannel extends BaseStorageReadChannel { @Override public RestorableState capture() { - return GrpcStorageImpl.throwHttpJsonOnly(ReadChannel.class, "capture"); + return CrossTransportUtils.throwHttpJsonOnly(ReadChannel.class, "capture"); } @Override diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobWriteChannel.java index f3520180b3..a1b1d30306 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobWriteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobWriteChannel.java @@ -50,7 +50,7 @@ final class GrpcBlobWriteChannel extends BaseStorageWriteChannel capture() { - return GrpcStorageImpl.throwHttpJsonOnly(WriteChannel.class, "capture"); + return CrossTransportUtils.throwHttpJsonOnly(WriteChannel.class, "capture"); } @Override diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index d7d4059196..fdd67a7eb7 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -18,6 +18,8 @@ import static com.google.cloud.storage.ByteSizeConstants._16MiB; import static com.google.cloud.storage.ByteSizeConstants._256KiB; +import static com.google.cloud.storage.CrossTransportUtils.fmtMethodName; +import static com.google.cloud.storage.CrossTransportUtils.throwHttpJsonOnly; import static com.google.cloud.storage.GrpcToHttpStatusCodeTranslation.resultRetryAlgorithmToCodes; import static com.google.cloud.storage.StorageV2ProtoUtils.bucketAclEntityOrAltEq; import static com.google.cloud.storage.StorageV2ProtoUtils.objectAclEntityOrAltEq; @@ -44,6 +46,7 @@ import com.google.cloud.Policy; import com.google.cloud.WriteChannel; import com.google.cloud.storage.Acl.Entity; +import com.google.cloud.storage.BlobWriteSessionConfig.WriterFactory; import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel; import com.google.cloud.storage.Conversions.Decoder; import com.google.cloud.storage.HmacKey.HmacKeyMetadata; @@ -131,7 +134,6 @@ import java.nio.file.OpenOption; import java.nio.file.Path; import java.nio.file.StandardOpenOption; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; @@ -145,13 +147,12 @@ import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.UnaryOperator; -import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; import org.checkerframework.checker.nullness.qual.Nullable; @BetaApi -final class GrpcStorageImpl extends BaseService implements Storage { +final class GrpcStorageImpl extends BaseService implements StorageInternal { private static final byte[] ZERO_BYTES = new byte[0]; private static final Set READ_OPS = ImmutableSet.of(StandardOpenOption.READ); @@ -163,22 +164,30 @@ final class GrpcStorageImpl extends BaseService implements Stora private static final BucketSourceOption[] EMPTY_BUCKET_SOURCE_OPTIONS = new BucketSourceOption[0]; final StorageClient storageClient; + final WriterFactory writerFactory; final GrpcConversions codecs; final GrpcRetryAlgorithmManager retryAlgorithmManager; final SyntaxDecoders syntaxDecoders; + private final Decoder writeObjectResponseBlobInfoDecoder; // workaround for https://github.com/googleapis/java-storage/issues/1736 private final Opts defaultOpts; @Deprecated private final ProjectId defaultProjectId; GrpcStorageImpl( - GrpcStorageOptions options, StorageClient storageClient, Opts defaultOpts) { + GrpcStorageOptions options, + StorageClient storageClient, + WriterFactory writerFactory, + Opts defaultOpts) { super(options); this.storageClient = storageClient; + this.writerFactory = writerFactory; this.defaultOpts = defaultOpts; this.codecs = Conversions.grpc(); this.retryAlgorithmManager = options.getRetryAlgorithmManager(); this.syntaxDecoders = new SyntaxDecoders(); + this.writeObjectResponseBlobInfoDecoder = + codecs.blobInfo().compose(WriteObjectResponse::getResource); this.defaultProjectId = UnifiedOpts.projectId(options.getProjectId()); } @@ -278,15 +287,21 @@ public Blob createFrom(BlobInfo blobInfo, Path path, BlobWriteOption... options) @Override public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOption... options) throws IOException { + Opts opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts); + return internalCreateFrom(path, blobInfo, opts); + } + + @Override + public Blob internalCreateFrom(Path path, BlobInfo info, Opts opts) + throws IOException { requireNonNull(path, "path must be non null"); if (Files.isDirectory(path)) { throw new StorageException(0, path + " is a directory"); } - Opts opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts); GrpcCallContext grpcCallContext = opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); - WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts); + WriteObjectRequest req = getWriteObjectRequest(info, opts); ClientStreamingCallable write = storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext); @@ -714,9 +729,14 @@ public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption. @Override public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) { Opts opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts); + return internalWriter(blobInfo, opts); + } + + @Override + public GrpcBlobWriteChannel internalWriter(BlobInfo info, Opts opts) { GrpcCallContext grpcCallContext = opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); - WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts); + WriteObjectRequest req = getWriteObjectRequest(info, opts); Hasher hasher = Hasher.noop(); return new GrpcBlobWriteChannel( storageClient.writeObjectCallable(), @@ -1483,6 +1503,15 @@ public boolean deleteNotification(String bucket, String notificationId) { Decoder.identity())); } + @BetaApi + @Override + public BlobWriteSession blobWriteSession(BlobInfo info, BlobWriteOption... options) { + Opts opts = Opts.unwrap(options).resolveFrom(info); + WritableByteChannelSession writableByteChannelSession = + writerFactory.writeSession(this, info, opts, writeObjectResponseBlobInfoDecoder); + return BlobWriteSessions.of(writableByteChannelSession); + } + @Override public GrpcStorageOptions getOptions() { return (GrpcStorageOptions) super.getOptions(); @@ -1720,25 +1749,6 @@ public boolean tryAdvance(Consumer action) { return StreamSupport.stream(spliterator, false); } - static T throwHttpJsonOnly(String methodName) { - return throwHttpJsonOnly(Storage.class, methodName); - } - - static T throwHttpJsonOnly(Class clazz, String methodName) { - String message = - String.format( - "%s#%s is only supported for HTTP_JSON transport. Please use StorageOptions.http() to construct a compatible instance.", - clazz.getName(), methodName); - throw new UnsupportedOperationException(message); - } - - private static String fmtMethodName(String name, Class... args) { - return name - + "(" - + Arrays.stream(args).map(Class::getName).collect(Collectors.joining(", ")) - + ")"; - } - ReadObjectRequest getReadObjectRequest(BlobId blob, Opts opts) { Object object = codecs.blobId().encode(blob); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java index d623745a20..8bb3115c52 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java @@ -42,6 +42,7 @@ import com.google.cloud.Tuple; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.spi.ServiceRpcFactory; +import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.TransportCompatibility.Transport; import com.google.cloud.storage.UnifiedOpts.Opts; import com.google.cloud.storage.UnifiedOpts.UserProject; @@ -58,6 +59,7 @@ import java.io.IOException; import java.io.Serializable; import java.net.URI; +import java.time.Clock; import java.util.List; import java.util.Locale; import java.util.Map; @@ -82,6 +84,7 @@ public final class GrpcStorageOptions extends StorageOptions private final Duration terminationAwaitDuration; private final boolean attemptDirectPath; private final GrpcInterceptorProvider grpcInterceptorProvider; + private final BlobWriteSessionConfig blobWriteSessionConfig; private GrpcStorageOptions(Builder builder, GrpcStorageDefaults serviceDefaults) { super(builder, serviceDefaults); @@ -94,6 +97,7 @@ private GrpcStorageOptions(Builder builder, GrpcStorageDefaults serviceDefaults) builder.terminationAwaitDuration, serviceDefaults.getTerminationAwaitDuration()); this.attemptDirectPath = builder.attemptDirectPath; this.grpcInterceptorProvider = builder.grpcInterceptorProvider; + this.blobWriteSessionConfig = builder.blobWriteSessionConfig; } @Override @@ -346,6 +350,8 @@ public static final class Builder extends StorageOptions.Builder { private boolean attemptDirectPath = GrpcStorageDefaults.INSTANCE.isAttemptDirectPath(); private GrpcInterceptorProvider grpcInterceptorProvider = GrpcStorageDefaults.INSTANCE.grpcInterceptorProvider(); + private BlobWriteSessionConfig blobWriteSessionConfig = + GrpcStorageDefaults.INSTANCE.getDefaultStorageWriterConfig(); Builder() {} @@ -506,6 +512,21 @@ public GrpcStorageOptions.Builder setGrpcInterceptorProvider( return this; } + /** + * @see BlobWriteSessionConfig + * @see BlobWriteSessionConfigs + * @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...) + * @see GrpcStorageDefaults#getDefaultStorageWriterConfig() + * @since 2.26.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + public GrpcStorageOptions.Builder setBlobWriteSessionConfig( + @NonNull BlobWriteSessionConfig blobWriteSessionConfig) { + requireNonNull(blobWriteSessionConfig, "blobWriteSessionConfig must be non null"); + this.blobWriteSessionConfig = blobWriteSessionConfig; + return this; + } + /** @since 2.14.0 This new api is in preview and is subject to breaking changes. */ @BetaApi @Override @@ -569,6 +590,12 @@ public boolean isAttemptDirectPath() { public GrpcInterceptorProvider grpcInterceptorProvider() { return INTERCEPTOR_PROVIDER; } + + /** @since 2.26.0 This new api is in preview and is subject to breaking changes. */ + @BetaApi + public BlobWriteSessionConfig getDefaultStorageWriterConfig() { + return BlobWriteSessionConfigs.getDefault(); + } } /** @@ -618,7 +645,10 @@ public Storage create(StorageOptions options) { StorageSettings storageSettings = t.x(); Opts defaultOpts = t.y(); return new GrpcStorageImpl( - grpcStorageOptions, StorageClient.create(storageSettings), defaultOpts); + grpcStorageOptions, + StorageClient.create(storageSettings), + grpcStorageOptions.blobWriteSessionConfig.createFactory(Clock.systemUTC()), + defaultOpts); } catch (IOException e) { throw new IllegalStateException( "Unable to instantiate gRPC com.google.cloud.storage.Storage client.", e); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/Storage.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/Storage.java index daab166f56..0372957788 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/Storage.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/Storage.java @@ -16,6 +16,8 @@ package com.google.cloud.storage; +import static com.google.cloud.storage.CrossTransportUtils.fmtMethodName; +import static com.google.cloud.storage.CrossTransportUtils.throwGrpcOnly; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static java.util.Objects.requireNonNull; @@ -4615,4 +4617,52 @@ List testIamPermissions( */ @Override default void close() throws Exception {} + + /** + * Create a new {@link BlobWriteSession} for the specified {@code blobInfo} and {@code options}. + * + *

The returned {@code BlobWriteSession} can be used to write an individual version, a new + * session must be created each time you want to create a new version. + * + *

By default, any MD5 value in the provided {@code blobInfo} is ignored unless the option + * {@link BlobWriteOption#md5Match()} is included in {@code options}. + * + *

By default, any CRC32c value in the provided {@code blobInfo} is ignored unless the option + * {@link BlobWriteOption#crc32cMatch()} is included in {@code options}. + * + *

Example of creating an object using {@code BlobWriteSession}:

+ * + *
{@code
+   * String bucketName = "my-unique-bucket";
+   * String blobName = "my-blob-name";
+   * BlobId blobId = BlobId.of(bucketName, blobName);
+   * BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build();
+   * ReadableByteChannel readableByteChannel = ...;
+   * BlobWriteSession blobWriteSession = storage.blobWriteSession(blobInfo, BlobWriteOption.doesNotExist());
+   *
+   * // open the channel for writing
+   * try (WritableByteChannel writableByteChannel = blobWriteSession.open()) {
+   *   // copy all bytes
+   *   ByteStreams.copy(readableByteChannel, writableByteChannel);
+   * } catch (IOException e) {
+   *   // handle IOException
+   * }
+   *
+   * // get the resulting object metadata
+   * ApiFuture resultFuture = blobWriteSession.getResult();
+   * BlobInfo gen1 = resultFuture.get();
+   * }
+ * + * @param blobInfo blob to create + * @param options blob write options + * @since 2.26.0 This new api is in preview and is subject to breaking changes. + * @see BlobWriteSessionConfig + * @see BlobWriteSessionConfigs + * @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig) + */ + @BetaApi + @TransportCompatibility({Transport.GRPC}) + default BlobWriteSession blobWriteSession(BlobInfo blobInfo, BlobWriteOption... options) { + return throwGrpcOnly(fmtMethodName("blobWriteSession", BlobInfo.class, BlobWriteOption.class)); + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java new file mode 100644 index 0000000000..deb8a05043 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java @@ -0,0 +1,29 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; +import com.google.cloud.storage.UnifiedOpts.Opts; +import java.io.IOException; +import java.nio.file.Path; + +interface StorageInternal extends Storage { + + Blob internalCreateFrom(Path path, BlobInfo info, Opts opts) throws IOException; + + StorageWriteChannel internalWriter(BlobInfo info, Opts opts); +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/UnifiedOpts.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/UnifiedOpts.java index f51bc05da4..f794c29bcb 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/UnifiedOpts.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/UnifiedOpts.java @@ -1083,7 +1083,7 @@ private MatchGlob(String val) { @Override public Mapper listObjects() { - return GrpcStorageImpl.throwHttpJsonOnly( + return CrossTransportUtils.throwHttpJsonOnly( com.google.cloud.storage.Storage.BlobListOption.class, "matchGlob(String)"); } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/TransportCompatibilityTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/TransportCompatibilityTest.java index d22b18294d..76219beccd 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/TransportCompatibilityTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/TransportCompatibilityTest.java @@ -38,7 +38,7 @@ public void verifyUnsupportedMethodsGenerateMeaningfulException() { .setCredentials(NoCredentials.getInstance()) .build(); @SuppressWarnings("resource") - Storage s = new GrpcStorageImpl(options, null, Opts.empty()); + Storage s = new GrpcStorageImpl(options, null, null, Opts.empty()); ImmutableList messages = Stream.>of( s::batch, diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java new file mode 100644 index 0000000000..1bb24fb975 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java @@ -0,0 +1,121 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage.it; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; + +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.BlobWriteSession; +import com.google.cloud.storage.BlobWriteSessionConfigs; +import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.DataGenerator; +import com.google.cloud.storage.GrpcStorageOptions; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.Storage.BlobWriteOption; +import com.google.cloud.storage.StorageException; +import com.google.cloud.storage.TransportCompatibility.Transport; +import com.google.cloud.storage.it.runner.StorageITRunner; +import com.google.cloud.storage.it.runner.annotations.Backend; +import com.google.cloud.storage.it.runner.annotations.Inject; +import com.google.cloud.storage.it.runner.annotations.SingleBackend; +import com.google.cloud.storage.it.runner.annotations.StorageFixture; +import com.google.cloud.storage.it.runner.registry.Generator; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(StorageITRunner.class) +@SingleBackend(Backend.PROD) +public final class ITBlobWriteSessionTest { + + @Inject + @StorageFixture(Transport.GRPC) + public Storage storage; + + @Inject public BucketInfo bucket; + + @Inject public Generator generator; + + @Test + public void allDefaults() throws Exception { + doTest(storage); + } + + @Test + public void overrideDefaultBufferSize() throws Exception { + GrpcStorageOptions options = + ((GrpcStorageOptions) storage.getOptions()) + .toBuilder() + .setBlobWriteSessionConfig( + BlobWriteSessionConfigs.getDefault().withChunkSize(256 * 1024)) + .build(); + try (Storage s = options.getService()) { + doTest(s); + } + } + + @Test + public void closingAnOpenedSessionWithoutCallingWriteShouldMakeAnEmptyObject() + throws IOException, ExecutionException, InterruptedException, TimeoutException { + BlobInfo info = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(); + BlobWriteSession session = storage.blobWriteSession(info, BlobWriteOption.doesNotExist()); + + WritableByteChannel open = session.open(); + open.close(); + BlobInfo gen1 = session.getResult().get(1, TimeUnit.SECONDS); + System.out.println("gen1 = " + gen1); + + assertThat(gen1.getSize()).isEqualTo(0); + } + + @Test + public void attemptingToOpenASessionWhichResultsInFailureShouldThrowAStorageException() { + // attempt to write to a bucket which we have not created + String badBucketName = bucket.getName() + "x"; + BlobInfo info = BlobInfo.newBuilder(badBucketName, generator.randomObjectName()).build(); + + BlobWriteSession session = storage.blobWriteSession(info, BlobWriteOption.doesNotExist()); + StorageException se = assertThrows(StorageException.class, () -> session.open().close()); + + assertThat(se.getCode()).isEqualTo(404); + assertThat(se).hasMessageThat().contains(badBucketName); + } + + private void doTest(Storage underTest) throws Exception { + BlobWriteSession sess = + underTest.blobWriteSession( + BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(), + BlobWriteOption.doesNotExist()); + + byte[] bytes = DataGenerator.base64Characters().genBytes(512 * 1024); + try (WritableByteChannel w = sess.open()) { + w.write(ByteBuffer.wrap(bytes)); + } + + BlobInfo gen1 = sess.getResult().get(10, TimeUnit.SECONDS); + + byte[] allBytes = storage.readAllBytes(gen1.getBlobId()); + + assertThat(allBytes).isEqualTo(bytes); + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/AbstractStorageProxy.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/AbstractStorageProxy.java index 11440cb190..d264e5a6d0 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/AbstractStorageProxy.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/AbstractStorageProxy.java @@ -25,6 +25,7 @@ import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.BlobWriteSession; import com.google.cloud.storage.Bucket; import com.google.cloud.storage.BucketInfo; import com.google.cloud.storage.CopyWriter; @@ -478,6 +479,11 @@ public boolean deleteNotification(String bucket, String notificationId) { return delegate.deleteNotification(bucket, notificationId); } + @Override + public BlobWriteSession blobWriteSession(BlobInfo blobInfo, BlobWriteOption... options) { + return delegate.blobWriteSession(blobInfo, options); + } + @Override public void close() throws Exception { delegate.close();