Parquet to BigQuery import for GCP-backed AnVIL snapshots (#6355)
nadove-ucsc committed Jul 11, 2024
1 parent 7dd58bc commit 94a668b
Showing 3 changed files with 121 additions and 0 deletions.
19 changes: 19 additions & 0 deletions scripts/reindex.py
@@ -26,6 +26,9 @@
from azul.logging import (
configure_script_logging,
)
from azul.plugins.repository import (
tdr_anvil,
)
from azul.plugins.repository.tdr import (
TDRPlugin,
)
@@ -105,6 +108,11 @@
default=False,
action='store_true',
help='Purge the queues before taking any action on the indices.')
parser.add_argument('--import',
default=False,
action='store_true',
dest='import_',
help='Import data for the sources from TDR into BigQuery')
parser.add_argument('--nowait', '--no-wait',
dest='wait',
default=True,
@@ -159,6 +167,17 @@ def main(argv: list[str]):
parser.error('Cannot specify sources when performing a local reindex')
assert False

if args.import_:
for catalog, sources in sources_by_catalog.items():
if config.is_tdr_enabled(catalog) and config.is_anvil_enabled(catalog) and sources:
plugin = azul.repository_plugin(catalog)
assert isinstance(plugin, tdr_anvil.Plugin)
for source in sources:
source = plugin.resolve_source(source)
plugin.import_tables(source)
else:
log.info('Skipping table import for catalog %r', catalog)

if args.deindex:
require(not any((args.index, args.delete, args.create)),
'--deindex is incompatible with --index, --create, and --delete.')
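
The new flag needs an explicit dest because `import` is a Python keyword, so the attribute that argparse would derive by default could never be accessed as `args.import`. A minimal, self-contained sketch of that behavior (hypothetical parser, not part of this commit):

import argparse

# '--import' would default to dest='import', but 'args.import' is a syntax
# error because 'import' is a keyword; dest='import_' sidesteps that.
parser = argparse.ArgumentParser()
parser.add_argument('--import', default=False, action='store_true', dest='import_')
args = parser.parse_args(['--import'])
assert args.import_ is True
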
35 changes: 35 additions & 0 deletions src/azul/plugins/repository/tdr_anvil/__init__.py
@@ -17,13 +17,17 @@
)

import attrs
from furl import (
furl,
)
from more_itertools import (
one,
)

from azul import (
cached_property,
config,
reject,
require,
uuids,
)
@@ -740,3 +744,34 @@ def _columns(self, entity_type: EntityType) -> set[str]:
entity_columns = {column['name'] for column in table['columns']}
entity_columns.add('datarepo_row_id')
return entity_columns

def import_tables(self, source: TDRSourceRef):
"""
Import tables for an AnVIL snapshot into BigQuery via TDR's parquet
export API. Only tables defined in the AnVIL schema will be imported.
Currently, only GS-backed snapshots are supported.
"""
require(source.spec.project == config.google_project(), source)

dataset_name = source.spec.name
self.tdr.create_dataset(dataset_name)

urls_by_table = self.tdr.export_parquet_urls(source.id)
reject(urls_by_table is None,
'No parquet access information is available for snapshot %r.', source.spec)

for table in anvil_schema['tables']:
table_name = table['name']
urls = urls_by_table[table_name]
for url in urls:
require(url.origin == 'https://storage.googleapis.com',
'Unsupported storage location for snapshot %r: %r',
source.spec, url)
url.load(furl(scheme='gs',
netloc=url.path.segments[0],
path=url.path.segments[1:]))
self.tdr.create_table(dataset_name=dataset_name,
table_name=table_name,
import_urls=urls,
overwrite=False,
clustering_fields=table['primaryKey'])
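
The loop above rewrites each `https://storage.googleapis.com/...` URL returned by TDR's parquet export into the `gs://` form that BigQuery load jobs expect: the first path segment is the bucket, the remaining segments are the object key. A small standalone sketch of that rewrite, using a made-up bucket and object name:

from furl import furl

# Hypothetical export URL of the kind TDR returns for a GS-backed snapshot
url = furl('https://storage.googleapis.com/example-bucket/parquet/file_inventory/part-0.parquet')
assert url.origin == 'https://storage.googleapis.com'

# Rewrite in place: the bucket becomes the netloc, the rest becomes the path
url.load(furl(scheme='gs',
              netloc=url.path.segments[0],
              path=url.path.segments[1:]))
assert url.scheme == 'gs' and url.netloc == 'example-bucket'
# str(url) is now 'gs://example-bucket/parquet/file_inventory/part-0.parquet'
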
67 changes: 67 additions & 0 deletions src/azul/terra.py
@@ -667,6 +667,73 @@ def get_duos(self, source: SourceRef) -> Optional[MutableJSON]:
else:
return self._check_response(url, response)

def create_dataset(self, dataset_name: str):
"""
Create a BigQuery dataset in the project and region configured for the
current deployment.
:param dataset_name: Unqualified name of the dataset to create.
`google.cloud.exceptions.Conflict` will be raised
if a dataset with the same name already exists.
"""
bigquery = self._bigquery(self.credentials.project_id)
ref = DatasetReference(bigquery.project, dataset_name)
dataset = Dataset(ref)
dataset.location = config.tdr_source_location
log.info('Creating BigQuery dataset %r in region %r',
dataset.dataset_id, dataset.location)
bigquery.create_dataset(dataset)

def create_table(self,
dataset_name: str,
table_name: str,
import_urls: Sequence[furl],
*,
overwrite: bool,
clustering_fields: Optional[Sequence[str]] = None):
"""
Create a BigQuery table in the project and region configured for the
current deployment.
:param dataset_name: Unqualified name of the dataset to contain the new
table
:param table_name: Unqualified name of the new table
:param import_urls: URLs of parquet file(s) to populate the table. These
must be `gs://` URLs and the GCS bucket's region
must be compatible with the target dataset's. See
https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet#limitations
:param overwrite: Overwrite existing table with the same ID as the table
we're trying to create (true) or raise an exception if
such a table exists (false)
:param clustering_fields: Fields defining clustering for the table. See
https://cloud.google.com/bigquery/docs/clustered-tables
"""
for url in import_urls:
require(url.scheme == 'gs', url)
table_id = f'{dataset_name}.{table_name}'
bigquery = self._bigquery(self.credentials.project_id)
write_disposition = (
WriteDisposition.WRITE_TRUNCATE if overwrite else WriteDisposition.WRITE_EMPTY
)
job_config = LoadJobConfig(
write_disposition=write_disposition,
clustering_fields=clustering_fields,
source_format=SourceFormat.PARQUET,
# Avoids convoluted data types for array fields
parquet_options=ParquetOptions.from_api_repr(dict(enable_list_inference=True))
)
log.info('Creating BigQuery table %r',
f'{bigquery.project}.{dataset_name}.{table_name}')
load_job = bigquery.load_table_from_uri(source_uris=list(map(str, import_urls)),
destination=table_id,
job_config=job_config)
load_job.result()
log.info('Table created successfully')

def export_parquet_urls(self,
snapshot_id: str
) -> Optional[dict[str, list[mutable_furl]]]:
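
Taken together, `create_dataset` and `create_table` are thin wrappers around the google-cloud-bigquery client. A condensed, standalone sketch of the same pattern (the project, dataset and table names, region, clustering field, and gs:// URI are all placeholders, not values from this commit):

from google.cloud import bigquery
from google.cloud.bigquery import (
    Dataset,
    DatasetReference,
    LoadJobConfig,
    ParquetOptions,
    SourceFormat,
    WriteDisposition,
)

client = bigquery.Client()  # assumes application-default credentials

# Create the dataset in the desired region; raises Conflict if it already exists
dataset = Dataset(DatasetReference(client.project, 'example_snapshot'))
dataset.location = 'us-central1'
client.create_dataset(dataset)

# Enable parquet list inference so repeated fields load as plain ARRAY columns
parquet_options = ParquetOptions()
parquet_options.enable_list_inference = True

# WRITE_EMPTY fails the job if the destination table already contains data,
# matching the overwrite=False path above
job_config = LoadJobConfig(write_disposition=WriteDisposition.WRITE_EMPTY,
                           clustering_fields=['example_id'],
                           source_format=SourceFormat.PARQUET,
                           parquet_options=parquet_options)
load_job = client.load_table_from_uri(source_uris=['gs://example-bucket/parquet/example_table/*'],
                                      destination='example_snapshot.example_table',
                                      job_config=job_config)
load_job.result()  # blocks until the load job completes or raises
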
