diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index f36d80978efd..e24f41afd6b0 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -83,6 +83,17 @@ def __init__(self, project=None, credentials=None, _http=None): project=project, credentials=credentials, _http=_http) self._connection = Connection(self) + def _clone(self, project): + """Create a new client for another project. + + Helper for creating dataset / table instances in remote projects. + + :rtype: :class:`Client` + :returns: a new instance, bound to the supplied project, using + the same credentials / http object as this instance. + """ + return self.__class__(project, self._credentials, self._http) + def list_projects(self, max_results=None, page_token=None): """List projects for the project associated with this client. diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index 953a2c265580..3a945e8728ee 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -14,6 +14,7 @@ """Define API Jobs.""" +import copy import threading import six @@ -333,6 +334,11 @@ def _set_properties(self, api_response): # For Future interface self._set_future_result() + def _job_statistics(self): + """Helper for properties derived from job statistics.""" + statistics = self._properties.get('statistics', {}) + return statistics.get(self._JOB_TYPE, {}) + @classmethod def _get_resource_config(cls, resource): """Helper for :meth:`from_api_repr` @@ -964,6 +970,20 @@ def __init__(self, name, source, destination_uris, client): https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.extract.printHeader """ + @property + def destination_uri_file_counts(self): + """Return file counts from job statistics, if present. 
+ + See: + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#statistics.extract.destinationUriFileCounts + + :rtype: int or None + :returns: number of files exported by the job, or None if job is not + yet complete. + """ + query_stats = self._job_statistics() + return query_stats.get('destinationUriFileCounts') + + def _populate_config_resource(self, configuration): """Helper for _build_resource: copy config properties to resource""" if self.compression is not None: @@ -1277,6 +1297,170 @@ def from_api_repr(cls, resource, client): job._set_properties(resource) return job + @property + def query_plan(self): + """Return query plan from job statistics, if present. + + See: + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#statistics.query.queryPlan + + :rtype: list of dict + :returns: mappings describing the query plan, or an empty list + if the query has not yet completed. + """ + query_stats = self._job_statistics() + plan_entries = query_stats.get('queryPlan', ()) + return [copy.deepcopy(entry) for entry in plan_entries] + + @property + def total_bytes_processed(self): + """Return total bytes processed from job statistics, if present. + + See: + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#statistics.query.totalBytesProcessed + + :rtype: int or None + :returns: total bytes processed by the job, or None if job is not + yet complete. + """ + query_stats = self._job_statistics() + return query_stats.get('totalBytesProcessed') + + @property + def total_bytes_billed(self): + """Return total bytes billed from job statistics, if present. + + See: + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#statistics.query.totalBytesBilled + + :rtype: int or None + :returns: total bytes billed for the job, or None if job is not + yet complete. 
+ """ + query_stats = self._job_statistics() + return query_stats.get('totalBytesBilled') + + @property + def billing_tier(self): + """Return billing tier from job statistics, if present. + + See: + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#statistics.query.billingTier + + :rtype: int or None + :returns: billing tier used by the job, or None if job is not + yet complete. + """ + query_stats = self._job_statistics() + return query_stats.get('billingTier') + + @property + def cache_hit(self): + """Return billing tier from job statistics, if present. + + See: + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#statistics.query.cacheHit + + :rtype: bool or None + :returns: whether the query results were returned from cache, or None + if job is not yet complete. + """ + query_stats = self._job_statistics() + return query_stats.get('cacheHit') + + @property + def referenced_tables(self): + """Return referenced tables from job statistics, if present. + + See: + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#statistics.query.referencedTables + + :rtype: list of dict + :returns: mappings describing the query plan, or an empty list + if the query has not yet completed. 
+ """ + tables = [] + client = self._require_client(None) + query_stats = self._job_statistics() + clients_by_project = {client.project: client} + datasets_by_project_name = {} + + for table in query_stats.get('referencedTables', ()): + + t_project = table['projectId'] + t_client = clients_by_project.get(t_project) + if t_client is None: + t_client = client._clone(t_project) + clients_by_project[t_project] = t_client + + ds_name = table['datasetId'] + t_dataset = datasets_by_project_name.get((t_project, ds_name)) + if t_dataset is None: + t_dataset = t_client.dataset(ds_name) + datasets_by_project_name[(t_project, ds_name)] = t_dataset + + t_name = table['tableId'] + tables.append(t_dataset.table(t_name)) + + return tables + + @property + def schema(self): + """Return schema from job statistics, if present. + + See: + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#statistics.query.schema + + :rtype: list of :class:`~google.cloud.bigquery.schema.SchemaField` + :returns: fields describing the query's result set, or an empty list + if the query has not yet completed. + """ + query_stats = self._job_statistics() + return _parse_schema_resource(query_stats.get('schema', {})) + + @property + def num_dml_affected_rows(self): + """Return total bytes billed from job statistics, if present. + + See: + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#statistics.query.numDmlAffectedRows + + :rtype: int or None + :returns: number of DML rows affectd by the job, or None if job is not + yet complete. + """ + query_stats = self._job_statistics() + return query_stats.get('numDmlAffectedRows') + + @property + def undeclared_query_paramters(self): + """Return undeclared query parameters from job statistics, if present. 
+ + See: + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#statistics.query.undeclaredQueryParamters + + :rtype: list of dict + :returns: mappings describing the undeclared parameters, or an empty + list if the query has not yet completed. + """ + query_stats = self._job_statistics() + undeclared = query_stats.get('undeclaredQueryParamters', ()) + return [copy.deepcopy(parameter) for parameter in undeclared] + + @property + def statement_type(self): + """Return statement type from job statistics, if present. + + See: + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#statistics.query.statementType + + :rtype: str or None + :returns: type of statement used by the job, or None if job is not + yet complete. + """ + query_stats = self._job_statistics() + return query_stats.get('statementType') + def query_results(self): """Construct a QueryResults instance, bound to this job. diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index e71f3b99fbe0..962432498946 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -45,6 +45,18 @@ def test_ctor(self): self.assertIs(client._connection.credentials, creds) self.assertIs(client._connection.http, http) + def test_clone(self): + PROJECT = 'PROJECT' + OTHER_PROJECT = 'OTHER-PROJECT' + creds = _make_credentials() + http = object() + client = self._make_one(project=PROJECT, credentials=creds, _http=http) + + cloned = client._clone(OTHER_PROJECT) + self.assertEqual(cloned.project, OTHER_PROJECT) + self.assertIs(cloned._credentials, creds) + self.assertIs(cloned._http, http) + def test_list_projects_defaults(self): import six from google.cloud.bigquery.client import Project diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py index 46326441a5e1..d0b7194b60fd 100644 --- a/bigquery/tests/unit/test_job.py +++ b/bigquery/tests/unit/test_job.py @@ -42,7 +42,7 @@ def test_missing_reason(self): class _Base(object): 
PROJECT = 'project' SOURCE1 = 'http://example.com/source1.csv' - DS_NAME = 'datset_name' + DS_NAME = 'dataset_name' TABLE_NAME = 'table_name' JOB_NAME = 'job_name' @@ -1182,6 +1182,23 @@ def test_ctor(self): self.assertIsNone(job.field_delimiter) self.assertIsNone(job.print_header) + def test_destination_uri_file_counts(self): + file_counts = 23 + client = _Client(self.PROJECT) + source = _Table(self.SOURCE_TABLE) + job = self._make_one(self.JOB_NAME, source, [self.DESTINATION_URI], + client) + self.assertIsNone(job.destination_uri_file_counts) + + statistics = job._properties['statistics'] = {} + self.assertIsNone(job.destination_uri_file_counts) + + extract_stats = statistics['extract'] = {} + self.assertIsNone(job.destination_uri_file_counts) + + extract_stats['destinationUriFileCounts'] = file_counts + self.assertEqual(job.destination_uri_file_counts, file_counts) + def test_from_api_repr_missing_identity(self): self._setUpConstants() client = _Client(self.PROJECT) @@ -1632,6 +1649,263 @@ def test_cancelled(self): self.assertTrue(job.cancelled()) + def test_query_plan(self): + plan_entries = [{ + 'name': 'NAME', + 'id': 1234, + 'waitRatioAvg': 2.71828, + 'waitRatioMax': 3.14159, + 'readRatioAvg': 1.41421, + 'readRatioMax': 1.73205, + 'computeRatioAvg': 0.69315, + 'computeRatioMax': 1.09861, + 'writeRatioAvg': 3.32193, + 'writeRatioMax': 2.30258, + 'recordsRead': 100, + 'recordsWritten': 1, + 'status': 'STATUS', + 'steps': [{ + 'kind': 'KIND', + 'substeps': ['SUBSTEP1', 'SUBSTEP2'], + }], + }] + client = _Client(self.PROJECT) + job = self._make_one(self.JOB_NAME, self.QUERY, client) + self.assertEqual(job.query_plan, []) + + statistics = job._properties['statistics'] = {} + self.assertEqual(job.query_plan, []) + + query_stats = statistics['query'] = {} + self.assertEqual(job.query_plan, []) + + query_stats['queryPlan'] = plan_entries + self.assertEqual(job.query_plan, plan_entries) + + def test_total_bytes_processed(self): + total_bytes = 1234 + client = 
_Client(self.PROJECT) + job = self._make_one(self.JOB_NAME, self.QUERY, client) + self.assertIsNone(job.total_bytes_processed) + + statistics = job._properties['statistics'] = {} + self.assertIsNone(job.total_bytes_processed) + + query_stats = statistics['query'] = {} + self.assertIsNone(job.total_bytes_processed) + + query_stats['totalBytesProcessed'] = total_bytes + self.assertEqual(job.total_bytes_processed, total_bytes) + + def test_total_bytes_billed(self): + total_bytes = 1234 + client = _Client(self.PROJECT) + job = self._make_one(self.JOB_NAME, self.QUERY, client) + self.assertIsNone(job.total_bytes_billed) + + statistics = job._properties['statistics'] = {} + self.assertIsNone(job.total_bytes_billed) + + query_stats = statistics['query'] = {} + self.assertIsNone(job.total_bytes_billed) + + query_stats['totalBytesBilled'] = total_bytes + self.assertEqual(job.total_bytes_billed, total_bytes) + + def test_billing_tier(self): + billing_tier = 1 + client = _Client(self.PROJECT) + job = self._make_one(self.JOB_NAME, self.QUERY, client) + self.assertIsNone(job.billing_tier) + + statistics = job._properties['statistics'] = {} + self.assertIsNone(job.billing_tier) + + query_stats = statistics['query'] = {} + self.assertIsNone(job.billing_tier) + + query_stats['billingTier'] = billing_tier + self.assertEqual(job.billing_tier, billing_tier) + + def test_cache_hit(self): + client = _Client(self.PROJECT) + job = self._make_one(self.JOB_NAME, self.QUERY, client) + self.assertIsNone(job.cache_hit) + + statistics = job._properties['statistics'] = {} + self.assertIsNone(job.cache_hit) + + query_stats = statistics['query'] = {} + self.assertIsNone(job.cache_hit) + + query_stats['cacheHit'] = True + self.assertTrue(job.cache_hit) + + def test_referenced_tables(self): + from google.cloud.bigquery.dataset import Dataset + from google.cloud.bigquery.table import Table + + referenced_tables = [{ + 'projectId': self.PROJECT, + 'datasetId': 'dataset', + 'tableId': 'local1', + }, { 
+ 'projectId': self.PROJECT, + 'datasetId': 'dataset', + 'tableId': 'local2', + }, { + + 'projectId': 'other-project-123', + 'datasetId': 'other-dataset', + 'tableId': 'remote', + }] + client = _Client(self.PROJECT) + job = self._make_one(self.JOB_NAME, self.QUERY, client) + self.assertEqual(job.referenced_tables, []) + + statistics = job._properties['statistics'] = {} + self.assertEqual(job.referenced_tables, []) + + query_stats = statistics['query'] = {} + self.assertEqual(job.referenced_tables, []) + + query_stats['referencedTables'] = referenced_tables + + local1, local2, remote = job.referenced_tables + + self.assertIsInstance(local1, Table) + self.assertEqual(local1.name, 'local1') + self.assertIsInstance(local1._dataset, Dataset) + self.assertEqual(local1._dataset.name, 'dataset') + self.assertIs(local1._dataset._client, client) + + self.assertIsInstance(local2, Table) + self.assertEqual(local2.name, 'local2') + self.assertIs(local2._dataset, local1._dataset) + + self.assertIsInstance(remote, Table) + self.assertEqual(remote.name, 'remote') + self.assertIsInstance(remote._dataset, Dataset) + self.assertEqual(remote._dataset.name, 'other-dataset') + self.assertIsNot(remote._dataset._client, client) + self.assertIsInstance(remote._dataset._client, _Client) + self.assertEqual(remote._dataset._client.project, 'other-project-123') + + def test_schema(self): + from google.cloud.bigquery.table import _parse_schema_resource + + schema = { + 'fields': [{ + 'name': 'full_name', + 'type': 'STRING', + 'mode': 'NULLABLE', + 'description': 'DESCRIPTION' + }, { + 'name': 'phone_number', + 'type': 'STRING', + 'mode': 'REPEATED', + }, { + 'name': 'address', + 'type': 'RECORD', + 'mode': 'REPEATED', + 'fields': [{ + 'name': 'street_address', + 'type': 'STRING', + 'mode': 'NULLABLE', + }, { + 'name': 'city', + 'type': 'STRING', + 'mode': 'NULLABLE', + }, { + 'name': 'state', + 'type': 'STRING', + 'mode': 'NULLABLE', + }, { + 'name': 'zip', + 'type': 'STRING', + 'mode': 
'NULLABLE', + }], + }], + } + client = _Client(self.PROJECT) + job = self._make_one(self.JOB_NAME, self.QUERY, client) + self.assertEqual(job.schema, ()) + + statistics = job._properties['statistics'] = {} + self.assertEqual(job.schema, ()) + + query_stats = statistics['query'] = {} + self.assertEqual(job.schema, ()) + + query_stats['schema'] = schema + + self.assertEqual(job.schema, _parse_schema_resource(schema)) + + def test_num_dml_affected_rows(self): + num_rows = 1234 + client = _Client(self.PROJECT) + job = self._make_one(self.JOB_NAME, self.QUERY, client) + self.assertIsNone(job.num_dml_affected_rows) + + statistics = job._properties['statistics'] = {} + self.assertIsNone(job.num_dml_affected_rows) + + query_stats = statistics['query'] = {} + self.assertIsNone(job.num_dml_affected_rows) + + query_stats['numDmlAffectedRows'] = num_rows + self.assertEqual(job.num_dml_affected_rows, num_rows) + + def test_undeclared_query_paramters(self): + undeclared = [{ + "name": 'my_scalar', + "parameterType": { + "type": 'STRING', + }, + }, { + "name": 'my_array', + "parameterType": { + "type": 'ARRAY', + "arrayType": 'INT64', + }, + }, { + "name": 'my_struct', + "parameterType": { + "type": 'STRUCT', + "structTypes": [{ + "name": 'count', + "type": 'INT64', + }], + }, + }] + client = _Client(self.PROJECT) + job = self._make_one(self.JOB_NAME, self.QUERY, client) + self.assertEqual(job.undeclared_query_paramters, []) + + statistics = job._properties['statistics'] = {} + self.assertEqual(job.undeclared_query_paramters, []) + + query_stats = statistics['query'] = {} + self.assertEqual(job.undeclared_query_paramters, []) + + query_stats['undeclaredQueryParamters'] = undeclared + self.assertEqual(job.undeclared_query_paramters, undeclared) + + def test_statement_type(self): + statement_type = 'SELECT' + client = _Client(self.PROJECT) + job = self._make_one(self.JOB_NAME, self.QUERY, client) + self.assertIsNone(job.statement_type) + + statistics = job._properties['statistics'] 
= {} + self.assertIsNone(job.statement_type) + + query_stats = statistics['query'] = {} + self.assertIsNone(job.statement_type) + + query_stats['statementType'] = statement_type + self.assertEqual(job.statement_type, statement_type) + def test_query_results(self): from google.cloud.bigquery.query import QueryResults @@ -2075,6 +2349,9 @@ def __init__(self, project='project', connection=None): self.project = project self._connection = connection + def _clone(self, project): + return self.__class__(project, connection=self._connection) + def dataset(self, name): from google.cloud.bigquery.dataset import Dataset