From 6e2b2e1ce9e4993f2d4b666925bc6fbd106caf0c Mon Sep 17 00:00:00 2001
From: Noa Aviel Dove
Date: Wed, 10 Jul 2024 20:05:52 -0700
Subject: [PATCH] Parquet to BigQuery import for GCP-backed AnVIL snapshots
 (#6355)

---
 scripts/post_deploy_tdr.py                   |  7 +-
 scripts/reindex.py                           | 24 ++++++
 .../plugins/repository/tdr_anvil/__init__.py | 35 +++++++++
 src/azul/terra.py                            | 76 ++++++++++++++++++-
 terraform/authentication.tf.json.template.py |  3 +
 5 files changed, 140 insertions(+), 5 deletions(-)

diff --git a/scripts/post_deploy_tdr.py b/scripts/post_deploy_tdr.py
index fe3565c10b..222efccf3d 100644
--- a/scripts/post_deploy_tdr.py
+++ b/scripts/post_deploy_tdr.py
@@ -93,9 +93,10 @@ def verify_source(self,
                       ) -> None:
         source = self.tdr.lookup_source(source_spec)
         log.info('TDR client is authorized for API access to %s.', source_spec)
-        require(source.project == source_spec.subdomain,
-                'Actual Google project of TDR source differs from configured one',
-                source.project, source_spec.subdomain)
+        if source_spec.subdomain != config.google_project():
+            require(source.project == source_spec.subdomain,
+                    'Actual Google project of TDR source differs from configured one',
+                    source.project, source_spec.subdomain)
         # Uppercase is standard for multi-regions in the documentation but TDR
         # returns 'us' in lowercase
         require(source.location.lower() == config.tdr_source_location.lower(),
diff --git a/scripts/reindex.py b/scripts/reindex.py
index cd13cc6882..210bc16a53 100755
--- a/scripts/reindex.py
+++ b/scripts/reindex.py
@@ -26,9 +26,15 @@
 from azul.logging import (
     configure_script_logging,
 )
+from azul.plugins.repository import (
+    tdr_anvil,
+)
 from azul.plugins.repository.tdr import (
     TDRPlugin,
 )
+from azul.terra import (
+    TDRSourceSpec,
+)
 
 log = logging.getLogger(__name__)
 
@@ -105,6 +111,11 @@
                     default=False,
                     action='store_true',
                     help='Purge the queues before taking any action on the indices.')
+parser.add_argument('--import',
+                    default=False,
+                    action='store_true',
+                    dest='import_',
+                    help='Import source data from TDR into BigQuery')
 parser.add_argument('--nowait', '--no-wait',
                     dest='wait',
                     default=True,
@@ -159,6 +170,19 @@ def main(argv: list[str]):
             parser.error('Cannot specify sources when performing a local reindex')
             assert False
 
+    if args.import_:
+        for catalog, sources in sources_by_catalog.items():
+            if config.is_tdr_enabled(catalog) and config.is_anvil_enabled(catalog) and sources:
+                plugin = azul.repository_plugin(catalog)
+                assert isinstance(plugin, tdr_anvil.Plugin)
+                for source in sources:
+                    spec = TDRSourceSpec.parse(source)
+                    if spec.type == TDRSourceSpec.Type.parquet:
+                        source = plugin.resolve_source(source)
+                        plugin.import_tables(source)
+            else:
+                log.info('Skipping table import for catalog %r', catalog)
+
     if args.deindex:
         require(not any((args.index, args.delete, args.create)),
                 '--deindex is incompatible with --index, --create, and --delete.')
diff --git a/src/azul/plugins/repository/tdr_anvil/__init__.py b/src/azul/plugins/repository/tdr_anvil/__init__.py
index e0ea1e3c6a..4601a61d06 100644
--- a/src/azul/plugins/repository/tdr_anvil/__init__.py
+++ b/src/azul/plugins/repository/tdr_anvil/__init__.py
@@ -17,6 +17,9 @@
 )
 
 import attrs
+from furl import (
+    furl,
+)
 from more_itertools import (
     one,
 )
@@ -24,6 +27,7 @@
 from azul import (
     cached_property,
     config,
+    reject,
     require,
     uuids,
 )
@@ -740,3 +744,34 @@ def _columns(self, entity_type: EntityType) -> set[str]:
         entity_columns = {column['name'] for column in table['columns']}
         entity_columns.add('datarepo_row_id')
         return entity_columns
+
+    def import_tables(self, source: TDRSourceRef):
+        """
+        Import tables for an AnVIL snapshot into BigQuery via TDR's Parquet
+        export API. Only tables defined in the AnVIL schema will be imported.
+        Currently, only GS-backed snapshots are supported.
+        """
+        require(source.spec.subdomain == config.google_project(), source)
+
+        dataset_name = source.spec.name
+        self.tdr.create_dataset(dataset_name)
+
+        urls_by_table = self.tdr.export_parquet_urls(source.id)
+        reject(urls_by_table is None,
+               'No Parquet access information is available for snapshot %r.', source.spec)
+
+        for table in anvil_schema['tables']:
+            table_name = table['name']
+            urls = urls_by_table[table_name]
+            for url in urls:
+                require(url.origin == 'https://storage.googleapis.com',
+                        'Unsupported storage location for snapshot %r: %r',
+                        source.spec, url)
+                url.load(furl(scheme='gs',
+                              netloc=url.path.segments[0],
+                              path=url.path.segments[1:]))
+            self.tdr.create_table(dataset_name=dataset_name,
+                                  table_name=table_name,
+                                  import_urls=urls,
+                                  overwrite=False,
+                                  clustering_fields=table['primaryKey'])
diff --git a/src/azul/terra.py b/src/azul/terra.py
index f2e6c25741..b0e38780d2 100644
--- a/src/azul/terra.py
+++ b/src/azul/terra.py
@@ -40,9 +40,15 @@
     bigquery,
 )
 from google.cloud.bigquery import (
+    Dataset,
+    DatasetReference,
+    LoadJobConfig,
+    ParquetOptions,
     QueryJob,
     QueryJobConfig,
     QueryPriority,
+    SourceFormat,
+    WriteDisposition,
 )
 from more_itertools import (
     one,
 )
@@ -150,7 +156,6 @@ def parse(cls, spec: str) -> 'TDRSourceSpec':
         service, type, domain, subdomain, name = rest.split(':')
         assert service == 'tdr', service
         type = cls.Type(type)
-        reject(type == cls.Type.parquet, 'Parquet sources are not yet supported')
         domain = cls.Domain(domain)
         reject(domain == cls.Domain.azure, 'Azure sources are not yet supported')
         self = cls(prefix=prefix,
@@ -257,7 +262,7 @@ def oauth2_scopes(self) -> Sequence[str]:
         return [
             *super().oauth2_scopes(),
             'https://www.googleapis.com/auth/devstorage.read_only',
-            'https://www.googleapis.com/auth/bigquery.readonly'
+            'https://www.googleapis.com/auth/bigquery'
         ]
 
 
@@ -654,6 +659,73 @@ def get_duos(self, source: SourceRef) -> Optional[MutableJSON]:
         else:
             return self._check_response(url, response)
 
+    def create_dataset(self, dataset_name: str):
+        """
+        Create a BigQuery dataset in the project and region configured for the
+        current deployment.
+
+        :param dataset_name: Unqualified name of the dataset to create.
+                             `google.cloud.exceptions.Conflict` will be raised
+                             if a dataset with the same name already exists.
+        """
+        bigquery = self._bigquery(self.credentials.project_id)
+        ref = DatasetReference(bigquery.project, dataset_name)
+        dataset = Dataset(ref)
+        dataset.location = config.tdr_source_location
+        log.info('Creating BigQuery dataset %r in region %r',
+                 dataset.dataset_id, dataset.location)
+        bigquery.create_dataset(dataset)
+
+    def create_table(self,
+                     dataset_name: str,
+                     table_name: str,
+                     import_urls: Sequence[furl],
+                     *,
+                     overwrite: bool,
+                     clustering_fields: Optional[Sequence[str]] = None):
+        """
+        Create a BigQuery table in the project and region configured for the
+        current deployment.
+
+        :param dataset_name: Unqualified name of the dataset to contain the new
+                             table
+
+        :param table_name: Unqualified name of the new table
+
+        :param import_urls: URLs of Parquet file(s) to populate the table. These
+                            must be `gs://` URLs and the GCS bucket's region
+                            must be compatible with the target dataset's. See
+                            https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet#limitations
+
+        :param overwrite: Overwrite existing table with the same ID as the table
+                          we're trying to create (true) or raise an exception if
+                          such a table exists (false)
+
+        :param clustering_fields: Fields defining clustering for the table. See
+                                  https://cloud.google.com/bigquery/docs/clustered-tables
+        """
+        for url in import_urls:
+            require(url.scheme == 'gs', url)
+        table_id = f'{dataset_name}.{table_name}'
+        bigquery = self._bigquery(self.credentials.project_id)
+        write_disposition = (
+            WriteDisposition.WRITE_TRUNCATE if overwrite else WriteDisposition.WRITE_EMPTY
+        )
+        job_config = LoadJobConfig(
+            write_disposition=write_disposition,
+            clustering_fields=clustering_fields,
+            source_format=SourceFormat.PARQUET,
+            # Avoids convoluted data types for array fields
+            parquet_options=ParquetOptions.from_api_repr(dict(enable_list_inference=True))
+        )
+        log.info('Creating BigQuery table %r',
+                 f'{bigquery.project}.{dataset_name}.{table_name}')
+        load_job = bigquery.load_table_from_uri(source_uris=list(map(str, import_urls)),
+                                                destination=table_id,
+                                                job_config=job_config)
+        load_job.result()
+        log.info('Table created successfully')
+
     def export_parquet_urls(self,
                             snapshot_id: str
                             ) -> Optional[dict[str, list[mutable_furl]]]:
diff --git a/terraform/authentication.tf.json.template.py b/terraform/authentication.tf.json.template.py
index 782c1581b9..07f29b3c3e 100644
--- a/terraform/authentication.tf.json.template.py
+++ b/terraform/authentication.tf.json.template.py
@@ -60,6 +60,9 @@
                     "title": f"azul_{config.deployment_stage}",
                     "permissions": [
                         "bigquery.jobs.create",
+                        "bigquery.datasets.create",
+                        "bigquery.tables.create",
+                        "bigquery.tables.updateData",
                         *[
                             f'bigquery.{resource}.{action}'
                             for resource in ('capacityCommitments', 'reservations')
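Note (not part of the patch): the sketch below illustrates, outside of Azul, the BigQuery load pattern that the new TDR.create_dataset and TDR.create_table methods rely on: a Parquet load job from gs:// URIs with list inference enabled, clustering fields set, and a WRITE_EMPTY disposition, which corresponds to the overwrite=False case above. It uses only the google-cloud-bigquery client that the patch already imports; the project, dataset, table, bucket, and clustering field names are hypothetical placeholders.

from google.cloud import bigquery
from google.cloud.bigquery import (
    Dataset,
    DatasetReference,
    LoadJobConfig,
    ParquetOptions,
    SourceFormat,
    WriteDisposition,
)

# Hypothetical placeholders, not values used by Azul or TDR
project = 'my-gcp-project'
dataset_name = 'anvil_snapshot_demo'
table_name = 'biosample'
# TDR's export API returns signed https:// URLs; the patch rewrites them to
# gs:// URLs before loading. A wildcard URI also works for multi-file exports.
import_urls = ['gs://example-export-bucket/parquet/biosample/*.parquet']

client = bigquery.Client(project=project)

# Counterpart of TDR.create_dataset: a dataset in the desired region
dataset = Dataset(DatasetReference(project, dataset_name))
dataset.location = 'US'
client.create_dataset(dataset, exists_ok=True)

# List inference maps Parquet LIST columns to plain REPEATED fields instead of
# nested record wrappers; set via the property to avoid API-representation keys
parquet_options = ParquetOptions()
parquet_options.enable_list_inference = True

# Counterpart of TDR.create_table with overwrite=False: WRITE_EMPTY makes the
# job fail if the destination table already contains data
job_config = LoadJobConfig(
    write_disposition=WriteDisposition.WRITE_EMPTY,
    clustering_fields=['biosample_id'],
    source_format=SourceFormat.PARQUET,
    parquet_options=parquet_options,
)
load_job = client.load_table_from_uri(source_uris=import_urls,
                                       destination=f'{dataset_name}.{table_name}',
                                       job_config=job_config)
load_job.result()  # block until the load job finishes, raising on failure
print(client.get_table(f'{project}.{dataset_name}.{table_name}').num_rows)

Passing WriteDisposition.WRITE_TRUNCATE instead would correspond to overwrite=True in the patch.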