Parquet to BigQuery import for GCP-backed AnVIL snapshots (#6355)
nadove-ucsc committed Sep 4, 2024
1 parent 12586df commit 8f8f247
Showing 5 changed files with 140 additions and 5 deletions.
7 changes: 4 additions & 3 deletions scripts/post_deploy_tdr.py
@@ -93,9 +93,10 @@ def verify_source(self,
) -> None:
source = self.tdr.lookup_source(source_spec)
log.info('TDR client is authorized for API access to %s.', source_spec)
require(source.project == source_spec.subdomain,
'Actual Google project of TDR source differs from configured one',
source.project, source_spec.subdomain)
if source_spec.subdomain != config.google_project():
require(source.project == source_spec.subdomain,
'Actual Google project of TDR source differs from configured one',
source.project, source_spec.subdomain)
# Uppercase is standard for multi-regions in the documentation but TDR
# returns 'us' in lowercase
require(source.location.lower() == config.tdr_source_location.lower(),
24 changes: 24 additions & 0 deletions scripts/reindex.py
@@ -26,9 +26,15 @@
from azul.logging import (
configure_script_logging,
)
from azul.plugins.repository import (
tdr_anvil,
)
from azul.plugins.repository.tdr import (
TDRPlugin,
)
from azul.terra import (
TDRSourceSpec,
)

log = logging.getLogger(__name__)

@@ -105,6 +111,11 @@
default=False,
action='store_true',
help='Purge the queues before taking any action on the indices.')
parser.add_argument('--import',
default=False,
action='store_true',
dest='import_',
help='Import data for sources from TDR into BigQuery')
parser.add_argument('--nowait', '--no-wait',
dest='wait',
default=True,
@@ -159,6 +170,19 @@ def main(argv: list[str]):
parser.error('Cannot specify sources when performing a local reindex')
assert False

if args.import_:
for catalog, sources in sources_by_catalog.items():
if config.is_tdr_enabled(catalog) and config.is_anvil_enabled(catalog) and sources:
plugin = azul.repository_plugin(catalog)
assert isinstance(plugin, tdr_anvil.Plugin)
for source in sources:
spec = TDRSourceSpec.parse(source)
if spec.type == TDRSourceSpec.Type.parquet:
source = plugin.resolve_source(source)
plugin.import_tables(source)
else:
log.info('Skipping table import for catalog %r', catalog)

if args.deindex:
require(not any((args.index, args.delete, args.create)),
'--deindex is incompatible with --index, --create, and --delete.')
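
As an aside, a minimal illustrative sketch of the dest='import_' idiom used by the new --import flag above: `import` is a Python keyword, so `args.import` would be a syntax error, and argparse is therefore told to store the parsed value under the attribute name import_ instead.

import argparse

parser = argparse.ArgumentParser()
# Without an explicit dest, argparse would derive the attribute name 'import'
# from '--import', which could only be read back via getattr(args, 'import').
parser.add_argument('--import',
                    default=False,
                    action='store_true',
                    dest='import_')
args = parser.parse_args(['--import'])
print(args.import_)  # True
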
35 changes: 35 additions & 0 deletions src/azul/plugins/repository/tdr_anvil/__init__.py
@@ -17,13 +17,17 @@
)

import attrs
from furl import (
furl,
)
from more_itertools import (
one,
)

from azul import (
cached_property,
config,
reject,
require,
uuids,
)
@@ -740,3 +744,34 @@ def _columns(self, entity_type: EntityType) -> set[str]:
entity_columns = {column['name'] for column in table['columns']}
entity_columns.add('datarepo_row_id')
return entity_columns

def import_tables(self, source: TDRSourceRef):
"""
Import tables for an AnVIL snapshot into BigQuery via TDR's Parquet
export API. Only tables defined in the AnVIL schema will be imported.
Currently, only GS-backed snapshots are supported.
"""
require(source.spec.subdomain == config.google_project(), source)

dataset_name = source.spec.name
self.tdr.create_dataset(dataset_name)

urls_by_table = self.tdr.export_parquet_urls(source.id)
reject(urls_by_table is None,
'No Parquet access information is available for snapshot %r.', source.spec)

for table in anvil_schema['tables']:
table_name = table['name']
urls = urls_by_table[table_name]
for url in urls:
require(url.origin == 'https://storage.googleapis.com',
'Unsupported storage location for snapshot %r: %r',
source.spec, url)
url.load(furl(scheme='gs',
netloc=url.path.segments[0],
path=url.path.segments[1:]))
self.tdr.create_table(dataset_name=dataset_name,
table_name=table_name,
import_urls=urls,
overwrite=False,
clustering_fields=table['primaryKey'])
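
For reference, a standalone sketch of the URL rewrite that import_tables performs: TDR's Parquet export returns https://storage.googleapis.com URLs, while BigQuery's load API expects gs:// URLs, so the first path segment (the bucket) becomes the netloc and the remaining segments become the object path. The bucket and object names below are hypothetical.

from furl import furl

# A URL of the shape returned by the Parquet export API (hypothetical bucket and object)
url = furl('https://storage.googleapis.com/example-bucket/donor/part-0.parquet')
url.load(furl(scheme='gs',
              netloc=url.path.segments[0],   # bucket name
              path=url.path.segments[1:]))   # object key within the bucket
print(url)  # gs://example-bucket/donor/part-0.parquet
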
76 changes: 74 additions & 2 deletions src/azul/terra.py
@@ -40,9 +40,15 @@
bigquery,
)
from google.cloud.bigquery import (
Dataset,
DatasetReference,
LoadJobConfig,
ParquetOptions,
QueryJob,
QueryJobConfig,
QueryPriority,
SourceFormat,
WriteDisposition,
)
from more_itertools import (
one,
@@ -150,7 +156,6 @@ def parse(cls, spec: str) -> 'TDRSourceSpec':
service, type, domain, subdomain, name = rest.split(':')
assert service == 'tdr', service
type = cls.Type(type)
reject(type == cls.Type.parquet, 'Parquet sources are not yet supported')
domain = cls.Domain(domain)
reject(domain == cls.Domain.azure, 'Azure sources are not yet supported')
self = cls(prefix=prefix,
@@ -257,7 +262,7 @@ def oauth2_scopes(self) -> Sequence[str]:
return [
*super().oauth2_scopes(),
'https://www.googleapis.com/auth/devstorage.read_only',
'https://www.googleapis.com/auth/bigquery.readonly'
'https://www.googleapis.com/auth/bigquery'
]


@@ -655,6 +660,73 @@ def get_duos(self, source: SourceRef) -> Optional[MutableJSON]:
else:
return self._check_response(url, response)

def create_dataset(self, dataset_name: str):
"""
Create a BigQuery dataset in the project and region configured for the
current deployment.
:param dataset_name: Unqualified name of the dataset to create.
`google.cloud.exceptions.Conflict` will be raised
if a dataset with the same name already exists.
"""
bigquery = self._bigquery(self.credentials.project_id)
ref = DatasetReference(bigquery.project, dataset_name)
dataset = Dataset(ref)
dataset.location = config.tdr_source_location
log.info('Creating BigQuery dataset %r in region %r',
dataset.dataset_id, dataset.location)
bigquery.create_dataset(dataset)

def create_table(self,
dataset_name: str,
table_name: str,
import_urls: Sequence[furl],
*,
overwrite: bool,
clustering_fields: Optional[Sequence[str]] = None):
"""
Create a BigQuery table in the project and region configured for the
current deployment.
:param dataset_name: Unqualified name of the dataset to contain the new
table
:param table_name: Unqualified name of the new table
:param import_urls: URLs of Parquet file(s) to populate the table. These
must be `gs://` URLs and the GCS bucket's region
must be compatible with the target dataset's. See
https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet#limitations
:param overwrite: Overwrite existing table with the same ID as the table
we're trying to create (true) or raise an exception if
such a table exists (false)
:param clustering_fields: Fields defining clustering for the table. See
https://cloud.google.com/bigquery/docs/clustered-tables
"""
for url in import_urls:
require(url.scheme == 'gs', url)
table_id = f'{dataset_name}.{table_name}'
bigquery = self._bigquery(self.credentials.project_id)
write_disposition = (
WriteDisposition.WRITE_TRUNCATE if overwrite else WriteDisposition.WRITE_EMPTY
)
job_config = LoadJobConfig(
write_disposition=write_disposition,
clustering_fields=clustering_fields,
source_format=SourceFormat.PARQUET,
# Avoids convoluted data types for array fields
parquet_options=ParquetOptions.from_api_repr(dict(enable_list_inference=True))
)
log.info('Creating BigQuery table %r',
f'{bigquery.project}.{dataset_name}.{table_name}')
load_job = bigquery.load_table_from_uri(source_uris=list(map(str, import_urls)),
destination=table_id,
job_config=job_config)
load_job.result()
log.info('Table created successfully')

def export_parquet_urls(self,
snapshot_id: str
) -> Optional[dict[str, list[mutable_furl]]]:
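
For orientation, a condensed standalone version of the Parquet load that create_table issues through the google-cloud-bigquery client. The project, dataset, table, clustering column, and source URL below are hypothetical placeholders.

from google.cloud import bigquery

client = bigquery.Client(project='my-gcp-project')  # hypothetical project
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    # WRITE_EMPTY makes the job fail if the destination table already contains data
    write_disposition=bigquery.WriteDisposition.WRITE_EMPTY,
    clustering_fields=['donor_id'],  # hypothetical primary-key column
    # List inference loads Parquet LIST values as flat REPEATED columns
    parquet_options=bigquery.ParquetOptions.from_api_repr(
        dict(enable_list_inference=True)),
)
load_job = client.load_table_from_uri(
    ['gs://example-bucket/donor/part-0.parquet'],  # hypothetical gs:// URL
    'my_dataset.donor',
    job_config=job_config)
load_job.result()  # blocks until the load job completes
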
3 changes: 3 additions & 0 deletions terraform/authentication.tf.json.template.py
@@ -60,6 +60,9 @@
"title": f"azul_{config.deployment_stage}",
"permissions": [
"bigquery.jobs.create",
"bigquery.datasets.create",
"bigquery.tables.create",
"bigquery.tables.updateData",
*[
f'bigquery.{resource}.{action}'
for resource in ('capacityCommitments', 'reservations')