Skip to content

Commit

Permalink
Merge pull request #69 from aozarov/temp2
Browse files Browse the repository at this point in the history
extract reader/writer impl, increase their default buffer and minimize footprint when possible.
  • Loading branch information
aozarov committed May 19, 2015
2 parents c081812 + c394656 commit a43992a
Show file tree
Hide file tree
Showing 4 changed files with 283 additions and 224 deletions.
4 changes: 3 additions & 1 deletion src/main/java/com/google/gcloud/examples/StorageExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.gcloud.examples;

import com.google.gcloud.RetryParams;
import com.google.gcloud.spi.StorageRpc.Tuple;
import com.google.gcloud.storage.BatchRequest;
import com.google.gcloud.storage.BatchResponse;
Expand Down Expand Up @@ -498,7 +499,8 @@ public static void main(String... args) throws Exception {
printUsage();
return;
}
StorageServiceOptions.Builder optionsBuilder = StorageServiceOptions.builder();
StorageServiceOptions.Builder optionsBuilder =
StorageServiceOptions.builder().retryParams(RetryParams.getDefaultInstance());
StorageAction action;
if (args.length >= 2 && !ACTIONS.containsKey(args[0])) {
optionsBuilder.projectId(args[0]);
Expand Down
140 changes: 140 additions & 0 deletions src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright 2015 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.gcloud.storage;

import static com.google.gcloud.RetryHelper.runWithRetries;

import com.google.api.services.storage.model.StorageObject;
import com.google.gcloud.spi.StorageRpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.Callable;

/**
* Default implementation for BlobReadChannel.
*/
class BlobReadChannelImpl implements BlobReadChannel {

private static final int MIN_BUFFER_SIZE = 2 * 1024 * 1024;
private static final long serialVersionUID = 4821762590742862669L;

private final StorageServiceOptions serviceOptions;
private final Blob blob;
private final Map<StorageRpc.Option, ?> requestOptions;
private int position;
private boolean isOpen;
private boolean endOfStream;

private transient StorageRpc storageRpc;
private transient StorageObject storageObject;
private transient int bufferPos;
private transient byte[] buffer;

BlobReadChannelImpl(StorageServiceOptions serviceOptions, Blob blob,
Map<StorageRpc.Option, ?> requestOptions) {
this.serviceOptions = serviceOptions;
this.blob = blob;
this.requestOptions = requestOptions;
isOpen = true;
initTransients();
}

private void writeObject(ObjectOutputStream out) throws IOException {
if (buffer != null) {
position += bufferPos;
buffer = null;
bufferPos = 0;
endOfStream = false;
}
out.defaultWriteObject();
}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
initTransients();
}

private void initTransients() {
storageRpc = serviceOptions.storageRpc();
storageObject = blob.toPb();
}

@Override
public boolean isOpen() {
return isOpen;
}

@Override
public void close() {
if (isOpen) {
buffer = null;
isOpen = false;
}
}

private void validateOpen() throws IOException {
if (!isOpen) {
throw new IOException("stream is closed");
}
}

@Override
public void seek(int position) throws IOException {
validateOpen();
this.position = position;
buffer = null;
bufferPos = 0;
endOfStream = false;
}

@Override
public int read(ByteBuffer byteBuffer) throws IOException {
validateOpen();
if (buffer == null) {
if (endOfStream) {
return -1;
}
final int toRead = Math.max(byteBuffer.remaining(), MIN_BUFFER_SIZE);
buffer = runWithRetries(new Callable<byte[]>() {
@Override
public byte[] call() {
return storageRpc.read(storageObject, requestOptions, position, toRead);
}
}, serviceOptions.retryParams(), StorageServiceImpl.EXCEPTION_HANDLER);
if (toRead > buffer.length) {
endOfStream = true;
if (buffer.length == 0) {
buffer = null;
return -1;
}
}
}
int toWrite = Math.min(buffer.length - bufferPos, byteBuffer.remaining());
byteBuffer.put(buffer, bufferPos, toWrite);
bufferPos += toWrite;
if (bufferPos >= buffer.length) {
position += buffer.length;
buffer = null;
bufferPos = 0;
}
return toWrite;
}
}
138 changes: 138 additions & 0 deletions src/main/java/com/google/gcloud/storage/BlobWriterChannelImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright 2015 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.gcloud.storage;

import static com.google.gcloud.RetryHelper.runWithRetries;
import static java.util.concurrent.Executors.callable;

import com.google.api.services.storage.model.StorageObject;
import com.google.gcloud.spi.StorageRpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;

/**
* Default implementation for BlobWriteChannel.
*/
class BlobWriterChannelImpl implements BlobWriteChannel {

private static final long serialVersionUID = 8675286882724938737L;
private static final int CHUNK_SIZE = 256 * 1024;
private static final int MIN_BUFFER_SIZE = 8 * CHUNK_SIZE;

private final StorageServiceOptions options;
private final Blob blob;
private final String uploadId;
private int position;
private byte[] buffer = new byte[0];
private int limit;
private boolean isOpen = true;

private transient StorageRpc storageRpc;
private transient StorageObject storageObject;

public BlobWriterChannelImpl(StorageServiceOptions options, Blob blob,
Map<StorageRpc.Option, ?> optionsMap) {
this.options = options;
this.blob = blob;
initTransients();
uploadId = options.storageRpc().open(storageObject, optionsMap);
}

private void writeObject(ObjectOutputStream out) throws IOException {
if (isOpen) {
flush(true);
}
out.defaultWriteObject();
}

private void flush(boolean compact) {
if (limit >= MIN_BUFFER_SIZE || compact && limit >= CHUNK_SIZE) {
final int length = limit - limit % CHUNK_SIZE;
runWithRetries(callable(new Runnable() {
@Override
public void run() {
storageRpc.write(uploadId, buffer, 0, storageObject, position, length, false);
}
}), options.retryParams(), StorageServiceImpl.EXCEPTION_HANDLER);
position += length;
limit -= length;
byte[] temp = new byte[compact ? limit : MIN_BUFFER_SIZE];
System.arraycopy(buffer, length, temp, 0, limit);
buffer = temp;
}
}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
if (isOpen) {
initTransients();
}
}

private void initTransients() {
storageRpc = options.storageRpc();
storageObject = blob.toPb();
}

private void validateOpen() throws IOException {
if (!isOpen) {
throw new IOException("stream is closed");
}
}

@Override
public int write(ByteBuffer byteBuffer) throws IOException {
validateOpen();
int toWrite = byteBuffer.remaining();
int spaceInBuffer = buffer.length - limit;
if (spaceInBuffer >= toWrite) {
byteBuffer.get(buffer, limit, toWrite);
} else {
buffer = Arrays.copyOf(buffer,
Math.max(MIN_BUFFER_SIZE, buffer.length + toWrite - spaceInBuffer));
byteBuffer.get(buffer, limit, toWrite);
}
limit += toWrite;
flush(false);
return toWrite;
}

@Override
public boolean isOpen() {
return isOpen;
}

@Override
public void close() throws IOException {
if (isOpen) {
runWithRetries(callable(new Runnable() {
@Override
public void run() {
storageRpc.write(uploadId, buffer, 0, storageObject, position, limit, true);
}
}), options.retryParams(), StorageServiceImpl.EXCEPTION_HANDLER);
position += buffer.length;
isOpen = false;
buffer = null;
}
}
}
Loading

0 comments on commit a43992a

Please sign in to comment.