
Merge pull request #453 from mziccard/bigquery
Refactor BigQueryRpc and DefaultBigQueryRpc
aozarov committed Dec 14, 2015
2 parents 36c7406 + 3b70b20 commit 0f765b7
Showing 2 changed files with 105 additions and 98 deletions.
BigQueryRpc.java
@@ -19,13 +19,11 @@
import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.api.services.bigquery.model.QueryResponse;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.gcloud.bigquery.BigQueryException;

@@ -113,6 +111,10 @@ public Y y() {

Dataset create(Dataset dataset, Map<Option, ?> options) throws BigQueryException;

Table create(Table table, Map<Option, ?> options) throws BigQueryException;

Job create(Job job, Map<Option, ?> options) throws BigQueryException;

/**
* Delete the requested dataset.
*
@@ -123,6 +125,8 @@ public Y y() {

Dataset patch(Dataset dataset, Map<Option, ?> options) throws BigQueryException;

Table patch(Table table, Map<Option, ?> options) throws BigQueryException;

/**
* Returns the requested table or {@code null} if not found.
*
@@ -139,21 +143,16 @@ public Y y() {
Tuple<String, Iterable<Table>> listTables(String dataset, Map<Option, ?> options)
throws BigQueryException;

Table create(Table table, Map<Option, ?> options) throws BigQueryException;

/**
* Delete the requested table.
*
* @return {@code true} if table was deleted, {@code false} if it was not found
* @throws BigQueryException upon failure
*/
boolean deleteTable(String datasetId, String tableId, Map<Option, ?> options)
throws BigQueryException;
boolean deleteTable(String datasetId, String tableId) throws BigQueryException;

Table patch(Table table, Map<Option, ?> options) throws BigQueryException;

TableDataInsertAllResponse insertAll(TableReference table, TableDataInsertAllRequest request,
Map<Option, ?> options) throws BigQueryException;
TableDataInsertAllResponse insertAll(String datasetId, String tableId,
TableDataInsertAllRequest request) throws BigQueryException;

Tuple<String, Iterable<TableRow>> listTableData(String datasetId, String tableId,
Map<Option, ?> options) throws BigQueryException;
@@ -172,12 +171,18 @@ Tuple<String, Iterable<TableRow>> listTableData(String datasetId, String tableId
*/
Tuple<String, Iterable<Job>> listJobs(Map<Option, ?> options) throws BigQueryException;

Job create(Job job, Map<Option, ?> options) throws BigQueryException;

boolean cancel(String jobId, Map<Option, ?> options) throws BigQueryException;
/**
* Sends a job cancel request. This call will return immediately, and the client will need to poll
* for the job status to see if the cancel completed successfully.
*
* @return {@code true} if cancel was requested successfully, {@code false} if the job was not
* found
* @throws BigQueryException upon failure
*/
boolean cancel(String jobId) throws BigQueryException;

GetQueryResultsResponse getQueryResults(JobReference job, Map<Option, ?> options)
GetQueryResultsResponse getQueryResults(String jobId, Map<Option, ?> options)
throws BigQueryException;

QueryResponse query(QueryRequest request, Map<Option, ?> options) throws BigQueryException;
QueryResponse query(QueryRequest request) throws BigQueryException;
}
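
For reference, a minimal caller-side sketch of the refactored interface (not part of this commit): table-level calls now take plain datasetId/tableId strings, and deleteTable and insertAll no longer take an options map. The dataset and table names below are made up, and the BigQueryRpc instance is assumed to be obtained elsewhere.

import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.gcloud.bigquery.BigQueryException;
import com.google.gcloud.spi.BigQueryRpc;

import java.util.Collections;

public class BigQueryRpcUsageSketch {

  static void insertAndDrop(BigQueryRpc rpc) throws BigQueryException {
    // Stream a single empty row into a made-up table; dataset and table are plain string ids now.
    TableDataInsertAllRequest.Rows row = new TableDataInsertAllRequest.Rows()
        .setJson(Collections.<String, Object>emptyMap());
    TableDataInsertAllRequest request =
        new TableDataInsertAllRequest().setRows(Collections.singletonList(row));
    TableDataInsertAllResponse response = rpc.insertAll("my_dataset", "my_table", request);
    System.out.println("insert errors: " + response.getInsertErrors());

    // deleteTable also lost its options map; it returns false if the table does not exist.
    boolean deleted = rpc.deleteTable("my_dataset", "my_table");
    System.out.println("table deleted: " + deleted);
  }
}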
DefaultBigQueryRpc.java
@@ -16,6 +16,8 @@

import static com.google.gcloud.spi.BigQueryRpc.Option.DELETE_CONTENTS;
import static com.google.gcloud.spi.BigQueryRpc.Option.FIELDS;
import static com.google.gcloud.spi.BigQueryRpc.Option.MAX_RESULTS;
import static com.google.gcloud.spi.BigQueryRpc.Option.PAGE_TOKEN;
import static com.google.gcloud.spi.BigQueryRpc.Option.START_INDEX;
import static com.google.gcloud.spi.BigQueryRpc.Option.TIMEOUT;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
Expand All @@ -32,7 +34,6 @@
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobList;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.api.services.bigquery.model.QueryResponse;
@@ -48,9 +49,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;

import static com.google.gcloud.spi.BigQueryRpc.Option.MAX_RESULTS;
import static com.google.gcloud.spi.BigQueryRpc.Option.PAGE_TOKEN;

import com.google.gcloud.bigquery.BigQueryException;
import com.google.gcloud.bigquery.BigQueryOptions;

@@ -103,7 +101,7 @@ public Dataset getDataset(String datasetId, Map<Option, ?> options) throws BigQu
.get(this.options.projectId(), datasetId)
.setFields(FIELDS.getString(options))
.execute();
} catch(IOException ex) {
} catch (IOException ex) {
BigQueryException serviceException = translate(ex);
if (serviceException.code() == HTTP_NOT_FOUND) {
return null;
@@ -124,15 +122,16 @@ public Tuple<String, Iterable<Dataset>> listDatasets(Map<Option, ?> options)
.execute();
Iterable<DatasetList.Datasets> datasets = datasetsList.getDatasets();
return Tuple.of(datasetsList.getNextPageToken(),
Iterables.transform(datasets != null ? datasets : ImmutableList.<DatasetList.Datasets>of(),
Iterables.transform(datasets != null ? datasets :
ImmutableList.<DatasetList.Datasets>of(),
new Function<DatasetList.Datasets, Dataset>() {
@Override
public Dataset apply(DatasetList.Datasets f) {
public Dataset apply(DatasetList.Datasets datasetPb) {
return new Dataset()
.setDatasetReference(f.getDatasetReference())
.setFriendlyName(f.getFriendlyName())
.setId(f.getId())
.setKind(f.getKind());
.setDatasetReference(datasetPb.getDatasetReference())
.setFriendlyName(datasetPb.getFriendlyName())
.setId(datasetPb.getId())
.setKind(datasetPb.getKind());
}
}));
} catch (IOException ex) {
@@ -151,6 +150,33 @@ public Dataset create(Dataset dataset, Map<Option, ?> options) throws BigQueryEx
}
}

@Override
public Table create(Table table, Map<Option, ?> options)
throws BigQueryException {
try {
// unset the type, as it is output only
table.setType(null);
return bigquery.tables()
.insert(this.options.projectId(), table.getTableReference().getDatasetId(), table)
.setFields(FIELDS.getString(options))
.execute();
} catch (IOException ex) {
throw translate(ex);
}
}

@Override
public Job create(Job job, Map<Option, ?> options) throws BigQueryException {
try {
return bigquery.jobs()
.insert(this.options.projectId(), job)
.setFields(FIELDS.getString(options))
.execute();
} catch (IOException ex) {
throw translate(ex);
}
}

@Override
public boolean deleteDataset(String datasetId, Map<Option, ?> options) throws BigQueryException {
try {
@@ -180,6 +206,21 @@ public Dataset patch(Dataset dataset, Map<Option, ?> options) throws BigQueryExc
}
}

@Override
public Table patch(Table table, Map<Option, ?> options) throws BigQueryException {
try {
// unset the type, as it is output only
table.setType(null);
TableReference reference = table.getTableReference();
return bigquery.tables()
.patch(this.options.projectId(), reference.getDatasetId(), reference.getTableId(), table)
.setFields(FIELDS.getString(options))
.execute();
} catch (IOException ex) {
throw translate(ex);
}
}

@Override
public Table getTable(String datasetId, String tableId, Map<Option, ?> options)
throws BigQueryException {
@@ -188,7 +229,7 @@ public Table getTable(String datasetId, String tableId, Map<Option, ?> options)
.get(this.options.projectId(), datasetId, tableId)
.setFields(FIELDS.getString(options))
.execute();
} catch(IOException ex) {
} catch (IOException ex) {
BigQueryException serviceException = translate(ex);
if (serviceException.code() == HTTP_NOT_FOUND) {
return null;
@@ -211,13 +252,13 @@ public Tuple<String, Iterable<Table>> listTables(String datasetId, Map<Option, ?
Iterables.transform(tables != null ? tables : ImmutableList.<TableList.Tables>of(),
new Function<TableList.Tables, Table>() {
@Override
public Table apply(TableList.Tables f) {
public Table apply(TableList.Tables tablePb) {
return new Table()
.setFriendlyName(f.getFriendlyName())
.setId(f.getId())
.setKind(f.getKind())
.setTableReference(f.getTableReference())
.setType(f.getType());
.setFriendlyName(tablePb.getFriendlyName())
.setId(tablePb.getId())
.setKind(tablePb.getKind())
.setTableReference(tablePb.getTableReference())
.setType(tablePb.getType());
}
}));
} catch (IOException ex) {
@@ -226,21 +267,7 @@ public Table apply(TableList.Tables f) {
}

@Override
public Table create(Table table, Map<Option, ?> options)
throws BigQueryException {
try {
return bigquery.tables()
.insert(this.options.projectId(), table.getTableReference().getDatasetId(), table)
.setFields(FIELDS.getString(options))
.execute();
} catch (IOException ex) {
throw translate(ex);
}
}

@Override
public boolean deleteTable(String datasetId, String tableId, Map<Option, ?> options)
throws BigQueryException {
public boolean deleteTable(String datasetId, String tableId) throws BigQueryException {
try {
bigquery.tables().delete(this.options.projectId(), datasetId, tableId).execute();
return true;
@@ -254,24 +281,11 @@ public boolean deleteTable(String datasetId, String tableId, Map<Option, ?> opti
}

@Override
public Table patch(Table table, Map<Option, ?> options) throws BigQueryException {
try {
TableReference reference = table.getTableReference();
return bigquery.tables()
.patch(this.options.projectId(), reference.getDatasetId(), reference.getTableId(), table)
.setFields(FIELDS.getString(options))
.execute();
} catch (IOException ex) {
throw translate(ex);
}
}

@Override
public TableDataInsertAllResponse insertAll(TableReference table,
TableDataInsertAllRequest request, Map<Option, ?> options) throws BigQueryException {
public TableDataInsertAllResponse insertAll(String datasetId, String tableId,
TableDataInsertAllRequest request) throws BigQueryException {
try {
return bigquery.tabledata()
.insertAll(this.options.projectId(), table.getDatasetId(), table.getTableId(), request)
.insertAll(this.options.projectId(), datasetId, tableId, request)
.execute();
} catch (IOException ex) {
throw translate(ex);
@@ -286,8 +300,8 @@ public Tuple<String, Iterable<TableRow>> listTableData(String datasetId, String
.list(this.options.projectId(), datasetId, tableId)
.setMaxResults(MAX_RESULTS.getLong(options))
.setPageToken(PAGE_TOKEN.getString(options))
.setStartIndex(START_INDEX.getLong(options) != null ?
BigInteger.valueOf(START_INDEX.getLong(options)) : null)
.setStartIndex(START_INDEX.getLong(options) != null
? BigInteger.valueOf(START_INDEX.getLong(options)) : null)
.execute();
return Tuple.<String, Iterable<TableRow>>of(tableDataList.getPageToken(),
tableDataList.getRows());
@@ -303,7 +317,7 @@ public Job getJob(String jobId, Map<Option, ?> options) throws BigQueryException
.get(this.options.projectId(), jobId)
.setFields(FIELDS.getString(options))
.execute();
} catch(IOException ex) {
} catch (IOException ex) {
BigQueryException serviceException = translate(ex);
if (serviceException.code() == HTTP_NOT_FOUND) {
return null;
@@ -329,22 +343,23 @@ public Tuple<String, Iterable<Job>> listJobs(Map<Option, ?> options) throws BigQ
Iterables.transform(jobs != null ? jobs : ImmutableList.<JobList.Jobs>of(),
new Function<JobList.Jobs, Job>() {
@Override
public Job apply(JobList.Jobs f) {
JobStatus statusPb = f.getStatus() != null ? f.getStatus() : new JobStatus();
public Job apply(JobList.Jobs jobPb) {
JobStatus statusPb = jobPb.getStatus() != null
? jobPb.getStatus() : new JobStatus();
if (statusPb.getState() == null) {
statusPb.setState(f.getState());
statusPb.setState(jobPb.getState());
}
if (statusPb.getErrorResult() == null) {
statusPb.setErrorResult(f.getErrorResult());
statusPb.setErrorResult(jobPb.getErrorResult());
}
return new Job()
.setConfiguration(f.getConfiguration())
.setId(f.getId())
.setJobReference(f.getJobReference())
.setKind(f.getKind())
.setStatistics(f.getStatistics())
.setStatus(f.getStatus())
.setUserEmail(f.getUserEmail());
.setConfiguration(jobPb.getConfiguration())
.setId(jobPb.getId())
.setJobReference(jobPb.getJobReference())
.setKind(jobPb.getKind())
.setStatistics(jobPb.getStatistics())
.setStatus(statusPb)
.setUserEmail(jobPb.getUserEmail());
}
}));
} catch (IOException ex) {
@@ -353,19 +368,7 @@ public Job apply(JobList.Jobs f) {
}

@Override
public Job create(Job job, Map<Option, ?> options) throws BigQueryException {
try {
return bigquery.jobs()
.insert(this.options.projectId(), job)
.setFields(FIELDS.getString(options))
.execute();
} catch (IOException ex) {
throw translate(ex);
}
}

@Override
public boolean cancel(String jobId, Map<Option, ?> options) throws BigQueryException {
public boolean cancel(String jobId) throws BigQueryException {
try {
bigquery.jobs().cancel(this.options.projectId(), jobId).execute();
return true;
@@ -379,17 +382,17 @@ public boolean cancel(String jobId, Map<Option, ?> options) throws BigQueryExcep
}

@Override
public GetQueryResultsResponse getQueryResults(JobReference job, Map<Option, ?> options)
public GetQueryResultsResponse getQueryResults(String jobId, Map<Option, ?> options)
throws BigQueryException {
try {
return bigquery.jobs().getQueryResults(this.options.projectId(), job.getJobId())
return bigquery.jobs().getQueryResults(this.options.projectId(), jobId)
.setMaxResults(MAX_RESULTS.getLong(options))
.setPageToken(PAGE_TOKEN.getString(options))
.setStartIndex(START_INDEX.getLong(options) != null ?
BigInteger.valueOf(START_INDEX.getLong(options)) : null)
.setStartIndex(START_INDEX.getLong(options) != null
? BigInteger.valueOf(START_INDEX.getLong(options)) : null)
.setTimeoutMs(TIMEOUT.getLong(options))
.execute();
} catch(IOException ex) {
} catch (IOException ex) {
BigQueryException serviceException = translate(ex);
if (serviceException.code() == HTTP_NOT_FOUND) {
return null;
@@ -399,8 +402,7 @@ public GetQueryResultsResponse getQueryResults(JobReference job, Map<Option, ?>
}

@Override
public QueryResponse query(QueryRequest request, Map<Option, ?> options)
throws BigQueryException {
public QueryResponse query(QueryRequest request) throws BigQueryException {
try {
return bigquery.jobs().query(this.options.projectId(), request).execute();
} catch (IOException ex) {
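
A hypothetical sketch of the poll-after-cancel pattern described in the new cancel() javadoc (again not part of this commit): cancel() only requests cancellation, so the caller re-reads the job via getJob() until it reaches a terminal state. The poll interval and the "DONE" state string are assumptions of this sketch, not something this change introduces.

import com.google.api.services.bigquery.model.Job;
import com.google.gcloud.bigquery.BigQueryException;
import com.google.gcloud.spi.BigQueryRpc;

import java.util.Collections;
import java.util.Map;

public class CancelAndPollSketch {

  static void cancelAndWait(BigQueryRpc rpc, String jobId)
      throws BigQueryException, InterruptedException {
    // cancel() returns immediately; false means the job was not found.
    if (!rpc.cancel(jobId)) {
      return;
    }
    Map<BigQueryRpc.Option, Object> noOptions = Collections.<BigQueryRpc.Option, Object>emptyMap();
    Job job = rpc.getJob(jobId, noOptions);
    // Poll until the service reports a terminal state ("DONE" is assumed here).
    while (job != null && job.getStatus() != null && !"DONE".equals(job.getStatus().getState())) {
      Thread.sleep(500);
      job = rpc.getJob(jobId, noOptions);
    }
  }
}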
