Merge pull request #52 from ecmwf-projects/improve-copy-and-delete
Improve copy and delete
garciampred authored Dec 31, 2024
2 parents f4e5fa4 + fc50665 commit ac22663
Showing 13 changed files with 228 additions and 68 deletions.
13 changes: 7 additions & 6 deletions cdsobs/api.py
@@ -8,8 +8,8 @@
from sqlalchemy.orm import Session

from cdsobs.cdm.api import (
apply_unit_changes,
check_cdm_compliance,
define_units,
)
from cdsobs.config import CDSObsConfig, DatasetConfig
from cdsobs.ingestion.api import (
@@ -294,11 +294,12 @@ def _read_homogenise_and_partition(
# Check CDM compliance
check_cdm_compliance(homogenised_data, dataset_metadata.cdm_tables)
# Apply unit changes
homogenised_data = apply_unit_changes(
homogenised_data,
service_definition.sources[source],
dataset_metadata.cdm_code_tables["observed_variable"],
)
if "units" not in homogenised_data.columns:
homogenised_data = define_units(
homogenised_data,
service_definition.sources[source],
dataset_metadata.cdm_code_tables["observed_variable"],
)
year = time_space_batch.time_batch.year
lon_tile_size = dataset_config.get_tile_size("lon", source, year)
lat_tile_size = dataset_config.get_tile_size("lat", source, year)
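For context, a minimal standalone sketch of the guarded pattern above: the units column is only derived when the source data did not already provide one. The mapping and column names below are illustrative, not the real define_units implementation (which uses the CDM observed_variable code table).

import pandas as pd

# Illustrative variable -> units mapping; the real lookup is driven by the
# CDM "observed_variable" code table passed to define_units.
VAR_TO_UNITS = {"air_temperature": "K", "relative_humidity": "%"}

def define_units_sketch(data: pd.DataFrame) -> pd.DataFrame:
    # Derive a units column from the observed variable names.
    data = data.copy()
    data["units"] = data["observed_variable"].map(VAR_TO_UNITS)
    return data

data = pd.DataFrame({"observed_variable": ["air_temperature", "relative_humidity"]})
if "units" not in data.columns:
    # Only fill in units when the reader did not supply them already.
    data = define_units_sketch(data)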
2 changes: 1 addition & 1 deletion cdsobs/cdm/api.py
@@ -328,7 +328,7 @@ def check_cdm_compliance(
return table_field_mappings


def apply_unit_changes(
def define_units(
homogenised_data: pandas.DataFrame,
source_definition: SourceDefinition,
cdm_variable_table: CDMCodeTable,
4 changes: 3 additions & 1 deletion cdsobs/cli/_copy_dataset.py
@@ -50,6 +50,8 @@ def copy_dataset(


def _copy_dataset_impl(cdsobs_config_yml, dataset, dest_config_yml, dest_dataset):
if dest_dataset is None:
dest_dataset = dataset
check_params(dest_config_yml, dataset, dest_dataset)

try:
@@ -191,7 +193,7 @@ def catalogue_copy(
for entry in entries:
# This is needed to load the constraints as it is a deferred attribute.
# However if we load them the other attributes will dissappear from __dict__
# There is no way apparently if doing this better in sqlalchemy
# There is no way apparently of doing this better in sqlalchemy
entry_dict = {
col.name: getattr(entry, col.name) for col in entry.__table__.columns
}
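The comment retained above refers to a common SQLAlchemy pattern: rebuilding a plain dict from the mapped columns so that deferred attributes (here, the constraints) are materialised before the entry is copied to the destination catalogue. A rough sketch of that pattern with a made-up model, not the actual Catalogue class:

from sqlalchemy import JSON, Column, Integer, String
from sqlalchemy.orm import DeclarativeBase, deferred

class Base(DeclarativeBase):
    pass

class Entry(Base):
    __tablename__ = "entry"
    id = Column(Integer, primary_key=True)
    dataset = Column(String)
    # Deferred: only loaded when accessed, like the constraints column.
    constraints = deferred(Column(JSON))

def entry_as_dict(entry: Entry) -> dict:
    # Reading every mapped column through getattr forces deferred attributes
    # to load, yielding a complete plain dict that can be re-inserted elsewhere.
    return {col.name: getattr(entry, col.name) for col in entry.__table__.columns}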
42 changes: 30 additions & 12 deletions cdsobs/cli/_delete_dataset.py
@@ -1,17 +1,21 @@
from pathlib import Path

import dask
import sqlalchemy.orm
import typer
from click import prompt
from fastapi.encoders import jsonable_encoder
from rich.console import Console
from sqlalchemy import delete, func, select

from cdsobs.cli._utils import (
config_yml_typer,
list_parser,
)
from cdsobs.config import CDSObsConfig
from cdsobs.observation_catalogue.database import get_session
from cdsobs.observation_catalogue.models import Catalogue
from cdsobs.observation_catalogue.repositories.cads_dataset import CadsDatasetRepository
from cdsobs.observation_catalogue.repositories.catalogue import CatalogueRepository
from cdsobs.observation_catalogue.schemas.catalogue import (
CatalogueSchema,
@@ -26,7 +30,9 @@
def delete_dataset(
cdsobs_config_yml: Path = config_yml_typer,
dataset: str = typer.Option(..., help="dataset to delete", prompt=True),
dataset_source: str = typer.Option("", help="dataset source to delete"),
dataset_source: str = typer.Option(
None, help="dataset source to delete. By default it will delete all."
),
time: str = typer.Option(
"",
help="Filter by an exact date or by an interval of two dates. For example: "
@@ -35,7 +41,7 @@ def delete_dataset(
):
"""Permanently delete the given dataset from the catalogue and the storage."""
confirm = prompt(
"This will delete the dataset permanently."
"This will delete the data permanently."
" This action cannot be undone. "
"Please type again the name of the dataset to confirm"
)
@@ -55,8 +61,15 @@
except (Exception, KeyboardInterrupt):
catalogue_rollback(catalogue_session, deleted_entries)
raise
if len(deleted_entries):
console.print(f"[bold green] Dataset {dataset} deleted. [/bold green]")
nd = len(deleted_entries)
console.print(f"[bold green] {nd} entries deleted from {dataset}. [/bold green]")
nremaining = catalogue_session.scalar(select(func.count()).select_from(Catalogue))
if nremaining == 0:
CadsDatasetRepository(catalogue_session).delete_dataset(dataset)
console.print(
f"[bold green] Deleted {dataset} from datasets table as it was left empty. "
f"[/bold green]"
)
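For reference, a self-contained sketch of the counting pattern used above (session.scalar over select(func.count())), with a throwaway in-memory table standing in for the real Catalogue model:

from sqlalchemy import Integer, String, create_engine, func, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column

class Base(DeclarativeBase):
    pass

class CatalogueSketch(Base):
    __tablename__ = "catalogue_sketch"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    dataset: Mapped[str] = mapped_column(String)

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(CatalogueSketch(dataset="some-dataset"))
    session.commit()
    # scalar() unwraps the single COUNT(*) value returned by the query.
    nremaining = session.scalar(select(func.count()).select_from(CatalogueSketch))
    print(nremaining)  # 1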


def delete_from_catalogue(
@@ -78,22 +91,27 @@ def delete_from_catalogue(
entries = catalogue_repo.get_by_filters(filters)
if not len(entries):
console.print(f"[red] No entries for dataset {dataset} found")
deleted_entries = []
try:
for entry in entries:
catalogue_repo.remove(record_id=entry.id)
deleted_entries.append(entry)
catalogue_session.execute(delete(Catalogue).where(*filters))
catalogue_session.commit()
except (Exception, KeyboardInterrupt):
catalogue_rollback(catalogue_session, deleted_entries)
return deleted_entries
catalogue_rollback(catalogue_session, entries)
return entries


def delete_from_s3(deleted_entries, s3_client):
assets = [e.asset for e in deleted_entries]
for asset in assets:
bucket, name = asset.split("/")

def delete_asset(asset_to_delete):
bucket, name = asset_to_delete.split("/")
s3_client.delete_file(bucket, name)

delayed_deletes = []
for asset in assets:
delayed_deletes.append(dask.delayed(delete_asset)(asset))

dask.compute(*delayed_deletes)


def catalogue_rollback(catalogue_session, deleted_entries):
schemas = [CatalogueSchema(**jsonable_encoder(e)) for e in deleted_entries]
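For context, a self-contained sketch of the dask.delayed fan-out introduced in delete_from_s3 above. The client here is a stub; its delete_file(bucket, name) signature just mirrors the call in the diff:

import dask

class FakeS3Client:
    # Stand-in for the real storage client, used only for this sketch.
    def delete_file(self, bucket: str, name: str) -> None:
        print(f"deleting {name} from bucket {bucket}")

def delete_assets_in_parallel(assets: list[str], s3_client) -> None:
    def delete_asset(asset: str) -> None:
        bucket, name = asset.split("/")
        s3_client.delete_file(bucket, name)

    # Build one lazy task per asset, then let dask run them concurrently.
    delayed_deletes = [dask.delayed(delete_asset)(asset) for asset in assets]
    dask.compute(*delayed_deletes)

delete_assets_in_parallel(["some-bucket/file1.nc", "some-bucket/file2.nc"], FakeS3Client())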
2 changes: 1 addition & 1 deletion cdsobs/constants.py
@@ -17,7 +17,7 @@

DATE_FORMAT = "%Y-%m-%d, %H:%M:%S"

CONFIG_YML = Path(cdsobs_path, "data/cdsobs_config_template.yml")
CONFIG_YML = Path(Path(cdsobs_path).parent, "tests", "data", "cdsobs_config_test.yml")

DS_TEST_NAME = "insitu-observations-woudc-ozone-total-column-and-profiles"

28 changes: 14 additions & 14 deletions cdsobs/data/cdsobs_config_template.yml
@@ -1,23 +1,23 @@
ingestion_databases:
main:
db_user: user
pwd: testpass
host: localhost
port: 25432
db_name: baron
db_user: someuser
pwd: somepassword
host: somehost
port: 5431
db_name: ingestion
catalogue_db:
db_user: docker
db_user: someuser
pwd: docker
host: localhost
port: 5433
db_name: cataloguedbtest
port: 5432
db_name: catalogue-dev
s3config:
access_key: minioadmin
secret_key: minioadmin
host: 127.0.0.1
port: 9000
secure: false
namespace: cds2-obs-dev
access_key: somekey
secret_key: some_secret_key
host: object-store.os-api.cci2.ecmwf.int
port: 443
secure: true
namespace: somenamespace
datasets:
- name: insitu-observations-woudc-ozone-total-column-and-profiles
lon_tile_size: 180
2 changes: 1 addition & 1 deletion cdsobs/ingestion/api.py
@@ -95,7 +95,7 @@ def validate_and_homogenise(
data_renamed = data
# Add z coordinate if needed
if (
"z_coordinate" not in data_renamed
"z_coordinate" not in data_renamed.columns
and source_definition.space_columns is not None
and source_definition.space_columns.z is not None
):
10 changes: 6 additions & 4 deletions cdsobs/ingestion/readers/netcdf.py
@@ -1,12 +1,12 @@
from pathlib import Path
from typing import Tuple

import pandas
import xarray

from cdsobs.config import CDSObsConfig
from cdsobs.ingestion.api import EmptyBatchException
from cdsobs.ingestion.core import TimeSpaceBatch
from cdsobs.retrieve.filter_datasets import get_var_code_dict
from cdsobs.service_definition.service_definition_models import ServiceDefinition
from cdsobs.utils.logutils import get_logger

@@ -20,7 +20,7 @@ def read_flat_netcdfs(
source: str,
time_space_batch: TimeSpaceBatch,
input_dir: str,
) -> Tuple[pandas.DataFrame, pandas.Series]:
) -> pandas.DataFrame:
if time_space_batch.space_batch != "global":
logger.warning("This reader does not support subsetting in space.")
time_batch = time_space_batch.time_batch
@@ -30,7 +30,9 @@
)
if netcdf_path.exists():
data = xarray.open_dataset(netcdf_path).to_pandas()
data_types = data.dtypes
else:
raise EmptyBatchException
return data, data_types # type: ignore
# Decode variable names
code_dict = get_var_code_dict(config.cdm_tables_location)
data["observed_variable"] = data["observed_variable"].map(code_dict)
return data
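A minimal sketch of the decode step added at the end of read_flat_netcdfs: integer variable codes are mapped back to names with pandas. The dictionary below is made up; the real one comes from get_var_code_dict and the CDM tables.

import pandas as pd

# Illustrative code -> name mapping (hypothetical codes).
code_dict = {85: "air_temperature", 38: "relative_humidity"}

data = pd.DataFrame({"observed_variable": [85, 38, 85]})
data["observed_variable"] = data["observed_variable"].map(code_dict)
print(data["observed_variable"].tolist())
# ['air_temperature', 'relative_humidity', 'air_temperature']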
51 changes: 33 additions & 18 deletions cdsobs/ingestion/serialize.py
@@ -7,7 +7,7 @@
import pandas

from cdsobs import constants
from cdsobs.cdm.api import CdmDataset, to_cdm_dataset
from cdsobs.cdm.api import CdmDataset, define_units, to_cdm_dataset
from cdsobs.config import CDSObsConfig
from cdsobs.ingestion.api import read_batch_data
from cdsobs.ingestion.core import (
@@ -131,24 +131,10 @@ def to_netcdf(
# Encode variable names as integer
if encode_variables:
logger.info("Encoding observed variables using the CDM variable codes.")
code_table = cdm_dataset.dataset_params.cdm_code_tables[
"observed_variable"
].table
# strip to remove extra spaces
var2code = get_var2code(code_table)
encoded_data = (
cdm_dataset.dataset["observed_variable"]
.str.encode("UTF-8")
.map(var2code)
.astype("uint8")
)
cdm_code_tables = cdm_dataset.dataset_params.cdm_code_tables
data = cdm_dataset.dataset
encoded_data, var2code_subset = encode_observed_variables(cdm_code_tables, data)
cdm_dataset.dataset["observed_variable"] = encoded_data
codes_in_data = encoded_data.unique()
var2code_subset = {
var.decode("ascii"): code
for var, code in var2code.items()
if code in codes_in_data
}
encoding["observed_variable"]["dtype"] = encoded_data.dtype
attrs["observed_variable"] = dict(
labels=list(var2code_subset), codes=list(var2code_subset.values())
@@ -166,6 +152,22 @@
return output_path


def encode_observed_variables(cdm_code_tables, data):
code_table = cdm_code_tables["observed_variable"].table
# strip to remove extra spaces
var2code = get_var2code(code_table)
encoded_data = (
data["observed_variable"].str.encode("UTF-8").map(var2code).astype("uint8")
)
codes_in_data = encoded_data.unique()
var2code_subset = {
var.decode("ascii"): code
for var, code in var2code.items()
if code in codes_in_data
}
return encoded_data, var2code_subset


def get_var2code(code_table):
code_dict = pandas.Series(
index=code_table["name"].str.strip().str.replace(" ", "_").str.encode("ascii"),
@@ -280,15 +282,28 @@ def batch_to_netcdf(
for field in homogenised_data:
if homogenised_data[field].dtype == "string":
homogenised_data[field] = homogenised_data[field].str.encode("UTF-8")
homogenised_data = define_units(
homogenised_data,
service_definition.sources[source],
dataset_params.cdm_code_tables["observed_variable"],
)
encoded_data, var2code_subset = encode_observed_variables(
dataset_params.cdm_code_tables, homogenised_data
)
homogenised_data["observed_variable"] = encoded_data
homogenised_data_xr = homogenised_data.to_xarray()
if service_definition.global_attributes is not None:
homogenised_data.attrs = {
**homogenised_data.attrs,
**service_definition.global_attributes,
}
homogenised_data_xr["observed_variable"].attrs = dict(
labels=list(var2code_subset), codes=list(var2code_subset.values())
)
encoding = get_encoding_with_compression_xarray(
homogenised_data_xr, string_transform="str_to_char"
)
encoding["observed_variable"]["dtype"] = str(encoded_data.dtype)
logger.info(f"Writing de-normalized and CDM mapped data to {output_path}")
homogenised_data_xr.to_netcdf(
output_path, encoding=encoding, engine="netcdf4", format="NETCDF4"
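A reduced sketch of what the new encode_observed_variables helper does with the code table: variable names are byte-encoded, mapped to small integer codes, and only the labels and codes actually present in the data are kept for the netCDF attributes. The table below is illustrative, not the real CDM observed_variable table.

import pandas as pd

# Illustrative stand-in for the CDM observed_variable code table.
code_table = pd.DataFrame({"code": [85, 38], "name": ["air temperature", "relative humidity"]})

# Same normalisation as get_var2code: strip, replace spaces, encode to bytes.
var2code = pd.Series(
    index=code_table["name"].str.strip().str.replace(" ", "_").str.encode("ascii"),
    data=code_table["code"].values,
).to_dict()

data = pd.DataFrame({"observed_variable": ["air_temperature", "air_temperature"]})
encoded = data["observed_variable"].str.encode("UTF-8").map(var2code).astype("uint8")
codes_in_data = encoded.unique()
var2code_subset = {
    var.decode("ascii"): code for var, code in var2code.items() if code in codes_in_data
}
print(var2code_subset)  # only the variable present in the data, mapped to its code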
4 changes: 4 additions & 0 deletions cdsobs/observation_catalogue/repositories/cads_dataset.py
@@ -42,3 +42,7 @@ def get_dataset(self, dataset_name: str) -> CadsDataset | None:
return self.session.scalar(
sa.select(CadsDataset).filter(CadsDataset.name == dataset_name)
)

def delete_dataset(self, dataset_name: str):
dataset = self.get_dataset(dataset_name)
self.session.delete(dataset)