feat: port ParallelCompositeUploadBlobWriteSessionConfig to work with HttpStorageOptions #2474

Merged · 1 commit · Mar 28, 2024
@@ -142,7 +142,7 @@
* Break the stream of bytes into smaller part objects, uploading each part in parallel. Then
* compose the parts together to make the ultimate object.
* </td>
- * <td>gRPC</td>
+ * <td>gRPC, HTTP</td>
* <td>
* <ol>
* <li>
@@ -342,7 +342,7 @@ public static JournalingBlobWriteSessionConfig journaling(Collection<Path> paths
* @since 2.28.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
-@TransportCompatibility({Transport.GRPC})
+@TransportCompatibility({Transport.GRPC, Transport.HTTP})
public static ParallelCompositeUploadBlobWriteSessionConfig parallelCompositeUpload() {
return ParallelCompositeUploadBlobWriteSessionConfig.withDefaults();
}
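With HTTP now listed alongside gRPC in the transport compatibility above, the same factory can be wired into HttpStorageOptions. Below is a minimal usage sketch, not part of this PR: the bucket and object names are hypothetical, and it only relies on the existing public preview APIs referenced in this diff (BlobWriteSessionConfigs.parallelCompositeUpload(), HttpStorageOptions.Builder#setBlobWriteSessionConfig, Storage#blobWriteSession).

```java
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.BlobWriteSession;
import com.google.cloud.storage.BlobWriteSessionConfigs;
import com.google.cloud.storage.HttpStorageOptions;
import com.google.cloud.storage.Storage;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

public final class PcuOverHttpSketch {
  public static void main(String[] args) throws Exception {
    HttpStorageOptions options =
        HttpStorageOptions.newBuilder()
            // accepted over HTTP once ParallelCompositeUploadBlobWriteSessionConfig is HttpCompatible
            .setBlobWriteSessionConfig(BlobWriteSessionConfigs.parallelCompositeUpload())
            .build();
    try (Storage storage = options.getService()) {
      BlobInfo info = BlobInfo.newBuilder(BlobId.of("my-bucket", "my-object")).build();
      BlobWriteSession session = storage.blobWriteSession(info);
      try (WritableByteChannel channel = session.open()) {
        channel.write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
      }
      // completes once the uploaded parts have been composed into the final object
      BlobInfo created = session.getResult().get();
      System.out.println(created.getBlobId());
    }
  }
}
```

Over HTTP the session still uploads part objects and composes them into the final object, exactly as the strategy row above describes; only the transport changes.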
@@ -118,9 +118,9 @@
*/
@Immutable
@BetaApi
-@TransportCompatibility({Transport.GRPC})
+@TransportCompatibility({Transport.GRPC, Transport.HTTP})
public final class ParallelCompositeUploadBlobWriteSessionConfig extends BlobWriteSessionConfig
-implements BlobWriteSessionConfig.GrpcCompatible {
+implements BlobWriteSessionConfig.HttpCompatible, BlobWriteSessionConfig.GrpcCompatible {

private static final int MAX_PARTS_PER_COMPOSE = 32;
private final int maxPartsPerCompose;
@@ -431,6 +431,10 @@ private BlobInfo definePart(BlobInfo ultimateObject, PartRange partRange, long o
PART_INDEX.appendTo(partRange, builder);
OBJECT_OFFSET.appendTo(offset, builder);
b.setMetadata(builder.build());
// the value of a kms key name will contain the exact version when read from gcs
// however, gcs will not accept that version resource identifier when creating a new object
// strip it out, so it can be included as a query string parameter instead
b.setKmsKeyName(null);
b = partMetadataFieldDecorator.apply(b);
return b.build();
}
@@ -507,7 +511,11 @@ private ApiFuture<Boolean> deleteAsync(BlobId id) {
@VisibleForTesting
@NonNull
static Opts<ObjectTargetOpt> getPartOpts(Opts<ObjectTargetOpt> opts) {
-return opts.filter(TO_EXCLUDE_FROM_PARTS).prepend(DOES_NOT_EXIST);
+return opts.filter(TO_EXCLUDE_FROM_PARTS)
+    .prepend(DOES_NOT_EXIST)
+    // disable gzip transfer encoding for HTTP; it causes a significant bottleneck when
+    // uploading the parts
+    .prepend(Opts.from(UnifiedOpts.disableGzipContent()));
}

@VisibleForTesting
@@ -0,0 +1,91 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;

/**
* Facade which makes an instance of {@link RewindableContent} appear as an input stream.
*
* <p>It does this by calling {@link RewindableContent#writeTo(GatheringByteChannel)} on an
* anonymous channel which closes over the read destination.
*/
final class RewindableContentInputStream extends InputStream {

private final RewindableContent content;

RewindableContentInputStream(RewindableContent content) {
this.content = content;
}

@Override
public int read() throws IOException {
byte[] tmp = new byte[1];
int read = read(tmp);
if (read == -1) {
return -1;
} else {
return tmp[0] & 0xFF;
}
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
// define a byte buffer as the destination for our write
ByteBuffer dst = ByteBuffer.wrap(b, off, len);
int remaining = dst.remaining();
if (remaining == 0) {
return 0;
}
long written =
content.writeTo(
new AnonWritableByteChannel() {
@Override
public long write(ByteBuffer[] srcs, int offset, int length) {
// srcs holds the buffers backing the content
long total = 0;
for (int i = offset; i < length; i++) {
ByteBuffer src = srcs[i];
// copy what we can from our src to the dst buffer
long written = Buffers.copy(src, dst);
total += written;
}
return total;
}
});
// the dst had space but nothing was written: the content is exhausted, so signal EOF
if (written == 0) {
return -1;
}
return Math.toIntExact(written);
}

private abstract static class AnonWritableByteChannel implements UnbufferedWritableByteChannel {

@Override
public boolean isOpen() {
return true;
}

@Override
public void close() {}
}
}
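For orientation, here is a rough sketch of how this facade reads end to end. RewindableContent and RewindableContentInputStream are package-private, so the snippet assumes it compiles inside the com.google.cloud.storage package; RewindableContent.of is the same factory exercised by the unit test further down, and the class name is made up for illustration.

```java
package com.google.cloud.storage;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class RewindableContentInputStreamSketch {

  static byte[] drain() throws IOException {
    RewindableContent content =
        RewindableContent.of(ByteBuffer.wrap("hi".getBytes(StandardCharsets.UTF_8)));
    try (RewindableContentInputStream in = new RewindableContentInputStream(content)) {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      byte[] buf = new byte[8192];
      int read;
      // each read(byte[]) call forwards to content.writeTo(...) against an anonymous channel
      while ((read = in.read(buf)) != -1) {
        baos.write(buf, 0, read);
      }
      // baos now holds exactly the bytes the content would have written to a channel
      return baos.toByteArray();
    }
  }
}
```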
@@ -1694,4 +1694,72 @@ public BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts<ObjectTargetOp
}
return codecs.blobInfo().decode(object);
}

@Override
public BlobInfo internalDirectUpload(BlobInfo info, Opts<ObjectTargetOpt> opts, ByteBuffer buf) {

BlobInfo.Builder builder =
info.toBuilder()
.setMd5(
BaseEncoding.base64().encode(Hashing.md5().hashBytes(buf.duplicate()).asBytes()))
.setCrc32c(
BaseEncoding.base64()
.encode(Ints.toByteArray(Hashing.crc32c().hashBytes(buf.duplicate()).asInt())));
final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();

BlobInfo updated = opts.blobInfoMapper().apply(builder).build();
final StorageObject encoded = codecs.blobInfo().encode(updated);
ResultRetryAlgorithm<?> algorithm =
retryAlgorithmManager.getForObjectsCreate(encoded, optionsMap);
RewindableContent content = RewindableContent.of(buf);
return run(
algorithm,
() -> {
content.rewindTo(0);
return storageRpc.create(encoded, new RewindableContentInputStream(content), optionsMap);
},
Conversions.json().blobInfo()::decode);
}

/**
* Behavioral difference compared to {@link #delete(BlobId, BlobSourceOption...)}: instead of
* returning false when an object does not exist, this method throws an exception.
*/
@Override
public Void internalObjectDelete(BlobId id, Opts<ObjectSourceOpt> opts) {
final StorageObject storageObject = codecs.blobId().encode(id);
ImmutableMap<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
ResultRetryAlgorithm<?> algorithm =
retryAlgorithmManager.getForObjectsDelete(storageObject, optionsMap);
return run(
algorithm,
() -> {
boolean deleted = storageRpc.delete(storageObject, optionsMap);
// HttpStorageRpc turns a 404 into false; our code needs to know about the 404
if (!deleted) {
throw new StorageException(404, "NOT_FOUND", null, null);
}
return null;
},
Function.identity());
}

@Override
public BlobInfo internalObjectGet(BlobId blobId, Opts<ObjectSourceOpt> opts) {
StorageObject storedObject = codecs.blobId().encode(blobId);
ImmutableMap<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
ResultRetryAlgorithm<?> algorithm =
retryAlgorithmManager.getForObjectsGet(storedObject, optionsMap);
return run(
algorithm,
() -> {
StorageObject storageObject = storageRpc.get(storedObject, optionsMap);
// HttpStorageRpc turns a 404 into null; our code needs to know about the 404
if (storageObject == null) {
throw new StorageException(404, "NOT_FOUND", null, null);
}
return storageObject;
},
codecs.blobInfo()::decode);
}
}
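One note on the checksums built in internalDirectUpload above: the JSON API represents md5Hash as base64 of the raw 16-byte digest and crc32c as base64 of the big-endian 4-byte value, which is why the code routes the Guava hash through Ints.toByteArray before encoding. A small standalone sketch of just that encoding step (the input string and class name are arbitrary, for illustration only):

```java
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.common.primitives.Ints;
import java.nio.charset.StandardCharsets;

final class ChecksumEncodingSketch {
  public static void main(String[] args) {
    byte[] data = "hello world".getBytes(StandardCharsets.UTF_8);
    // md5Hash: base64 of the raw digest bytes
    String md5 = BaseEncoding.base64().encode(Hashing.md5().hashBytes(data).asBytes());
    // crc32c: the int value converted to big-endian bytes, then base64 encoded
    String crc32c =
        BaseEncoding.base64().encode(Ints.toByteArray(Hashing.crc32c().hashBytes(data).asInt()));
    System.out.println("md5Hash=" + md5 + " crc32c=" + crc32c);
  }
}
```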
@@ -43,6 +43,7 @@
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.client.util.Data;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.Storage.Objects.Compose;
import com.google.api.services.storage.Storage.Objects.Get;
import com.google.api.services.storage.Storage.Objects.Insert;
import com.google.api.services.storage.model.Bucket;
@@ -755,13 +756,15 @@ public StorageObject compose(
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_COMPOSE);
Scope scope = tracer.withSpan(span);
try {
-return storage
-    .objects()
-    .compose(target.getBucket(), target.getName(), request)
-    .setIfMetagenerationMatch(Option.IF_METAGENERATION_MATCH.getLong(targetOptions))
-    .setIfGenerationMatch(Option.IF_GENERATION_MATCH.getLong(targetOptions))
-    .setUserProject(Option.USER_PROJECT.getString(targetOptions))
-    .execute();
+Compose compose =
+    storage
+        .objects()
+        .compose(target.getBucket(), target.getName(), request)
+        .setIfMetagenerationMatch(Option.IF_METAGENERATION_MATCH.getLong(targetOptions))
+        .setIfGenerationMatch(Option.IF_GENERATION_MATCH.getLong(targetOptions))
+        .setUserProject(Option.USER_PROJECT.getString(targetOptions));
+setEncryptionHeaders(compose.getRequestHeaders(), ENCRYPTION_KEY_PREFIX, targetOptions);
+return compose.execute();
} catch (IOException ex) {
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
throw translate(ex);
@@ -0,0 +1,96 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import static com.google.cloud.storage.ByteSizeConstants._256KiB;
import static com.google.cloud.storage.TestUtils.xxd;
import static com.google.common.truth.Truth.assertThat;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.junit.Test;

public final class RewindableContentInputStreamTest {

@Test
public void read_empty() throws IOException {
RewindableContent content = RewindableContent.empty();
try (RewindableContentInputStream in = new RewindableContentInputStream(content)) {
int read = in.read();
assertThat(read).isEqualTo(-1);
}
}

@Test
public void readB_emptySrc() throws IOException {
RewindableContent content = RewindableContent.empty();
try (RewindableContentInputStream in = new RewindableContentInputStream(content)) {
int read = in.read(new byte[1]);
assertThat(read).isEqualTo(-1);
}
}

@Test
public void readB_emptyDst() throws IOException {
byte[] bytes = DataGenerator.base64Characters().genBytes(1);
RewindableContent content = RewindableContent.of(ByteBuffer.wrap(bytes));
try (RewindableContentInputStream in = new RewindableContentInputStream(content)) {
byte[] tmp = new byte[0];
int read = in.read(tmp);
assertThat(read).isEqualTo(0);
}
}

@Test
public void readB_singleByte() throws IOException {
byte[] bytes = DataGenerator.base64Characters().genBytes(1);
RewindableContent content = RewindableContent.of(ByteBuffer.wrap(bytes));
try (RewindableContentInputStream in = new RewindableContentInputStream(content)) {
byte[] tmp = new byte[_256KiB];
int read = in.read(tmp);
assertThat(read).isEqualTo(1);
assertThat(tmp[0]).isEqualTo(bytes[0]);
}
}

@Test
public void read_singleByte() throws IOException {
byte[] bytes = DataGenerator.base64Characters().genBytes(1);
RewindableContent content = RewindableContent.of(ByteBuffer.wrap(bytes));
try (RewindableContentInputStream in = new RewindableContentInputStream(content)) {
int read = in.read();
assertThat(read).isEqualTo(bytes[0]);
}
}

@Test
public void readB_multiContent() throws IOException {
byte[] bytes = DataGenerator.base64Characters().genBytes(30);
RewindableContent content =
RewindableContent.of(
ByteBuffer.wrap(bytes, 0, 10),
ByteBuffer.wrap(bytes, 10, 10),
ByteBuffer.wrap(bytes, 20, 10));
try (RewindableContentInputStream in = new RewindableContentInputStream(content)) {
byte[] tmp = new byte[_256KiB];
int read = in.read(tmp);
assertThat(read).isEqualTo(30);
assertThat(xxd(ByteString.copyFrom(tmp, 0, read))).isEqualTo(xxd(bytes));
}
}
}
@@ -31,6 +31,7 @@
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.DataGenerator;
import com.google.cloud.storage.GrpcStorageOptions;
import com.google.cloud.storage.HttpStorageOptions;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.BufferAllocationStrategy;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.ExecutorSupplier;
@@ -73,7 +74,7 @@

@RunWith(StorageITRunner.class)
@CrossRun(
-transports = {Transport.GRPC},
+transports = {Transport.HTTP, Transport.GRPC},
backends = {Backend.PROD})
public final class ITParallelCompositeUploadBlobWriteSessionConfigTest {

@@ -125,6 +126,12 @@ public void setUp() throws Exception {
.toBuilder()
.setBlobWriteSessionConfig(pcu)
.build();
} else if (transport == Transport.HTTP) {
storageOptions =
((HttpStorageOptions) injectedStorage.getOptions())
.toBuilder()
.setBlobWriteSessionConfig(pcu)
.build();
}
assertWithMessage("unable to resolve options").that(storageOptions).isNotNull();
//noinspection DataFlowIssue