diff --git a/cdsobs/ingestion/readers/cuon.py b/cdsobs/ingestion/readers/cuon.py
index 44ddfbe..f0f15cb 100644
--- a/cdsobs/ingestion/readers/cuon.py
+++ b/cdsobs/ingestion/readers/cuon.py
@@ -322,7 +322,14 @@ def read_cuon_netcdfs(
     # Check for emptiness
     if all([dt is None for dt in denormalized_tables]):
         raise EmptyBatchException
-    return pandas.concat(denormalized_tables)
+    result = pandas.concat(denormalized_tables)
+    allnan_cols = []
+    for col in result:
+        if result[col].isnull().all():
+            allnan_cols.append(col)
+    logger.info(f"Removing columns {allnan_cols} as they don't have any data")
+    result = result.drop(allnan_cols, axis=1)
+    return result
 
 
 def get_scheduler():
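The new block in read_cuon_netcdfs drops columns that carry no data at all. A minimal standalone sketch of the same behaviour, using the stock pandas idiom rather than the explicit loop (this assumes nothing about the CUON tables beyond their being a plain DataFrame):

import pandas

def drop_all_nan_columns(df: pandas.DataFrame) -> pandas.DataFrame:
    # dropna(axis=1, how="all") removes exactly the columns the loop
    # above collects into allnan_cols: those where every value is null.
    return df.dropna(axis=1, how="all")

# Column "b" is all-NaN and is dropped; "a" survives.
df = pandas.DataFrame({"a": [1.0, None], "b": [None, None]})
assert list(drop_all_nan_columns(df).columns) == ["a"]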
diff --git a/cdsobs/sanity_checks.py b/cdsobs/sanity_checks.py
index a1464c6..6c28ac4 100644
--- a/cdsobs/sanity_checks.py
+++ b/cdsobs/sanity_checks.py
@@ -2,7 +2,13 @@
 from datetime import datetime
 from pathlib import Path
 
+try:
+    import cdsapi
+except ImportError:
+    pass
+import pandas
 import xarray
+from cads_adaptors.adaptors.cadsobs.csv import to_csv
 
 from cdsobs.cli._object_storage import check_if_missing_in_object_storage
 from cdsobs.config import CDSObsConfig
@@ -58,7 +64,8 @@
         dataset_source=dataset_source,
         latitude_coverage=latitude_coverage,
         longitude_coverage=longitude_coverage,
-        time_coverage=(start_date, end_date),
+        year=[year],
+        month=[1],
         format="netCDF",
         variables=variables_from_service_definition,
     )
@@ -91,6 +98,70 @@
         longitude_coverage,
         variables_from_service_definition,
     )
+    # Transform to CSV and read the file
+    csv_path = to_csv(Path(tmpdir), output_path, retrieve_args)
+    df = pandas.read_csv(csv_path, comment="#")
+    # Get the equivalent file from the legacy CDS
+    csv_legacy_path = f"{tmpdir}/{dataset_name}_{dataset_source}.csv-obs.zip"
+    source_name_mapping = {
+        "insitu-observations-woudc-ozone-total-column-and-profiles": "observation_type",
+        "insitu-observations-gnss": "vertical_profile",
+        "insitu-observations-near-surface-temperature-us-climate-reference-network": "time_aggregation",
+        "insitu-observations-igra-baseline-network": "archive",
+    }
+
+    sources_mapping = {
+        "OzoneSonde": "vertical_profile",
+        "IGS": "IGS daily",
+        "EPN": "EPN-repro2",
+        "IGS_R3": "IGS-repro3",
+        "uscrn_daily": "Daily",
+        "uscrn_hourly": "Hourly",
+        "uscrn_monthly": "Monthly",
+        "uscrn_subhourly": "Sub - hourly",
+        "IGRA": "Global radiosonde archive",
+        "IGRA_H": "Harmonised global radiosonde archive",
+    }
+    c = cdsapi.Client()
+    legacy_params = {
+        "variable": retrieve_args.params.variables,
+        "year": [str(yy) for yy in retrieve_args.params.year],  # type: ignore
+        "month": "01",
+        "day": [
+            "01",
+            "02",
+            "03",
+            "08",
+            "09",
+            "10",
+            "13",
+            "15",
+            "17",
+            "20",
+            "21",
+            "22",
+            "23",
+            "24",
+            "27",
+            "28",
+            "29",
+            "30",
+        ],
+        "format": "csv-obs.zip",
+        "area": [
+            latitude_coverage[1],
+            longitude_coverage[0],
+            latitude_coverage[0],
+            longitude_coverage[1],
+        ],
+    }
+    if dataset_name in source_name_mapping:
+        legacy_params[source_name_mapping[dataset_name]] = sources_mapping[
+            dataset_source
+        ]
+    c.retrieve(dataset_name, legacy_params, csv_legacy_path)
+    df_legacy = pandas.read_csv(csv_legacy_path, comment="#")
+    pandas.testing.assert_frame_equal(df, df_legacy)
 
 
 def check_retrieved_dataset(
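The check above retrieves the same subset from the legacy CDS via cdsapi and compares the two CSVs with pandas.testing.assert_frame_equal, which by default also fails on column order and dtype differences. A minimal sketch of a looser comparison; whether this tolerance is acceptable for the sanity check is an assumption, not something the patch states:

import pandas

df = pandas.DataFrame({"x": [1.0, 2.0], "y": ["a", "b"]})
df_legacy = df[["y", "x"]].copy()  # same data, different column order

# check_like=True reindexes df_legacy to df's row/column order before
# comparing values; check_exact=False compares floats within rtol/atol.
# Raises AssertionError on any remaining mismatch.
pandas.testing.assert_frame_equal(df, df_legacy, check_like=True, check_exact=False)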
diff --git a/tests/system/check_missing_variables.py b/tests/system/check_missing_variables.py
new file mode 100644
index 0000000..8223f6d
--- /dev/null
+++ b/tests/system/check_missing_variables.py
@@ -0,0 +1,85 @@
+import os
+from pathlib import Path
+
+import pandas
+import pytest
+import sqlalchemy as sa
+
+from cdsobs.api import run_ingestion_pipeline
+from cdsobs.cdm.api import open_netcdf
+from cdsobs.cdm.lite import auxiliary_variable_names
+from cdsobs.ingestion.core import get_aux_vars_from_service_definition
+from cdsobs.observation_catalogue.models import Catalogue
+from cdsobs.service_definition.api import get_service_definition
+from cdsobs.storage import S3Client
+from tests.test_api import TEST_API_PARAMETERS
+from tests.utils import get_test_years
+
+
+@pytest.mark.parametrize("dataset_name,source", TEST_API_PARAMETERS)
+def test_run_ingestion_pipeline(
+    dataset_name, source, test_session, test_config, caplog, tmp_path
+):
+    start_year, end_year = get_test_years(source)
+    service_definition = get_service_definition(dataset_name)
+    os.environ["CADSOBS_AVOID_MULTIPROCESS"] = "0"
+    run_ingestion_pipeline(
+        dataset_name,
+        service_definition,
+        source,
+        test_session,
+        test_config,
+        start_year=start_year,
+        end_year=end_year,
+        update=False,
+    )
+    # Check variables
+    variable_check_results_file = Path("variable_check_results.csv")
+    index_cols = ["dataset_name", "dataset_source"]
+    if variable_check_results_file.exists():
+        results = pandas.read_csv(variable_check_results_file, index_col=index_cols)
+    else:
+        results = pandas.DataFrame(
+            columns=[
+                "dataset_name",
+                "dataset_source",
+                "in_file_not_in_descriptions",
+                "in_descriptions_not_in_file",
+            ]
+        ).set_index(index_cols)
+    # Get the file
+    asset = test_session.scalar(
+        sa.select(Catalogue.asset).where(Catalogue.dataset == dataset_name)
+    )
+    s3client = S3Client.from_config(test_config.s3config)
+    asset_filename = asset.split("/")[1]
+    asset_local_path = Path(tmp_path, asset_filename)
+    s3client.download_file(
+        s3client.get_bucket_name(dataset_name), asset_filename, asset_local_path
+    )
+    dataset = open_netcdf(asset_local_path, decode_variables=True)
+    # Get variables in file
+    variables_in_file = set(
+        dataset.columns.tolist() + dataset.observed_variable.unique().tolist()
+    )
+    # Get expected variables according to service definition file
+    aux_variables = get_aux_vars_from_service_definition(service_definition, source)
+    expected_variables = set(service_definition.sources[source].descriptions) - set(
+        aux_variables
+    )
+    # Here we add some more variables to expected variables
+    for v in [
+        "observed_variable",
+        "observation_value",
+        "units",
+    ] + auxiliary_variable_names:
+        if v in variables_in_file:
+            expected_variables.add(v)
+    in_file_not_in_descriptions = tuple(variables_in_file - expected_variables)
+    in_descriptions_not_in_file = tuple(expected_variables - variables_in_file)
+
+    results.loc[(dataset_name, source), :] = pandas.Series(
+        index=("in_file_not_in_descriptions", "in_descriptions_not_in_file"),
+        data=[in_file_not_in_descriptions, in_descriptions_not_in_file],
+    )
+    results.to_csv(variable_check_results_file)
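check_missing_variables.py accumulates one row per (dataset_name, dataset_source) pair in variable_check_results.csv. DataFrame.to_csv writes the tuple cells as their string repr, so a consumer has to parse them back; a minimal sketch, assuming ast.literal_eval is acceptable for that:

import ast

import pandas

results = pandas.read_csv(
    "variable_check_results.csv",
    index_col=["dataset_name", "dataset_source"],
)
# Cells look like "('units', 'sensor_id')"; turn them back into tuples.
for col in ["in_file_not_in_descriptions", "in_descriptions_not_in_file"]:
    results[col] = results[col].apply(ast.literal_eval)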
diff --git a/tests/test_api.py b/tests/test_api.py
index dde9daf..9730c4c 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -5,74 +5,66 @@
 import sqlalchemy as sa
 
 from cdsobs.api import run_ingestion_pipeline, run_make_cdm
-from cdsobs.cdm.api import open_netcdf
-from cdsobs.cdm.lite import auxiliary_variable_names
-from cdsobs.ingestion.core import get_aux_vars_from_service_definition
 from cdsobs.observation_catalogue.models import Catalogue
 from cdsobs.service_definition.api import get_service_definition
-from cdsobs.storage import S3Client
 from cdsobs.utils.logutils import get_logger
 from tests.utils import get_test_years
 
 logger = get_logger(__name__)
 
+TEST_API_PARAMETERS = [
+    ("insitu-observations-woudc-ozone-total-column-and-profiles", "OzoneSonde"),
+    ("insitu-observations-woudc-ozone-total-column-and-profiles", "TotalOzone"),
+    (
+        "insitu-observations-igra-baseline-network",
+        "IGRA",
+    ),
+    (
+        "insitu-observations-igra-baseline-network",
+        "IGRA_H",
+    ),
+    (
+        "insitu-comprehensive-upper-air-observation-network",
+        "CUON",
+    ),
+    (
+        "insitu-observations-gruan-reference-network",
+        "GRUAN",
+    ),
+    (
+        "insitu-observations-near-surface-temperature-us-climate-reference-network",
+        "uscrn_subhourly",
+    ),
+    (
+        "insitu-observations-near-surface-temperature-us-climate-reference-network",
+        "uscrn_hourly",
+    ),
+    (
+        "insitu-observations-near-surface-temperature-us-climate-reference-network",
+        "uscrn_daily",
+    ),
+    (
+        "insitu-observations-near-surface-temperature-us-climate-reference-network",
+        "uscrn_monthly",
+    ),
+    (
+        "insitu-observations-gnss",
+        "IGS",
+    ),
+    (
+        "insitu-observations-gnss",
+        "EPN",
+    ),
+    (
+        "insitu-observations-gnss",
+        "IGS_R3",
+    ),
+]
 
-@pytest.mark.parametrize(
-    "dataset_name,source,test_update",
-    [
-        (
-            "insitu-observations-woudc-ozone-total-column-and-profiles",
-            "OzoneSonde",
-            False,
-        ),
-        (
-            "insitu-observations-woudc-ozone-total-column-and-profiles",
-            "TotalOzone",
-            False,
-        ),
-        ("insitu-observations-igra-baseline-network", "IGRA", False),
-        ("insitu-observations-igra-baseline-network", "IGRA_H", False),
-        ("insitu-comprehensive-upper-air-observation-network", "CUON", False),
-        ("insitu-observations-gruan-reference-network", "GRUAN", False),
-        (
-            "insitu-observations-near-surface-temperature-us-climate-reference-network",
-            "uscrn_subhourly",
-            False,
-        ),
-        (
-            "insitu-observations-near-surface-temperature-us-climate-reference-network",
-            "uscrn_hourly",
-            False,
-        ),
-        (
-            "insitu-observations-near-surface-temperature-us-climate-reference-network",
-            "uscrn_daily",
-            False,
-        ),
-        (
-            "insitu-observations-near-surface-temperature-us-climate-reference-network",
-            "uscrn_monthly",
-            False,
-        ),
-        (
-            "insitu-observations-gnss",
-            "IGS",
-            False,
-        ),
-        (
-            "insitu-observations-gnss",
-            "EPN",
-            False,
-        ),
-        (
-            "insitu-observations-gnss",
-            "IGS_R3",
-            False,
-        ),
-    ],
-)
+
+@pytest.mark.parametrize("dataset_name,source", TEST_API_PARAMETERS)
 def test_run_ingestion_pipeline(
-    dataset_name, source, test_update, test_session, test_config, caplog, tmp_path
+    dataset_name, source, test_session, test_config, caplog, tmp_path
 ):
     start_year, end_year = get_test_years(source)
     service_definition = get_service_definition(dataset_name)
@@ -94,62 +86,6 @@ def test_run_ingestion_pipeline(
         .where(Catalogue.dataset == dataset_name)
     )
     assert counter > 0
-    # Check variables
-    asset = test_session.scalar(
-        sa.select(Catalogue.asset).where(Catalogue.dataset == dataset_name)
-    )
-    s3client = S3Client.from_config(test_config.s3config)
-    asset_filename = asset.split("/")[1]
-    asset_local_path = Path(tmp_path, asset_filename)
-    s3client.download_file(
-        s3client.get_bucket_name(dataset_name), asset_filename, asset_local_path
-    )
-    dataset = open_netcdf(asset_local_path, decode_variables=True)
-    variables_in_file = set(
-        dataset.columns.tolist() + dataset.observed_variable.unique().tolist()
-    )
-    aux_variables = get_aux_vars_from_service_definition(service_definition, source)
-    expected_variables = set(service_definition.sources[source].descriptions) - set(
-        aux_variables
-    )
-    for v in [
-        "observed_variable",
-        "observation_value",
-        "units",
-    ] + auxiliary_variable_names:
-        if v in variables_in_file:
-            expected_variables.add(v)
-    logger.info(
-        f"{variables_in_file - expected_variables} are in file but not in the descriptions"
-    )
-    logger.info(
-        f"{expected_variables - variables_in_file} are not in file but are in the description"
-    )
-    # assert variables_in_file == expected_variables
-
-    if test_update:
-        # testing update flag
-        run_ingestion_pipeline(
-            dataset_name,
-            service_definition,
-            source,
-            test_session,
-            test_config,
-            start_year=start_year,
-            end_year=end_year,
-            update=False,
-        )
-
-        found_log = [
-            "A partition with the chosen parameters already exists" in r.msg
-            for r in caplog.records
-        ]
-        assert any(found_log)
-        # no insertions have been made
-        assert (
-            test_session.scalar(sa.select(sa.func.count()).select_from(Catalogue))
-            == counter
-        )
 
 
 def test_make_cdm(test_config, tmp_path, caplog):
diff --git a/tests/test_run_sanity_checks.py b/tests/test_run_sanity_checks.py
index bba82bb..888d5a8 100644
--- a/tests/test_run_sanity_checks.py
+++ b/tests/test_run_sanity_checks.py
@@ -1,7 +1,10 @@
+import pytest
+
 from cdsobs.constants import TEST_YEARS
 from cdsobs.sanity_checks import run_sanity_checks
 
 
+@pytest.mark.skip("Depends on cdsapi")
 def test_run_sanity_checks(test_config, test_repository):
     run_sanity_checks(
         test_config,