Skip to content

Commit

Permalink
Merge pull request #584 from mziccard/bigquery-hierachies
Browse files Browse the repository at this point in the history
Remove JobInfo hierarchy, add JobConfiguration hierarchy
  • Loading branch information
aozarov committed Jan 29, 2016
2 parents 44a1533 + 73250ea commit b4bddff
Show file tree
Hide file tree
Showing 45 changed files with 2,296 additions and 1,635 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ import com.google.gcloud.bigquery.BigQuery;
import com.google.gcloud.bigquery.BigQueryOptions;
import com.google.gcloud.bigquery.Field;
import com.google.gcloud.bigquery.JobStatus;
import com.google.gcloud.bigquery.LoadJobInfo;
import com.google.gcloud.bigquery.JobInfo;
import com.google.gcloud.bigquery.Schema;
import com.google.gcloud.bigquery.TableId;
import com.google.gcloud.bigquery.TableInfo;
Expand All @@ -144,7 +144,8 @@ if (info == null) {
bigquery.create(TableInfo.of(tableId, Schema.of(integerField)));
} else {
System.out.println("Loading data into table " + tableId);
LoadJobInfo loadJob = LoadJobInfo.of(tableId, "gs://bucket/path");
LoadJobConfiguration configuration = LoadJobConfiguration.of(tableId, "gs://bucket/path");
JobInfo loadJob = JobInfo.of(configuration);
loadJob = bigquery.create(loadJob);
while (loadJob.status().state() != JobStatus.State.DONE) {
Thread.sleep(1000L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,10 @@ protected final boolean baseEquals(BaseTableInfo tableInfo) {
return Objects.equals(toPb(), tableInfo.toPb());
}

BaseTableInfo setProjectId(String projectId) {
return toBuilder().tableId(tableId().setProjectId(projectId)).build();
}

Table toPb() {
Table tablePb = new Table();
tablePb.setTableReference(tableId.toPb());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,9 @@ public static JobListOption startPageToken(String pageToken) {
* is not provided all job's fields are returned. {@code JobOption.fields()} can be used to
* specify only the fields of interest. {@link JobInfo#jobId()}, {@link JobStatus#state()},
* {@link JobStatus#error()} as well as type-specific configuration (e.g.
* {@link QueryJobInfo#query()} for Query Jobs) are always returned, even if not specified.
* {@link JobField#SELF_LINK} and {@link JobField#ETAG} can not be selected when listing jobs.
* {@link QueryJobConfiguration#query()} for Query Jobs) are always returned, even if not
* specified. {@link JobField#SELF_LINK} and {@link JobField#ETAG} can not be selected when
* listing jobs.
*/
public static JobListOption fields(JobField... fields) {
String selector = JobField.selector(fields);
Expand All @@ -397,8 +398,8 @@ private JobOption(BigQueryRpc.Option option, Object value) {
* Returns an option to specify the job's fields to be returned by the RPC call. If this option
* is not provided all job's fields are returned. {@code JobOption.fields()} can be used to
* specify only the fields of interest. {@link JobInfo#jobId()} as well as type-specific
* configuration (e.g. {@link QueryJobInfo#query()} for Query Jobs) are always returned, even if
* not specified.
* configuration (e.g. {@link QueryJobConfiguration#query()} for Query Jobs) are always
* returned, even if not specified.
*/
public static JobOption fields(JobField... fields) {
return new JobOption(BigQueryRpc.Option.FIELDS, JobField.selector(fields));
Expand Down Expand Up @@ -470,7 +471,7 @@ public static QueryResultsOption maxWaitTime(long maxWaitTime) {
*
* @throws BigQueryException upon failure
*/
<T extends JobInfo> T create(T job, JobOption... options) throws BigQueryException;
JobInfo create(JobInfo job, JobOption... options) throws BigQueryException;

/**
* Returns the requested dataset or {@code null} if not found.
Expand Down Expand Up @@ -611,14 +612,14 @@ Page<List<FieldValue>> listTableData(TableId tableId, TableDataListOption... opt
*
* @throws BigQueryException upon failure
*/
<T extends JobInfo> T getJob(String jobId, JobOption... options) throws BigQueryException;
JobInfo getJob(String jobId, JobOption... options) throws BigQueryException;

/**
* Returns the requested job or {@code null} if not found.
*
* @throws BigQueryException upon failure
*/
<T extends JobInfo> T getJob(JobId jobId, JobOption... options) throws BigQueryException;
JobInfo getJob(JobId jobId, JobOption... options) throws BigQueryException;

/**
* Lists the jobs.
Expand Down Expand Up @@ -665,9 +666,9 @@ Page<List<FieldValue>> listTableData(TableId tableId, TableDataListOption... opt

/**
* Returns a channel to write data to be inserted into a BigQuery table. Data format and other
* options can be configured using the {@link LoadConfiguration} parameter.
* options can be configured using the {@link WriteChannelConfiguration} parameter.
*
* @throws BigQueryException upon failure
*/
TableDataWriteChannel writer(LoadConfiguration loadConfiguration);
TableDataWriteChannel writer(WriteChannelConfiguration writeChannelConfiguration);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package com.google.gcloud.bigquery;

import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.common.collect.ImmutableSet;
import com.google.gcloud.BaseServiceException;
import com.google.gcloud.RetryHelper.RetryHelperException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest.Rows;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -159,7 +158,7 @@ public QueryResult nextPage() {
@Override
public DatasetInfo create(DatasetInfo dataset, DatasetOption... options)
throws BigQueryException {
final Dataset datasetPb = setProjectId(dataset).toPb();
final Dataset datasetPb = dataset.setProjectId(options().projectId()).toPb();
final Map<BigQueryRpc.Option, ?> optionsMap = optionMap(options);
try {
return DatasetInfo.fromPb(runWithRetries(new Callable<Dataset>() {
Expand All @@ -176,7 +175,7 @@ public Dataset call() {
@Override
public <T extends BaseTableInfo> T create(T table, TableOption... options)
throws BigQueryException {
final Table tablePb = setProjectId(table).toPb();
final Table tablePb = table.setProjectId(options().projectId()).toPb();
final Map<BigQueryRpc.Option, ?> optionsMap = optionMap(options);
try {
return BaseTableInfo.fromPb(runWithRetries(new Callable<Table>() {
Expand All @@ -191,8 +190,8 @@ public Table call() {
}

@Override
public <T extends JobInfo> T create(T job, JobOption... options) throws BigQueryException {
final Job jobPb = setProjectId(job).toPb();
public JobInfo create(JobInfo job, JobOption... options) throws BigQueryException {
final Job jobPb = job.setProjectId(options().projectId()).toPb();
final Map<BigQueryRpc.Option, ?> optionsMap = optionMap(options);
try {
return JobInfo.fromPb(runWithRetries(new Callable<Job>() {
Expand Down Expand Up @@ -295,7 +294,7 @@ public Boolean call() {
@Override
public DatasetInfo update(DatasetInfo dataset, DatasetOption... options)
throws BigQueryException {
final Dataset datasetPb = setProjectId(dataset).toPb();
final Dataset datasetPb = dataset.setProjectId(options().projectId()).toPb();
final Map<BigQueryRpc.Option, ?> optionsMap = optionMap(options);
try {
return DatasetInfo.fromPb(runWithRetries(new Callable<Dataset>() {
Expand All @@ -312,7 +311,7 @@ public Dataset call() {
@Override
public <T extends BaseTableInfo> T update(T table, TableOption... options)
throws BigQueryException {
final Table tablePb = setProjectId(table).toPb();
final Table tablePb = table.setProjectId(options().projectId()).toPb();
final Map<BigQueryRpc.Option, ?> optionsMap = optionMap(options);
try {
return BaseTableInfo.fromPb(runWithRetries(new Callable<Table>() {
Expand Down Expand Up @@ -442,12 +441,12 @@ public List<FieldValue> apply(TableRow rowPb) {
}

@Override
public <T extends JobInfo> T getJob(String jobId, JobOption... options) throws BigQueryException {
public JobInfo getJob(String jobId, JobOption... options) throws BigQueryException {
return getJob(JobId.of(jobId), options);
}

@Override
public <T extends JobInfo> T getJob(final JobId jobId, JobOption... options)
public JobInfo getJob(final JobId jobId, JobOption... options)
throws BigQueryException {
final Map<BigQueryRpc.Option, ?> optionsMap = optionMap(options);
try {
Expand All @@ -457,7 +456,7 @@ public Job call() {
return bigQueryRpc.getJob(jobId.job(), optionsMap);
}
}, options().retryParams(), EXCEPTION_HANDLER);
return answer == null ? null : JobInfo.<T>fromPb(answer);
return answer == null ? null : JobInfo.fromPb(answer);
} catch (RetryHelper.RetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
Expand Down Expand Up @@ -508,7 +507,7 @@ public QueryResponse query(final QueryRequest request) throws BigQueryException
runWithRetries(new Callable<com.google.api.services.bigquery.model.QueryResponse>() {
@Override
public com.google.api.services.bigquery.model.QueryResponse call() {
return bigQueryRpc.query(setProjectId(request).toPb());
return bigQueryRpc.query(request.setProjectId(options().projectId()).toPb());
}
}, options().retryParams(), EXCEPTION_HANDLER);
QueryResponse.Builder builder = QueryResponse.builder();
Expand Down Expand Up @@ -596,8 +595,9 @@ private static QueryResult.Builder transformQueryResults(JobId jobId, List<Table
.results(transformTableData(rowsPb));
}

public TableDataWriteChannel writer(LoadConfiguration loadConfiguration) {
return new TableDataWriteChannel(options(), setProjectId(loadConfiguration));
public TableDataWriteChannel writer(WriteChannelConfiguration writeChannelConfiguration) {
return new TableDataWriteChannel(options(),
writeChannelConfiguration.setProjectId(options().projectId()));
}

private Map<BigQueryRpc.Option, ?> optionMap(Option... options) {
Expand All @@ -608,93 +608,4 @@ public TableDataWriteChannel writer(LoadConfiguration loadConfiguration) {
}
return optionMap;
}

private DatasetInfo setProjectId(DatasetInfo dataset) {
DatasetInfo.Builder datasetBuilder = dataset.toBuilder();
datasetBuilder.datasetId(setProjectId(dataset.datasetId()));
if (dataset.acl() != null) {
List<Acl> acls = Lists.newArrayListWithCapacity(dataset.acl().size());
for (Acl acl : dataset.acl()) {
if (acl.entity().type() == Acl.Entity.Type.VIEW) {
Dataset.Access accessPb = acl.toPb();
TableReference viewReferencePb = accessPb.getView();
if (viewReferencePb.getProjectId() == null) {
viewReferencePb.setProjectId(options().projectId());
}
acls.add(Acl.of(new Acl.View(TableId.fromPb(viewReferencePb))));
} else {
acls.add(acl);
}
}
datasetBuilder.acl(acls);
}
return datasetBuilder.build();
}

private DatasetId setProjectId(DatasetId dataset) {
return dataset.project() != null ? dataset
: DatasetId.of(options().projectId(), dataset.dataset());
}

private BaseTableInfo setProjectId(BaseTableInfo table) {
return table.toBuilder().tableId(setProjectId(table.tableId())).build();
}

private TableId setProjectId(TableId table) {
return table.project() != null ? table
: TableId.of(options().projectId(), table.dataset(), table.table());
}

private JobInfo setProjectId(JobInfo job) {
if (job instanceof CopyJobInfo) {
CopyJobInfo copyJob = (CopyJobInfo) job;
CopyJobInfo.Builder copyBuilder = copyJob.toBuilder();
copyBuilder.destinationTable(setProjectId(copyJob.destinationTable()));
copyBuilder.sourceTables(
Lists.transform(copyJob.sourceTables(), new Function<TableId, TableId>() {
@Override
public TableId apply(TableId tableId) {
return setProjectId(tableId);
}
}));
return copyBuilder.build();
}
if (job instanceof QueryJobInfo) {
QueryJobInfo queryJob = (QueryJobInfo) job;
QueryJobInfo.Builder queryBuilder = queryJob.toBuilder();
if (queryJob.destinationTable() != null) {
queryBuilder.destinationTable(setProjectId(queryJob.destinationTable()));
}
if (queryJob.defaultDataset() != null) {
queryBuilder.defaultDataset(setProjectId(queryJob.defaultDataset()));
}
return queryBuilder.build();
}
if (job instanceof ExtractJobInfo) {
ExtractJobInfo extractJob = (ExtractJobInfo) job;
ExtractJobInfo.Builder extractBuilder = extractJob.toBuilder();
extractBuilder.sourceTable(setProjectId(extractJob.sourceTable()));
return extractBuilder.build();
}
if (job instanceof LoadJobInfo) {
LoadJobInfo loadJob = (LoadJobInfo) job;
LoadJobInfo.Builder loadBuilder = loadJob.toBuilder();
return loadBuilder.configuration(setProjectId(loadJob.configuration())).build();
}
return job;
}

private QueryRequest setProjectId(QueryRequest request) {
QueryRequest.Builder builder = request.toBuilder();
if (request.defaultDataset() != null) {
builder.defaultDataset(setProjectId(request.defaultDataset()));
}
return builder.build();
}

private LoadConfiguration setProjectId(LoadConfiguration configuration) {
LoadConfiguration.Builder builder = configuration.toBuilder();
builder.destinationTable(setProjectId(configuration.destinationTable()));
return builder.build();
}
}
Loading

0 comments on commit b4bddff

Please sign in to comment.