Skip to content

Commit

Permalink
Add blocking waitFor method, remove whenDone and callback
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard committed May 23, 2016
1 parent e29f635 commit 9ba2701
Show file tree
Hide file tree
Showing 15 changed files with 333 additions and 456 deletions.
50 changes: 12 additions & 38 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ Complete source code can be found at
```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FormatOptions;
Expand All @@ -156,8 +155,6 @@ import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import java.util.List;
BigQuery bigquery = BigQueryOptions.defaultInstance().service();
TableId tableId = TableId.of("dataset", "table");
Table table = bigquery.getTable(tableId);
Expand All @@ -169,17 +166,12 @@ if (table == null) {
}
System.out.println("Loading data into table " + tableId);
Job loadJob = table.load(FormatOptions.csv(), "gs://bucket/path");
loadJob.whenDone(new Job.CompletionCallback() {
@Override
public void success(Job job) {
System.out.println("Job succeeded");
}
@Override
public void error(BigQueryError error, List<BigQueryError> executionErrors) {
System.out.println("Job completed with errors");
}
});
loadJob = loadJob.waitFor();
if (loadJob.status().error() != null) {
System.out.println("Job completed with errors");
} else {
System.out.println("Job succeeded");
}
```
Google Cloud Compute (Alpha)
Expand All @@ -202,31 +194,19 @@ import com.google.cloud.compute.Compute;
import com.google.cloud.compute.ComputeOptions;
import com.google.cloud.compute.Disk;
import com.google.cloud.compute.DiskId;
import com.google.cloud.compute.Operation.OperationError;
import com.google.cloud.compute.Operation.OperationWarning;
import com.google.cloud.compute.Snapshot;
import java.util.List;
final Compute compute = ComputeOptions.defaultInstance().service();
DiskId diskId = DiskId.of("us-central1-a", "disk-name");
Disk disk = compute.getDisk(diskId, Compute.DiskOption.fields());
if (disk != null) {
final String snapshotName = "disk-name-snapshot";
Operation operation = disk.createSnapshot(snapshotName);
operation.whenDone(new Operation.CompletionCallback() {
@Override
public void success(Operation operation) {
// use snapshot
Snapshot snapshot = compute.getSnapshot(snapshotName);
}
@Override
public void error(List<OperationError> errors, List<OperationWarning> warnings) {
// inspect errors
throw new RuntimeException("Snaphsot creation failed");
}
});
operation = operation.waitFor();
if (operation.errors() == null) {
// use snapshot
Snapshot snapshot = compute.getSnapshot(snapshotName);
}
}
```
The second snippet shows how to create a virtual machine instance. Complete source code can be found
Expand All @@ -242,10 +222,6 @@ import com.google.cloud.compute.InstanceId;
import com.google.cloud.compute.InstanceInfo;
import com.google.cloud.compute.MachineTypeId;
import com.google.cloud.compute.NetworkId;
import com.google.cloud.compute.Operation.OperationError;
import com.google.cloud.compute.Operation.OperationWarning;
import java.util.List;
Compute compute = ComputeOptions.defaultInstance().service();
ImageId imageId = ImageId.of("debian-cloud", "debian-8-jessie-v20160329");
Expand All @@ -256,9 +232,7 @@ InstanceId instanceId = InstanceId.of("us-central1-a", "instance-name");
MachineTypeId machineTypeId = MachineTypeId.of("us-central1-a", "n1-standard-1");
Operation operation =
compute.create(InstanceInfo.of(instanceId, machineTypeId, attachedDisk, networkInterface));
while (!operation.isDone()) {
Thread.sleep(1000L);
}
operation = operation.waitFor();
if (operation.errors() == null) {
// use instance
Instance instance = compute.getInstance(instanceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* A Google BigQuery Job.
Expand All @@ -38,24 +38,6 @@ public class Job extends JobInfo {
private final BigQueryOptions options;
private transient BigQuery bigquery;

/**
* A callback for job completion.
*/
public interface CompletionCallback {
/**
* The method called when the job completes successfully.
*/
void success(Job job);

/**
* The method called when the job completes with errors. {@code error} is the final error that
* caused the job to fail (see {@link JobStatus#error()}). {@code executionErrors} are all the
* errors (possibly not fatal) encountered by the job during its execution (see
* {@link JobStatus#executionErrors()}).
*/
void error(BigQueryError error, List<BigQueryError> executionErrors);
}

/**
* A builder for {@code Job} objects.
*/
Expand Down Expand Up @@ -163,40 +145,51 @@ public boolean isDone() {
}

/**
* Waits until this job completes its execution, either failing or succeeding. If the job does not
* exist, this method returns without executing any method of the provided callback. If the job
* completed successfully the {@link CompletionCallback#success(Job)} method is called. If the job
* completed with errors the {@link CompletionCallback#error(BigQueryError, List)} method is
* called.
* Blocks until this job completes its execution, either failing or succeeding. The job status is
* checked every 500 milliseconds. This method returns current job's latest information. If the
* job no longer exists, this method returns {@code null}.
* <pre> {@code
* job.whenDone(new CompletionCallback() {
* void success(Job job) {
* // completed successfully
* }
* Job completedJob = job.waitFor();
* if (completedJob == null) {
* // job no longer exists
* } else if (completedJob.status().error() != null) {
* // job failed, handle error
* } else {
* // job completed successfully
* }}</pre>
*
* void error(BigQueryError error, List<BigQueryError> executionErrors) {
* // handle error
* }
* });}</pre>
* @throws BigQueryException upon failure
* @throws InterruptedException if the current thread gets interrupted while waiting for the job
* to complete
*/
public Job waitFor() throws InterruptedException {
return waitFor(500, TimeUnit.MILLISECONDS);
}

/**
* Blocks until this job completes its execution, either failing or succeeding. The
* {@code checkEvery} and {@code unit} parameters determine how often the job's status is checked.
* This method returns current job's latest information. If the job no longer exists, this method
* returns {@code null}.
* <pre> {@code
* Job completedJob = job.waitFor(1, TimeUnit.SECONDS);
* if (completedJob == null) {
* // job no longer exists
* } else if (completedJob.status().error() != null) {
* // job failed, handle error
* } else {
* // job completed successfully
* }}</pre>
*
* @throws BigQueryException upon failure
* @throws InterruptedException if the current thread gets interrupted while waiting for the job
* to complete
*/
public void whenDone(CompletionCallback callback) throws InterruptedException {
public Job waitFor(int checkEvery, TimeUnit unit) throws InterruptedException {
while (!isDone()) {
Thread.sleep(500L);
}
Job updatedJob = reload();
if (updatedJob == null) {
return;
}
BigQueryError error = updatedJob.status().error();
if (error != null) {
callback.error(error, updatedJob.status().executionErrors());
} else {
callback.success(updatedJob);
unit.sleep(checkEvery);
}
return reload();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,12 @@
* }
* System.out.println("Loading data into table " + tableId);
* Job loadJob = table.load(FormatOptions.csv(), "gs://bucket/path");
* loadJob.whenDone(new Job.CompletionCallback() {
* public void success(Job job) {
* System.out.println("Job succeeded");
* }
*
* public void error(BigQueryError error, List<BigQueryError> executionErrors) {
* System.out.println("Job completed with errors");
* }
* });}</pre>
* loadJob = loadJob.waitFor();
* if (loadJob.status().error() != null) {
* System.out.println("Job completed with errors");
* } else {
* System.out.println("Job succeeded");
* }}</pre>
*
* @see <a href="https://cloud.google.com/bigquery/">Google Cloud BigQuery</a>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@
import static org.junit.Assert.assertTrue;

import com.google.cloud.bigquery.JobStatistics.CopyStatistics;
import com.google.common.collect.ImmutableList;

import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Test;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class JobTest {

Expand Down Expand Up @@ -182,60 +181,71 @@ public void testIsDone_NotExists() throws Exception {
}

@Test
public void testWhenDone_Success() throws InterruptedException {
public void testWaitFor() throws InterruptedException {
initializeExpectedJob(2);
Job.CompletionCallback callback = EasyMock.mock(Job.CompletionCallback.class);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
JobStatus status = createStrictMock(JobStatus.class);
expect(status.state()).andReturn(JobStatus.State.DONE);
expect(status.error()).andReturn(null);
expect(bigquery.options()).andReturn(mockOptions);
Job completedJob = expectedJob.toBuilder().status(status).build();
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(completedJob);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(completedJob);
callback.success(completedJob);
EasyMock.expectLastCall();
replay(status, bigquery, callback);
replay(status, bigquery);
initializeJob();
job.whenDone(callback);
verify(status, callback);
assertSame(completedJob, job.waitFor());
verify(status);
}

@Test
public void testWhenDone_Error() throws InterruptedException {
initializeExpectedJob(2);
Job.CompletionCallback callback = EasyMock.mock(Job.CompletionCallback.class);
public void testWaitFor_Null() throws InterruptedException {
initializeExpectedJob(1);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
expect(bigquery.options()).andReturn(mockOptions);
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(null);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(null);
replay(bigquery);
initializeJob();
assertNull(job.waitFor());
}

@Test
public void testWaitForWithTimeUnit() throws InterruptedException {
initializeExpectedJob(3);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
BigQueryError error = new BigQueryError("reason", "location", "message");
List<BigQueryError> executionErrors = ImmutableList.of(error);
TimeUnit timeUnit = createStrictMock(TimeUnit.class);
timeUnit.sleep(42);
EasyMock.expectLastCall();
JobStatus status = createStrictMock(JobStatus.class);
expect(status.state()).andReturn(JobStatus.State.RUNNING);
expect(status.state()).andReturn(JobStatus.State.DONE);
expect(status.error()).andReturn(error);
expect(status.executionErrors()).andReturn(executionErrors);
expect(bigquery.options()).andReturn(mockOptions);
Job runningJob = expectedJob.toBuilder().status(status).build();
Job completedJob = expectedJob.toBuilder().status(status).build();
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(completedJob);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(completedJob);
callback.error(error, executionErrors);
EasyMock.expectLastCall();
replay(status, bigquery, callback);
replay(status, bigquery, timeUnit);
initializeJob();
job.whenDone(callback);
verify(status, callback);
assertSame(completedJob, job.waitFor(42, timeUnit));
verify(status, timeUnit);
}

@Test
public void testWhenDone_Null() throws InterruptedException {
initializeExpectedJob(1);
Job.CompletionCallback callback = EasyMock.mock(Job.CompletionCallback.class);
public void testWaitForWithTimeUnit_Null() throws InterruptedException {
initializeExpectedJob(2);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
TimeUnit timeUnit = createStrictMock(TimeUnit.class);
timeUnit.sleep(42);
EasyMock.expectLastCall();
expect(bigquery.options()).andReturn(mockOptions);
Job runningJob = expectedJob.toBuilder().status(new JobStatus(JobStatus.State.RUNNING)).build();
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(runningJob);
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(null);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(null);
replay(bigquery, callback);
replay(bigquery, timeUnit);
initializeJob();
job.whenDone(callback);
verify(callback);
assertNull(job.waitFor(42, timeUnit));
verify(bigquery, timeUnit);
}

@Test
Expand Down
Loading

0 comments on commit 9ba2701

Please sign in to comment.