diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java
index cc5e691e6b..efaf569a87 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java
@@ -19,6 +19,11 @@
import com.google.api.core.BetaApi;
import com.google.cloud.storage.GrpcStorageOptions.GrpcStorageDefaults;
import com.google.cloud.storage.Storage.BlobWriteOption;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
/**
* Factory class to select and construct {@link BlobWriteSessionConfig}s.
@@ -46,4 +51,54 @@ private BlobWriteSessionConfigs() {}
public static DefaultBlobWriteSessionConfig getDefault() {
return new DefaultBlobWriteSessionConfig(ByteSizeConstants._16MiB);
}
+
+ /**
+ * Create a new {@link BlobWriteSessionConfig} which will first buffer the content of the object
+ * to a temporary file under {@code java.io.tmpdir}.
+ *
+ *
Once the file on disk is closed, the entire file will then be uploaded to Google Cloud
+ * Storage.
+ *
+ * @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...)
+ * @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig)
+ */
+ @BetaApi
+ public static BlobWriteSessionConfig bufferToTempDirThenUpload() throws IOException {
+ return bufferToDiskThenUpload(
+ Paths.get(System.getProperty("java.io.tmpdir"), "google-cloud-storage"));
+ }
+
+ /**
+ * Create a new {@link BlobWriteSessionConfig} which will first buffer the content of the object
+ * to a temporary file under the specified {@code path}.
+ *
+ *
Once the file on disk is closed, the entire file will then be uploaded to Google Cloud
+ * Storage.
+ *
+ * @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...)
+ * @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig)
+ */
+ @BetaApi
+ public static BufferToDiskThenUpload bufferToDiskThenUpload(Path path) throws IOException {
+ return bufferToDiskThenUpload(ImmutableList.of(path));
+ }
+
+ /**
+ * Create a new {@link BlobWriteSessionConfig} which will first buffer the content of the object
+ * to a temporary file under one of the specified {@code paths}.
+ *
+ *
Once the file on disk is closed, the entire file will then be uploaded to Google Cloud
+ * Storage.
+ *
+ *
The specifics of how the work is spread across multiple paths is undefined and subject to
+ * change.
+ *
+ * @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...)
+ * @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig)
+ */
+ @BetaApi
+ public static BufferToDiskThenUpload bufferToDiskThenUpload(Collection paths)
+ throws IOException {
+ return new BufferToDiskThenUpload(ImmutableList.copyOf(paths), false);
+ }
}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BufferToDiskThenUpload.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BufferToDiskThenUpload.java
new file mode 100644
index 0000000000..fb20747a8c
--- /dev/null
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BufferToDiskThenUpload.java
@@ -0,0 +1,236 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.core.BetaApi;
+import com.google.api.core.InternalApi;
+import com.google.api.core.SettableApiFuture;
+import com.google.cloud.storage.Conversions.Decoder;
+import com.google.cloud.storage.Storage.BlobWriteOption;
+import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
+import com.google.cloud.storage.UnifiedOpts.Opts;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.storage.v2.WriteObjectResponse;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.stream.Collector;
+import javax.annotation.concurrent.Immutable;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+
+/**
+ * There are scenarios in which disk space is more plentiful than memory space. This new {@link
+ * BlobWriteSessionConfig} allows augmenting an instance of storage to produce {@link
+ * BlobWriteSession}s which will buffer to disk rather than holding things in memory.
+ *
+ * Once the file on disk is closed, the entire file will then be uploaded to GCS.
+ *
+ * @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...)
+ * @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig)
+ * @see BlobWriteSessionConfigs#bufferToDiskThenUpload(Path)
+ * @see BlobWriteSessionConfigs#bufferToDiskThenUpload(Collection)
+ */
+@Immutable
+@BetaApi
+public final class BufferToDiskThenUpload extends BlobWriteSessionConfig {
+ private static final long serialVersionUID = 9059242302276891867L;
+
+ /**
+ * non-final because of {@link java.io.Serializable}, however this field is effectively final as
+ * it is immutable and there is not reference mutator method.
+ */
+ @MonotonicNonNull private transient ImmutableList paths;
+
+ private final boolean includeLoggingSink;
+
+ /** Used for {@link java.io.Serializable} */
+ @MonotonicNonNull private volatile ArrayList absolutePaths;
+
+ @InternalApi
+ BufferToDiskThenUpload(ImmutableList paths, boolean includeLoggingSink) throws IOException {
+ this.paths = paths;
+ this.includeLoggingSink = includeLoggingSink;
+ }
+
+ @VisibleForTesting
+ @InternalApi
+ BufferToDiskThenUpload withIncludeLoggingSink() throws IOException {
+ return new BufferToDiskThenUpload(paths, true);
+ }
+
+ @InternalApi
+ @Override
+ WriterFactory createFactory(Clock clock) throws IOException {
+ Duration window = Duration.ofMinutes(10);
+ RecoveryFileManager recoveryFileManager =
+ RecoveryFileManager.of(paths, getRecoverVolumeSinkFactory(clock, window));
+ ThroughputSink gcs = ThroughputSink.windowed(ThroughputMovingWindow.of(window), clock);
+ gcs = includeLoggingSink ? ThroughputSink.tee(ThroughputSink.logged("gcs", clock), gcs) : gcs;
+ return new Factory(recoveryFileManager, clock, gcs);
+ }
+
+ private RecoveryFileManager.RecoverVolumeSinkFactory getRecoverVolumeSinkFactory(
+ Clock clock, Duration window) {
+ return path -> {
+ ThroughputSink windowed = ThroughputSink.windowed(ThroughputMovingWindow.of(window), clock);
+ if (includeLoggingSink) {
+ return ThroughputSink.tee(
+ ThroughputSink.logged(path.toAbsolutePath().toString(), clock), windowed);
+ } else {
+ return windowed;
+ }
+ };
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ if (absolutePaths == null) {
+ synchronized (this) {
+ if (absolutePaths == null) {
+ absolutePaths =
+ paths.stream()
+ .map(Path::toAbsolutePath)
+ .map(Path::toString)
+ .collect(
+ Collector.of(
+ ArrayList::new,
+ ArrayList::add,
+ (left, right) -> {
+ left.addAll(right);
+ return left;
+ }));
+ }
+ }
+ }
+ out.defaultWriteObject();
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ this.paths = absolutePaths.stream().map(Paths::get).collect(ImmutableList.toImmutableList());
+ }
+
+ private static final class Factory implements WriterFactory {
+
+ private final RecoveryFileManager recoveryFileManager;
+ private final Clock clock;
+ private final ThroughputSink gcs;
+
+ private Factory(RecoveryFileManager recoveryFileManager, Clock clock, ThroughputSink gcs) {
+ this.recoveryFileManager = recoveryFileManager;
+ this.clock = clock;
+ this.gcs = gcs;
+ }
+
+ @InternalApi
+ @Override
+ public WritableByteChannelSession, BlobInfo> writeSession(
+ StorageInternal storage,
+ BlobInfo info,
+ Opts opts,
+ Decoder d) {
+ return new Factory.WriteToFileThenUpload(
+ storage, info, opts, recoveryFileManager.newRecoveryFile(info));
+ }
+
+ private final class WriteToFileThenUpload
+ implements WritableByteChannelSession {
+
+ private final StorageInternal storage;
+ private final BlobInfo info;
+ private final Opts opts;
+ private final RecoveryFile rf;
+ private final SettableApiFuture result;
+
+ private WriteToFileThenUpload(
+ StorageInternal storage, BlobInfo info, Opts opts, RecoveryFile rf) {
+ this.info = info;
+ this.opts = opts;
+ this.rf = rf;
+ this.storage = storage;
+ this.result = SettableApiFuture.create();
+ }
+
+ @Override
+ public ApiFuture openAsync() {
+ try {
+ ApiFuture f = ApiFutures.immediateFuture(rf.writer());
+ return ApiFutures.transform(
+ f, Factory.WriteToFileThenUpload.Flusher::new, MoreExecutors.directExecutor());
+ } catch (IOException e) {
+ throw StorageException.coalesce(e);
+ }
+ }
+
+ @Override
+ public ApiFuture getResult() {
+ return result;
+ }
+
+ private final class Flusher implements WritableByteChannel {
+
+ private final WritableByteChannel delegate;
+
+ private Flusher(WritableByteChannel delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ return delegate.write(src);
+ }
+
+ @Override
+ public boolean isOpen() {
+ return delegate.isOpen();
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ try (RecoveryFile rf = Factory.WriteToFileThenUpload.this.rf) {
+ Path path = rf.getPath();
+ long size = Files.size(path);
+ ThroughputSink.computeThroughput(
+ clock,
+ gcs,
+ size,
+ () -> {
+ BlobInfo blob = storage.internalCreateFrom(path, info, opts);
+ result.set(blob);
+ });
+ } catch (StorageException | IOException e) {
+ result.setException(e);
+ throw e;
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java
index fdd67a7eb7..fad1f66e3a 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java
@@ -152,7 +152,8 @@
import org.checkerframework.checker.nullness.qual.Nullable;
@BetaApi
-final class GrpcStorageImpl extends BaseService implements StorageInternal {
+final class GrpcStorageImpl extends BaseService
+ implements Storage, StorageInternal {
private static final byte[] ZERO_BYTES = new byte[0];
private static final Set READ_OPS = ImmutableSet.of(StandardOpenOption.READ);
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/RecoveryFile.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/RecoveryFile.java
new file mode 100644
index 0000000000..d399ea9300
--- /dev/null
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/RecoveryFile.java
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.OpenOption;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Set;
+
+/**
+ * When uploading to GCS, there are times when memory buffers are not preferable. This class
+ * encapsulates the logic and lifecycle for a file written to local disk which can be used for
+ * upload recovery in the case an upload is interrupted.
+ */
+final class RecoveryFile implements AutoCloseable {
+ private static final Set writeOps =
+ ImmutableSet.of(StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+ private static final Set readOps = ImmutableSet.of(StandardOpenOption.READ);
+
+ private final Path path;
+ private final ThroughputSink throughputSink;
+ private final Runnable onCloseCallback;
+
+ RecoveryFile(Path path, ThroughputSink throughputSink, Runnable onCloseCallback) {
+ this.path = path;
+ this.throughputSink = throughputSink;
+ this.onCloseCallback = onCloseCallback;
+ }
+
+ public Path getPath() {
+ return path;
+ }
+
+ public Path touch() throws IOException {
+ return Files.createFile(path);
+ }
+
+ public SeekableByteChannel reader() throws IOException {
+ return Files.newByteChannel(path, readOps);
+ }
+
+ public WritableByteChannel writer() throws IOException {
+ return throughputSink.decorate(Files.newByteChannel(path, writeOps));
+ }
+
+ @Override
+ public void close() throws IOException {
+ Files.delete(path);
+ onCloseCallback.run();
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("path", path)
+ .add("throughputSink", throughputSink)
+ .add("onCloseCallback", onCloseCallback)
+ .toString();
+ }
+
+ Unsafe unsafe() {
+ return new Unsafe();
+ }
+
+ final class Unsafe {
+ public Path touch() throws UnsafeIOException {
+ try {
+ return RecoveryFile.this.touch();
+ } catch (IOException e) {
+ throw new UnsafeIOException(e);
+ }
+ }
+ }
+
+ static final class UnsafeIOException extends RuntimeException {
+ private UnsafeIOException(IOException cause) {
+ super(cause);
+ }
+ }
+}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/RecoveryFileManager.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/RecoveryFileManager.java
new file mode 100644
index 0000000000..25303b5fa4
--- /dev/null
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/RecoveryFileManager.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Ints;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+final class RecoveryFileManager {
+
+ private final ImmutableList volumes;
+ /** Keep track of active info and file */
+ private final Map files;
+
+ /**
+ * Round-robin assign recovery files to the configured volumes. Use this index to keep track of
+ * which volume to assign to next.
+ */
+ private int nextVolumeIndex;
+
+ private RecoveryFileManager(List volumes) {
+ this.volumes = ImmutableList.copyOf(volumes);
+ this.files = Collections.synchronizedMap(new HashMap<>());
+ this.nextVolumeIndex = 0;
+ }
+
+ public RecoveryFile newRecoveryFile(BlobInfo info) {
+ int i = getNextVolumeIndex();
+ RecoveryVolume v = volumes.get(i);
+ int hashCode = info.hashCode();
+ String fileName = Base64.getUrlEncoder().encodeToString(Ints.toByteArray(hashCode));
+ Path path = v.basePath.resolve(fileName);
+ RecoveryFile recoveryFile = new RecoveryFile(path, v.sink, () -> files.remove(info));
+ files.put(info, recoveryFile);
+ return recoveryFile;
+ }
+
+ private synchronized int getNextVolumeIndex() {
+ return nextVolumeIndex = (nextVolumeIndex + 1) % volumes.size();
+ }
+
+ static RecoveryFileManager of(List volumes) throws IOException {
+ return of(volumes, p -> ThroughputSink.nullSink());
+ }
+
+ static RecoveryFileManager of(List volumes, RecoverVolumeSinkFactory factory)
+ throws IOException {
+ checkArgument(!volumes.isEmpty(), "At least one volume must be specified");
+ checkArgument(
+ volumes.stream().allMatch(p -> !Files.exists(p) || Files.isDirectory(p)),
+ "All provided volumes must either:\n1. Not yet exists\n2. Be directories");
+
+ for (Path v : volumes) {
+ if (!Files.exists(v)) {
+ Files.createDirectories(v);
+ }
+ }
+ ImmutableList recoveryVolumes =
+ volumes.stream()
+ .map(p -> RecoveryVolume.of(p, factory.apply(p)))
+ .collect(ImmutableList.toImmutableList());
+ return new RecoveryFileManager(recoveryVolumes);
+ }
+
+ @FunctionalInterface
+ interface RecoverVolumeSinkFactory {
+ ThroughputSink apply(Path p);
+ }
+
+ static final class RecoveryVolume {
+ private final Path basePath;
+ private final ThroughputSink sink;
+
+ private RecoveryVolume(Path basePath, ThroughputSink sink) {
+ this.basePath = basePath;
+ this.sink = sink;
+ }
+
+ public static RecoveryVolume of(Path basePath, ThroughputSink sink) {
+ return new RecoveryVolume(basePath, sink);
+ }
+ }
+}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java
index deb8a05043..53fdc4e9a6 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageInternal.java
@@ -21,9 +21,10 @@
import java.io.IOException;
import java.nio.file.Path;
-interface StorageInternal extends Storage {
+interface StorageInternal {
- Blob internalCreateFrom(Path path, BlobInfo info, Opts opts) throws IOException;
+ BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts opts)
+ throws IOException;
StorageWriteChannel internalWriter(BlobInfo info, Opts opts);
}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java
index 5ef6e37d10..776629cf34 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ThroughputSink.java
@@ -69,6 +69,10 @@ static ThroughputSink tee(ThroughputSink a, ThroughputSink b) {
return new TeeThroughputSink(a, b);
}
+ static ThroughputSink nullSink() {
+ return NullThroughputSink.INSTANCE;
+ }
+
final class Record {
private final long numBytes;
private final Instant begin;
@@ -166,6 +170,11 @@ public void recordThroughput(Record r) {
public WritableByteChannel decorate(WritableByteChannel wbc) {
return new ThroughputRecordingWritableByteChannel(wbc, this, clock);
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("prefix", prefix).add("clock", clock).toString();
+ }
}
final class ThroughputRecordingWritableByteChannel implements WritableByteChannel {
@@ -206,6 +215,15 @@ public boolean isOpen() {
public void close() throws IOException {
delegate.close();
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("delegate", delegate)
+ .add("sink", sink)
+ .add("clock", clock)
+ .toString();
+ }
}
final class TeeThroughputSink implements ThroughputSink {
@@ -227,6 +245,11 @@ public void recordThroughput(Record r) {
public WritableByteChannel decorate(WritableByteChannel wbc) {
return b.decorate(a.decorate(wbc));
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("a", a).add("b", b).toString();
+ }
}
final class ThroughputMovingWindowThroughputSink implements ThroughputSink {
@@ -247,5 +270,24 @@ public synchronized void recordThroughput(Record r) {
public WritableByteChannel decorate(WritableByteChannel wbc) {
return new ThroughputRecordingWritableByteChannel(wbc, this, clock);
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("w", w).add("clock", clock).toString();
+ }
+ }
+
+ final class NullThroughputSink implements ThroughputSink {
+ private static final NullThroughputSink INSTANCE = new NullThroughputSink();
+
+ private NullThroughputSink() {}
+
+ @Override
+ public void recordThroughput(Record r) {}
+
+ @Override
+ public WritableByteChannel decorate(WritableByteChannel wbc) {
+ return wbc;
+ }
}
}
diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BufferToDiskThenUploadTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BufferToDiskThenUploadTest.java
new file mode 100644
index 0000000000..522be99b2c
--- /dev/null
+++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BufferToDiskThenUploadTest.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import static com.google.cloud.storage.TestUtils.xxd;
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.cloud.storage.BlobWriteSessionConfig.WriterFactory;
+import com.google.cloud.storage.Conversions.Decoder;
+import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
+import com.google.cloud.storage.UnifiedOpts.Opts;
+import com.google.storage.v2.WriteObjectResponse;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+public final class BufferToDiskThenUploadTest {
+
+ private static final Decoder
+ WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER =
+ Conversions.grpc().blobInfo().compose(WriteObjectResponse::getResource);
+ @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+ @Rule public final TestName testName = new TestName();
+
+ @Test
+ public void happyPath() throws IOException {
+ Path tempDir = temporaryFolder.newFolder(testName.getMethodName()).toPath();
+
+ BufferToDiskThenUpload btdtu =
+ BlobWriteSessionConfigs.bufferToDiskThenUpload(tempDir).withIncludeLoggingSink();
+ TestClock clock = TestClock.tickBy(Instant.EPOCH, Duration.ofSeconds(1));
+ WriterFactory factory = btdtu.createFactory(clock);
+
+ BlobInfo blobInfo = BlobInfo.newBuilder("bucket", "object").build();
+ AtomicReference actualBytes = new AtomicReference<>(null);
+ WritableByteChannelSession, BlobInfo> writeSession =
+ factory.writeSession(
+ new StorageInternal() {
+ @Override
+ public BlobInfo internalCreateFrom(
+ Path path, BlobInfo info, Opts opts) throws IOException {
+ byte[] actual = Files.readAllBytes(path);
+ actualBytes.compareAndSet(null, actual);
+ return info;
+ }
+
+ @Override
+ public StorageWriteChannel internalWriter(BlobInfo info, Opts opts) {
+ return null;
+ }
+ },
+ blobInfo,
+ Opts.empty(),
+ WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER);
+
+ byte[] bytes = DataGenerator.base64Characters().genBytes(128);
+ try (WritableByteChannel open = writeSession.open()) {
+ open.write(ByteBuffer.wrap(bytes));
+ }
+ String xxdActual = xxd(actualBytes.get());
+ String xxdExpected = xxd(bytes);
+ assertThat(xxdActual).isEqualTo(xxdExpected);
+ }
+}
diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/RecoveryFileManagerTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/RecoveryFileManagerTest.java
new file mode 100644
index 0000000000..abf68893bb
--- /dev/null
+++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/RecoveryFileManagerTest.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.ByteStreams;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Objects;
+import java.util.stream.Stream;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+public final class RecoveryFileManagerTest {
+ private static final int _128KiB = 128 * 1024;
+
+ @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Rule public final TestName testName = new TestName();
+
+ private final TestClock clock = TestClock.tickBy(Instant.EPOCH, Duration.ofSeconds(1));
+
+ @Test
+ public void happyPath() throws IOException {
+ Path tempDir = temporaryFolder.newFolder(testName.getMethodName()).toPath();
+ RecoveryFileManager rfm =
+ RecoveryFileManager.of(
+ ImmutableList.of(tempDir),
+ path -> ThroughputSink.logged(path.toAbsolutePath().toString(), clock));
+
+ BlobInfo info = BlobInfo.newBuilder("bucket", "object").build();
+ try (RecoveryFile recoveryFile = rfm.newRecoveryFile(info)) {
+
+ byte[] bytes = DataGenerator.base64Characters().genBytes(_128KiB);
+ try (WritableByteChannel writer = recoveryFile.writer()) {
+ writer.write(ByteBuffer.wrap(bytes));
+ }
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (SeekableByteChannel r = recoveryFile.reader();
+ WritableByteChannel w = Channels.newChannel(baos)) {
+ long copy = ByteStreams.copy(r, w);
+ assertThat(copy).isEqualTo(_128KiB);
+ }
+
+ assertThat(baos.toByteArray()).isEqualTo(bytes);
+ }
+
+ try (Stream stream = Files.list(tempDir)) {
+ boolean b = stream.anyMatch(Objects::nonNull);
+ assertThat(b).isFalse();
+ }
+ }
+
+ @Test
+ public void argValidation_nonEmpty() {
+ IllegalArgumentException iae =
+ assertThrows(
+ IllegalArgumentException.class, () -> RecoveryFileManager.of(ImmutableList.of()));
+
+ assertThat(iae).hasMessageThat().isNotEmpty();
+ }
+
+ @Test
+ public void argValidation_fileInsteadOfDirectory() throws IOException {
+ Path tempDir = temporaryFolder.newFile(testName.getMethodName()).toPath();
+
+ IllegalArgumentException iae =
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> RecoveryFileManager.of(ImmutableList.of(tempDir)));
+
+ assertThat(iae).hasMessageThat().isNotEmpty();
+ }
+
+ @Test
+ public void argValidation_directoryDoesNotExistIsCreated() throws IOException {
+ Path tempDir = temporaryFolder.newFolder(testName.getMethodName()).toPath();
+
+ Path subPathA = tempDir.resolve("sub/path/a");
+
+ assertThat(Files.exists(subPathA)).isFalse();
+ RecoveryFileManager rfm = RecoveryFileManager.of(ImmutableList.of(subPathA));
+ assertThat(Files.exists(subPathA)).isTrue();
+ }
+
+ @Test
+ public void fileAssignmentIsRoundRobin() throws IOException {
+ Path tempDir1 = temporaryFolder.newFolder(testName.getMethodName() + "1").toPath();
+ Path tempDir2 = temporaryFolder.newFolder(testName.getMethodName() + "2").toPath();
+ Path tempDir3 = temporaryFolder.newFolder(testName.getMethodName() + "3").toPath();
+ RecoveryFileManager rfm =
+ RecoveryFileManager.of(ImmutableList.of(tempDir1, tempDir2, tempDir3));
+
+ BlobInfo info1 = BlobInfo.newBuilder("bucket", "object1").build();
+ BlobInfo info2 = BlobInfo.newBuilder("bucket", "object2").build();
+ BlobInfo info3 = BlobInfo.newBuilder("bucket", "object3").build();
+ try (RecoveryFile recoveryFile1 = rfm.newRecoveryFile(info1);
+ RecoveryFile recoveryFile2 = rfm.newRecoveryFile(info2);
+ RecoveryFile recoveryFile3 = rfm.newRecoveryFile(info3)) {
+
+ ImmutableSet paths =
+ Stream.of(recoveryFile1, recoveryFile2, recoveryFile3)
+ .map(rf -> rf.unsafe().touch())
+ .map(Path::toAbsolutePath)
+ .collect(ImmutableSet.toImmutableSet());
+
+ ImmutableSet parentDirs =
+ Stream.of(recoveryFile1, recoveryFile2, recoveryFile3)
+ .map(RecoveryFile::getPath)
+ .map(Path::getParent)
+ .collect(ImmutableSet.toImmutableSet());
+
+ assertThat(paths).hasSize(3);
+ assertThat(parentDirs).isEqualTo(ImmutableSet.of(tempDir1, tempDir2, tempDir3));
+ }
+ }
+}
diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/SerializationTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/SerializationTest.java
index 87b88a78b8..ae3a140f52 100644
--- a/google-cloud-storage/src/test/java/com/google/cloud/storage/SerializationTest.java
+++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/SerializationTest.java
@@ -170,35 +170,47 @@ protected Serializable[] serializableObjects() {
.add(UnifiedOpts.md5MatchExtractor())
.build();
- return new Serializable[] {
- ACL_DOMAIN,
- ACL_GROUP,
- ACL_PROJECT_,
- ACL_USER,
- ACL_RAW,
- ACL,
- BLOB_INFO,
- BLOB,
- BUCKET_INFO,
- BUCKET,
- ORIGIN,
- CORS,
- PAGE_RESULT,
- BLOB_LIST_OPTIONS,
- BLOB_SOURCE_OPTIONS,
- BLOB_TARGET_OPTIONS,
- BUCKET_LIST_OPTIONS,
- BUCKET_SOURCE_OPTIONS,
- BUCKET_TARGET_OPTIONS,
- STORAGE_EXCEPTION,
- optionsDefault1,
- optionsDefault2,
- optionsHttp1,
- optionsHttp2,
- optionsGrpc1,
- optionsGrpc2,
- serializableOpts
- };
+ try {
+ GrpcStorageOptions grpcStorageOptionsBufferToTemp =
+ StorageOptions.grpc()
+ .setCredentials(NoCredentials.getInstance())
+ .setProjectId("project1")
+ .setBlobWriteSessionConfig(BlobWriteSessionConfigs.bufferToTempDirThenUpload())
+ .build();
+
+ return new Serializable[] {
+ ACL_DOMAIN,
+ ACL_GROUP,
+ ACL_PROJECT_,
+ ACL_USER,
+ ACL_RAW,
+ ACL,
+ BLOB_INFO,
+ BLOB,
+ BUCKET_INFO,
+ BUCKET,
+ ORIGIN,
+ CORS,
+ PAGE_RESULT,
+ BLOB_LIST_OPTIONS,
+ BLOB_SOURCE_OPTIONS,
+ BLOB_TARGET_OPTIONS,
+ BUCKET_LIST_OPTIONS,
+ BUCKET_SOURCE_OPTIONS,
+ BUCKET_TARGET_OPTIONS,
+ STORAGE_EXCEPTION,
+ optionsDefault1,
+ optionsDefault2,
+ optionsHttp1,
+ optionsHttp2,
+ optionsGrpc1,
+ optionsGrpc2,
+ serializableOpts,
+ grpcStorageOptionsBufferToTemp
+ };
+ } catch (IOException ioe) {
+ throw new AssertionError(ioe);
+ }
}
@Override
diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java
index 1bb24fb975..1113d0231a 100644
--- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java
+++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java
@@ -61,6 +61,18 @@ public void allDefaults() throws Exception {
doTest(storage);
}
+ @Test
+ public void bufferToTempDirThenUpload() throws Exception {
+ GrpcStorageOptions options =
+ ((GrpcStorageOptions) storage.getOptions())
+ .toBuilder()
+ .setBlobWriteSessionConfig(BlobWriteSessionConfigs.bufferToTempDirThenUpload())
+ .build();
+ try (Storage s = options.getService()) {
+ doTest(s);
+ }
+ }
+
@Test
public void overrideDefaultBufferSize() throws Exception {
GrpcStorageOptions options =