From 763767f930e512d536e468fa5d517dcdee1212d5 Mon Sep 17 00:00:00 2001 From: dwest77 Date: Fri, 20 Dec 2024 13:15:52 +0000 Subject: [PATCH 01/35] Test typing rendering for documentation --- docs/source/errors.rst | 2 +- docs/source/filehandlers.rst | 8 ++ docs/source/index.rst | 1 + padocc/core/errors.py | 258 +++++++++++++++++++++++++++++------ padocc/core/filehandlers.py | 110 ++++++++++----- padocc/core/logs.py | 11 +- poetry.lock | 13 +- pyproject.toml | 1 + 8 files changed, 326 insertions(+), 78 deletions(-) create mode 100644 docs/source/filehandlers.rst diff --git a/docs/source/errors.rst b/docs/source/errors.rst index 1a379fe..8c38042 100644 --- a/docs/source/errors.rst +++ b/docs/source/errors.rst @@ -3,6 +3,6 @@ Custom Pipeline Errors **A summary of the custom errors that are experienced through running the pipeline.** -.. automodule:: pipeline.errors +.. automodule:: padocc.core.errors :members: :show-inheritance: \ No newline at end of file diff --git a/docs/source/filehandlers.rst b/docs/source/filehandlers.rst new file mode 100644 index 0000000..8f730f3 --- /dev/null +++ b/docs/source/filehandlers.rst @@ -0,0 +1,8 @@ +Padocc Filehandlers +====================== + +**A summary of the custom errors that are experienced through running the pipeline.** + +.. automodule:: padocc.core.filehandlers + :members: + :show-inheritance: \ No newline at end of file diff --git a/docs/source/index.rst b/docs/source/index.rst index ad097fc..fcd1763 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -48,6 +48,7 @@ The pipeline consists of four central phases, with an additional phase for inges :maxdepth: 1 :caption: Pipeline Source: + Filehandlers Initialisation Scanning Compute diff --git a/padocc/core/errors.py b/padocc/core/errors.py index 8b215e9..f901f0c 100644 --- a/padocc/core/errors.py +++ b/padocc/core/errors.py @@ -7,7 +7,7 @@ import logging import traceback -from typing import Optional +from typing import Optional, Union from .filehandlers import CSVFileHandler @@ -15,9 +15,9 @@ def error_handler( err : Exception, logger: logging.Logger, phase: str, + dryrun: bool = False, + subset_bypass: bool = False, jobid: Optional[str] = None, - dryrun: Optional[bool] = False, - subset_bypass: Optional[bool] = False, status_fh: Optional[CSVFileHandler] = None ): @@ -60,14 +60,22 @@ def get_status(tb: list) -> str: class KerchunkException(Exception): - def __init__(self, proj_code, groupdir): + def __init__(self, proj_code: Union[str,None], groupdir: Union[str,None]) -> None: self.proj_code = proj_code self.groupdir = groupdir - super().__init__(self.message) + if hasattr(self, 'message'): + msg = getattr(self,'message') + super().__init__(msg) class PartialDriverError(KerchunkException): """All drivers failed (NetCDF3/Hdf5/Tiff) for one or more files within the list""" - def __init__(self,filenums=None, verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + filenums: Union[int,None] = None, + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: self.message = f"All drivers failed when performing conversion for files {filenums}" super().__init__(proj_code, groupdir) if verbose < 1: @@ -77,7 +85,12 @@ def get_str(self): class NaNComparisonError(KerchunkException): """When comparing NaN values between objects - different values found""" - def __init__(self, verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> 
None: self.message = f"NaN values do not match between comparison objects" super().__init__(proj_code, groupdir) if verbose < 1: @@ -87,7 +100,14 @@ def get_str(self): class RemoteProtocolError(KerchunkException): """All drivers failed (NetCDF3/Hdf5/Tiff) for one or more files within the list""" - def __init__(self,filenums=None, verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + filenums: Union[int,None] = None, + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: + self.message = f"All drivers failed when performing conversion for files {filenums}" super().__init__(proj_code, groupdir) if verbose < 1: @@ -97,7 +117,12 @@ def get_str(self): class KerchunkDriverFatalError(KerchunkException): """All drivers failed (NetCDF3/Hdf5/Tiff) - run without driver bypass to assess the issue with each driver type.""" - def __init__(self,verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: self.message = "All drivers failed when performing conversion" super().__init__(proj_code, groupdir) if verbose < 1: @@ -107,7 +132,12 @@ def get_str(self): class IdenticalVariablesError(KerchunkException): """All variables found to be suitably identical between files as to not stack or concatenate""" - def __init__(self,verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: self.message = "All variables are identical across files" super().__init__(proj_code, groupdir) if verbose < 1: @@ -117,7 +147,15 @@ def get_str(self): class XKShapeToleranceError(KerchunkException): """Attempted validation using a tolerance for shape mismatch on concat-dims, shape difference exceeds tolerance allowance.""" - def __init__(self,tolerance=0, diff=0, dim='',verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + tolerance: int = 0, + diff: int = 0, + dim: str = '', + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: self.message = f"Shape difference ({diff}) exceeds allowed tolerance ({tolerance}) for dimension ({dim})" super().__init__(proj_code, groupdir) if verbose < 1: @@ -127,7 +165,12 @@ def get_str(self): class BlacklistProjectCode(KerchunkException): """The project code you are trying to run for is on the list of project codes to ignore.""" - def __init__(self, verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: self.message = 'Project Code listed in blacklist for bad data - will not be processed.' 
super().__init__(proj_code, groupdir) if verbose < 1: @@ -137,7 +180,13 @@ def get_str(self): class MissingVariableError(KerchunkException): """A variable is missing from the environment or set of arguments.""" - def __init__(self, vtype='$', verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + vtype: str = "$", + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: self.message = f'Missing variable: {vtype}' super().__init__(proj_code, groupdir) if verbose < 1: @@ -147,7 +196,14 @@ def get_str(self): class ExpectTimeoutError(KerchunkException): """The process is expected to time out given timing estimates.""" - def __init__(self, required=0, current='', verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + required: int = 0, + current: str = '', + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: self.message = f'Scan requires minimum {required} - current {current}' super().__init__(proj_code, groupdir) if verbose < 1: @@ -157,7 +213,14 @@ def get_str(self): class ExpectMemoryError(KerchunkException): """The process is expected to run out of memory given size estimates.""" - def __init__(self, required='', current='', verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + required: int = 0, + current: str = '', + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: self.message = f'Scan requires minimum {required} - current {current}' super().__init__(proj_code, groupdir) if verbose < 1: @@ -167,7 +230,12 @@ def get_str(self): class ProjectCodeError(KerchunkException): """Could not find the correct project code from the list of project codes for this run.""" - def __init__(self, verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: self.message = f'Project Code Extraction Failed' super().__init__(proj_code, groupdir) if verbose < 1: @@ -177,7 +245,13 @@ def get_str(self): class FilecapExceededError(KerchunkException): """During scanning, could not find suitable files within the set of files specified.""" - def __init__(self, nfiles=0, verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + nfiles: int = 0, + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: self.message = f'Filecap exceeded: {nfiles} files attempted' super().__init__(proj_code, groupdir) if verbose < 1: @@ -187,7 +261,12 @@ def get_str(self): class ChunkDataError(KerchunkException): """Overflow Error from pandas during decoding of chunk information, most likely caused by bad data retrieval.""" - def __init__(self, verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: self.message = f'Decoding resulted in overflow - received chunk data contains junk (attempted 3 times)' super().__init__(proj_code, groupdir) if verbose < 1: @@ -197,7 +276,13 @@ def get_str(self): class NoValidTimeSlicesError(KerchunkException): """Unable to find any time slices to test within the object.""" - def __init__(self, message='Kerchunk', verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + message: str = 'kerchunk', + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: self.message = f'No 
valid timeslices found for {message}' super().__init__(proj_code, groupdir) if verbose < 1: @@ -207,7 +292,15 @@ def get_str(self): class VariableMismatchError(KerchunkException): """During testing, variables present in the NetCDF file are not present in Kerchunk""" - def __init__(self, missing={}, verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + missing: Union[dict, None] = None, + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: + missing = missing or {} + self.message = f'Missing variables {missing} in Kerchunk file' super().__init__(proj_code, groupdir) if verbose < 1: @@ -217,7 +310,20 @@ def get_str(self): class ShapeMismatchError(KerchunkException): """Shapes of ND arrays do not match between Kerchunk and Xarray objects - when using a subset of the Netcdf files.""" - def __init__(self, var={}, first={}, second={}, verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + var: Union[dict,None] = None, + first: Union[dict,None] = None, + second: Union[dict,None] = None, + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: + + var = var or {} + first = first or {} + second = second or {} + self.message = f'Kerchunk/NetCDF mismatch for variable {var} with shapes - K {first} vs X {second}' super().__init__(proj_code, groupdir) if verbose < 1: @@ -227,8 +333,15 @@ def get_str(self): class TrueShapeValidationError(KerchunkException): """Shapes of ND arrays do not match between Kerchunk and Xarray objects - when using the complete set of files.""" - def __init__(self, message='Kerchunk', verbose=0, proj_code=None, groupdir=None): - self.message = f'Kerchunk/NetCDF mismatch with shapes using full dataset - check logs' + def __init__( + self, + message: str = 'kerchunk', + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: + + self.message = f'{message} mismatch with shapes using full dataset - check logs' super().__init__(proj_code, groupdir) if verbose < 1: self.__class__.__module__ = 'builtins' @@ -237,7 +350,13 @@ def get_str(self): class NoOverwriteError(KerchunkException): """Output file already exists and the process does not have forceful overwrite (-f) set.""" - def __init__(self, verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: + self.message = 'Output file already exists and forceful overwrite not set.' super().__init__(proj_code, groupdir) if verbose < 1: @@ -247,8 +366,13 @@ def get_str(self): class MissingKerchunkError(KerchunkException): """Kerchunk file not found.""" - def __init__(self, message="No suitable kerchunk file found for validation.", verbose=0, proj_code=None, groupdir=None): - self.message = message + def __init__( + self, + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: + self.message = "No suitable kerchunk file found." 
super().__init__(proj_code, groupdir) if verbose < 1: self.__class__.__module__ = 'builtins' @@ -257,8 +381,13 @@ def get_str(self): class ValidationError(KerchunkException): """One or more checks within validation have failed - most likely elementwise comparison of data.""" - def __init__(self, message="Fatal comparison failure for Kerchunk/NetCDF", verbose=0, proj_code=None, groupdir=None): - self.message = message + def __init__( + self, + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: + self.message = "Fatal Validation Error" super().__init__(proj_code, groupdir) if verbose < 1: self.__class__.__module__ = 'builtins' @@ -267,8 +396,13 @@ def get_str(self): class ComputeError(KerchunkException): """Compute stage failed - likely due to invalid config/use of the classes""" - def __init__(self, message="Invalid configuration for the Compute stage", verbose=0, proj_code=None, groupdir=None): - self.message = message + def __init__( + self, + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: + self.message = "Invalid configuration for the Compute stage" super().__init__(proj_code, groupdir) if verbose < 1: self.__class__.__module__ = 'builtins' @@ -277,8 +411,14 @@ def get_str(self): class SoftfailBypassError(KerchunkException): """Validation could not be completed because some arrays only contained NaN values which cannot be compared.""" - def __init__(self, message="Kerchunk validation failed softly with no bypass - rerun with bypass flag", verbose=0, proj_code=None, groupdir=None): - self.message = message + def __init__( + self, + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: + + self.message = "Kerchunk validation failed softly with no bypass - rerun with bypass flag" super().__init__(proj_code, groupdir) if verbose < 1: self.__class__.__module__ = 'builtins' @@ -287,8 +427,14 @@ def get_str(self): class ConcatenationError(KerchunkException): """Variables could not be concatenated over time and are not duplicates - no known solution""" - def __init__(self, message="Variables could not be concatenated over time and are not duplicates - no known solution", verbose=0, proj_code=None, groupdir=None): - self.message = message + def __init__( + self, + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: + + self.message = "Variables could not be concatenated over time and are not duplicates - no known solution" super().__init__(proj_code, groupdir) if verbose < 1: self.__class__.__module__ = 'builtins' @@ -297,7 +443,16 @@ def get_str(self): class ConcatFatalError(KerchunkException): """Chunk sizes differ between refs - files cannot be concatenated""" - def __init__(self, var=None, chunk1=None, chunk2=None, verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + var: Union[str,None] = None, + chunk1: Union[int,None] = None, + chunk2: Union[int,None] = None, + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: + self.message = f"Chunk sizes differ between refs for {var}: {chunk1} - {chunk2} - files cannot be concatenated" super().__init__(proj_code, groupdir) if verbose < 1: @@ -307,7 +462,14 @@ def get_str(self): class SourceNotFoundError(KerchunkException): """Source File could not be located.""" - def __init__(self, sfile=None, verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + sfile: Union[str, 
None], + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: + self.message = f"Source file could not be located: {sfile}" super().__init__(proj_code, groupdir) if verbose < 1: @@ -318,7 +480,13 @@ def get_str(self): # Potentially useful but currently unused. class ArchiveConnectError(KerchunkException): """Connection to the CEDA Archive could not be established""" - def __init__(self, verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: + self.message = f"Connection verification to the CEDA archive failed - {proj_code}" super().__init__(proj_code, groupdir) if verbose < 1: @@ -328,7 +496,13 @@ def get_str(self): class KerchunkDecodeError(KerchunkException): """Decoding of Kerchunk file failed - likely a time array issue.""" - def __init__(self, verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: + self.message = f"Decoding of Kerchunk file failed - likely a time array issue." super().__init__(proj_code, groupdir) if verbose < 1: @@ -338,7 +512,13 @@ def get_str(self): class FullsetRequiredError(KerchunkException): """This project must be validated using the full set of files.""" - def __init__(self, verbose=0, proj_code=None, groupdir=None): + def __init__( + self, + verbose: int = 0, + proj_code: Union[str,None] = None, + groupdir: Union[str,None] = None + ) -> None: + self.message = f"This project must be validated by opening the full set of files." super().__init__(proj_code, groupdir) if verbose < 1: diff --git a/padocc/core/filehandlers.py b/padocc/core/filehandlers.py index 20da9e0..7b69ca7 100644 --- a/padocc/core/filehandlers.py +++ b/padocc/core/filehandlers.py @@ -8,6 +8,7 @@ from datetime import datetime import logging from typing import Generator +from typing import Optional, Union from padocc.core import LoggedOperation, FalseLogger @@ -41,12 +42,12 @@ def __init__( self, dir : str, filename : str, - logger : logging.Logger | FalseLogger = None, - label : str = None, - fh : str = None, - logid : str = None, - dryrun : bool = None, - forceful : bool = None, + logger : Optional[Union[logging.Logger,FalseLogger]] = None, + label : Union[str,None] = None, + fh : Optional[str] = None, + logid : Optional[str] = None, + dryrun : Optional[bool] = None, + forceful : bool = False, verbose : int = 0 ) -> None: """ @@ -93,7 +94,7 @@ def __init__( logid=logid, verbose=verbose) - def __contains__(self, item) -> bool: + def __contains__(self, item: object) -> bool: """ Enables checking 'if x in fh'. """ @@ -132,15 +133,21 @@ def close(self): self.logger.debug(f'Saving file {self._file}') self._set_content() - def set(self, value): + def set(self, value: object, index: Union[object,None] = None): """ Reset the whole value of the private ``_value`` attribute. """ self._check_value() - self._value = value + if index is not None: + self._value[index] = value + else: + self._value = value - def get(self, index: str = None, default: str = None): + def get( + self, + index: object = None, + default: object = None): """ Get the value of the private ``_value`` attribute. 
Can also get a parameter from this item as you would with a dictionary, if possible @@ -151,12 +158,23 @@ def get(self, index: str = None, default: str = None): if index is None: return self._value + # Issue #1 try: return self._value.get(index, default) except AttributeError: raise AttributeError( f'Filehandler for {self._file} does not support getting specific items.' ) + + def _set_file(self): + raise NotImplementedError( + 'Set file not implemented for FileIO Mixin - please use a filehandler.' + ) + + def _get_content(self): + raise NotImplementedError( + 'Get content not implemented for FileIO Mixin - please use a filehandler.' + ) def _check_save(self) -> bool: """ @@ -200,11 +218,12 @@ def __len__(self) -> int: def __iter__(self) -> Generator[str, None, None]: """Iterator for the set of values""" + # Issue #2 for i in self._value: if i is not None: yield i - def __getitem__(self, index): + def __getitem__(self, index: object): """ Override FileIOMixin class for getting index """ @@ -218,7 +237,7 @@ def __getitem__(self, index): return self._value[index] - def __setitem__(self, index: int, value) -> None: + def __setitem__(self, index: object, value: object) -> None: """ Enables setting items in filehandlers 'fh[0] = 1' """ @@ -232,11 +251,11 @@ def __setitem__(self, index: int, value) -> None: self._value[index] = value - def append(self, newvalue) -> None: + def append(self, newvalue: object) -> None: """Add a new value to the internal list""" self._value.append(newvalue) - def set(self, value: list): + def set(self, value: object): """ Extends the set function of the parent, creates a copy of the input list so the original parameter is preserved. @@ -272,11 +291,11 @@ def __init__( dir: str, filename: str, logger: logging.Logger | FalseLogger = None, - conf: dict = None, + conf: Union[dict,None] = None, **kwargs ) -> None: - self._conf = conf + self._conf = conf or {} super().__init__(dir, filename, logger=logger, **kwargs) def __str__(self) -> str: @@ -323,37 +342,37 @@ def __setitem__(self, index: str, value) -> None: self._value[index] = value return None - def set(self, value: dict): + def set(self, value: object, index: Union[object,None] = None) -> None: """ Wrapper to create a detached dict copy """ - super().set(dict(value)) + super().set(dict(value), index=index) - def _set_file(self): + def _set_file(self) -> None: if '.json' not in self._file: self._file = f'{self.dir}/{self._file}.json' else: self._file = f'{self.dir}/{self._file}' # Get/set routines for the filesystem files. - def _get_content(self): + def _get_content(self) -> None: if self.file_exists(): try: with open(self._file) as f: self._value = json.load(f) except json.decoder.JSONDecodeError: - self._value={} + self._value = {} else: self.create_file() self._value = {} - def _set_content(self): + def _set_content(self) -> None: if super()._check_save(): self._apply_conf() with open(self._file,'w') as f: f.write(json.dumps(self._value)) - def _apply_conf(self): + def _apply_conf(self) -> None: """ Update value with properties from conf - fill missing values. @@ -368,8 +387,16 @@ def _apply_conf(self): self._conf = None class KerchunkFile(JSONFileHandler): + """ + Filehandler for Kerchunk file, enables substitution/replacement + for local/remote links, and updating content. 
+ """ - def add_download_link(self) -> dict: + def add_download_link( + self, + sub: str = '/', + replace: str = 'https://dap.ceda.ac.uk' + ) -> None: """ Add the download link to this Kerchunk File """ @@ -377,12 +404,12 @@ def add_download_link(self) -> dict: for key in refs.keys(): if len(refs[key]) == 3: - if refs[key][0][0] == '/': - refs[key][0] = 'https://dap.ceda.ac.uk' + refs[key][0] + if refs[key][0][0] == sub: + refs[key][0] = replace + refs[key][0] self.set(refs) - def add_kerchunk_history(self, version_no) -> dict: + def add_kerchunk_history(self, version_no: str) -> None: """ Add kerchunk variables to the metadata for this dataset, including creation/update date and version/revision number. @@ -415,7 +442,11 @@ def add_kerchunk_history(self, version_no) -> dict: self.set(attrs, index='refs') class ZarrStore(FileIOMixin): - def clear(self): + """ + Filehandler for Zarr store in Padocc - enables Filesystem + operations on component files. + """ + def clear(self) -> None: if not self._dryrun: os.system(f'rm -rf {self._file}') else: @@ -426,6 +457,9 @@ def _set_file(self): self._file = f'{self.dir}/{self.filename}' class TextFileHandler(ListIOMixin): + """ + Filehandler for text files. + """ description = "Text File handler for padocc config files." def _set_file(self): @@ -435,22 +469,32 @@ def _set_file(self): self._file = f'{self.dir}/{self._file}' class LogFileHandler(ListIOMixin): + """Log File handler for padocc phase logs.""" description = "Log File handler for padocc phase logs." - def __init__(self, dir, filename, logger, extra_path, **kwargs): + def __init__( + self, + dir: str, + filename: str, + extra_path: str = '', + logger: Union[logging.Logger, FalseLogger, None] = None, + **kwargs + ) -> None: + self._extra_path = extra_path super().__init__(dir, filename, logger, **kwargs) - def _set_file(self): + def _set_file(self) -> None: self._file = f'{self.dir}/{self._extra_path}{self._file}.log' class CSVFileHandler(ListIOMixin): + """CSV File handler for padocc config files""" description = "CSV File handler for padocc config files" - def _set_file(self): + def _set_file(self) -> None: self._file = f'{self.dir}/{self._file}.csv' - def __iter__(self): + def __iter__(self) -> Generator[str]: for i in self._value: if i is not None: yield i.replace(' ','').split(',') @@ -459,7 +503,7 @@ def update_status( self, phase: str, status: str, - jobid : str = '', + jobid : Optional[str] = None, dryrun: bool = False ) -> None: diff --git a/padocc/core/logs.py b/padocc/core/logs.py index 8291cd9..394427a 100644 --- a/padocc/core/logs.py +++ b/padocc/core/logs.py @@ -4,6 +4,9 @@ import logging import os +from typing import Union, Optional + +from .utils import FalseLogger levels = [ logging.WARN, @@ -25,10 +28,10 @@ class LoggedOperation: """ def __init__( self, - logger : logging.Logger = None, - label : str = None, - fh : str = None, - logid : str = None, + logger : Union[logging.Logger,FalseLogger, None] = None, + label : Union[str,None] = None, + fh : Union[str,None] = None, + logid : Union[str,None] = None, verbose: int = 0 ) -> None: diff --git a/poetry.lock b/poetry.lock index 50efa2f..9df7048 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3789,6 +3789,17 @@ files = [ docs = ["myst-parser", "pydata-sphinx-theme", "sphinx"] test = ["argcomplete (>=3.0.3)", "mypy (>=1.7.0)", "pre-commit", "pytest (>=7.0,<8.2)", "pytest-mock", "pytest-mypy-testing"] +[[package]] +name = "types-pyyaml" +version = "6.0.12.20240917" +description = "Typing stubs for PyYAML" +optional = false 
+python-versions = ">=3.8" +files = [ + {file = "types-PyYAML-6.0.12.20240917.tar.gz", hash = "sha256:d1405a86f9576682234ef83bcb4e6fff7c9305c8b1fbad5e0bcd4f7dbdc9c587"}, + {file = "types_PyYAML-6.0.12.20240917-py3-none-any.whl", hash = "sha256:392b267f1c0fe6022952462bf5d6523f31e37f6cea49b14cee7ad634b6301570"}, +] + [[package]] name = "typing-extensions" version = "4.12.2" @@ -4190,4 +4201,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "f69dd587cefd00c0517ce729f654c5ab80071f997bbeff01f87819fccb956feb" +content-hash = "dccf0780de448b44fa42f42f973c943f2701c6fe8852b261409ceb5996924eab" diff --git a/pyproject.toml b/pyproject.toml index bdb939b..a6174a2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ sphinx = "7.1.2" sphinx-rtd-theme = "2.0.0" cfapyx = "2024.11.27" myst-nb = "^1.1.2" +types-pyyaml = "^6.0.12.20240917" [build-system] From b26f7a2a546821b47e6d636225a69cc03eb97979 Mon Sep 17 00:00:00 2001 From: dwest77 Date: Fri, 20 Dec 2024 14:25:18 +0000 Subject: [PATCH 02/35] Removed import issue --- padocc/core/logs.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/padocc/core/logs.py b/padocc/core/logs.py index 394427a..057c1f7 100644 --- a/padocc/core/logs.py +++ b/padocc/core/logs.py @@ -6,8 +6,6 @@ import os from typing import Union, Optional -from .utils import FalseLogger - levels = [ logging.WARN, logging.INFO, From 0def9e2fdea0a123e52cce87370fbfc806eb828b Mon Sep 17 00:00:00 2001 From: dwest77 Date: Fri, 20 Dec 2024 14:30:01 +0000 Subject: [PATCH 03/35] Fixed issue with FalseLogger --- padocc/core/logs.py | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/padocc/core/logs.py b/padocc/core/logs.py index 057c1f7..a42ef17 100644 --- a/padocc/core/logs.py +++ b/padocc/core/logs.py @@ -20,6 +20,23 @@ 'G': 1000000000 } + +class FalseLogger: + """ + Supplementary class where a logger is not wanted but is required for + some operations. + """ + def __init__(self): + pass + def debug(self, message: str): + pass + def info(self, message: str): + pass + def warning(self, message: str): + pass + def error(self, message: str): + pass + class LoggedOperation: """ Allows inherritance of logger objects without creating new ones. @@ -46,22 +63,6 @@ def __init__( else: self.logger = logger -class FalseLogger: - """ - Supplementary class where a logger is not wanted but is required for - some operations. 
- """ - def __init__(self): - pass - def debug(self, message: str): - pass - def info(self, message: str): - pass - def warning(self, message: str): - pass - def error(self, message: str): - pass - def reset_file_handler( logger : logging.Logger, verbose : int, From c1245aa4c16f3ee9c683549456f375f34329cdf3 Mon Sep 17 00:00:00 2001 From: dwest77 Date: Fri, 20 Dec 2024 14:32:45 +0000 Subject: [PATCH 04/35] Switched to iterator over generator --- padocc/core/filehandlers.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/padocc/core/filehandlers.py b/padocc/core/filehandlers.py index 7b69ca7..ae64dd3 100644 --- a/padocc/core/filehandlers.py +++ b/padocc/core/filehandlers.py @@ -7,7 +7,7 @@ import yaml from datetime import datetime import logging -from typing import Generator +from typing import Iterator from typing import Optional, Union from padocc.core import LoggedOperation, FalseLogger @@ -216,7 +216,7 @@ def __len__(self) -> int: self.logger.debug(f'content length: {len(content)}') return len(content) - def __iter__(self) -> Generator[str, None, None]: + def __iter__(self) -> Iterator[str]: """Iterator for the set of values""" # Issue #2 for i in self._value: @@ -308,7 +308,7 @@ def __len__(self) -> int: return len(self._value.keys()) - def __iter__(self) -> Generator[str, None, None]: + def __iter__(self) -> Iterator[str]: """Iterate over set of keys.""" self._check_value() @@ -494,7 +494,7 @@ class CSVFileHandler(ListIOMixin): def _set_file(self) -> None: self._file = f'{self.dir}/{self._file}.csv' - def __iter__(self) -> Generator[str]: + def __iter__(self) -> Iterator[str]: for i in self._value: if i is not None: yield i.replace(' ','').split(',') From 3978bf17cd221793212d35f28fddb9b34120e790 Mon Sep 17 00:00:00 2001 From: dwest77 Date: Fri, 20 Dec 2024 14:41:33 +0000 Subject: [PATCH 05/35] Updated all auto docs --- docs/source/allocation.rst | 6 -- docs/source/assess-overview.rst | 136 ------------------------------- docs/source/assess.rst | 5 -- docs/source/compute.rst | 2 +- docs/source/execution-source.rst | 8 -- docs/source/extras.rst | 4 +- docs/source/group.rst | 9 ++ docs/source/init.rst | 6 -- docs/source/project.rst | 9 ++ docs/source/scan.rst | 2 +- docs/source/validate.rst | 2 +- 11 files changed, 23 insertions(+), 166 deletions(-) delete mode 100644 docs/source/allocation.rst delete mode 100644 docs/source/assess-overview.rst delete mode 100644 docs/source/assess.rst delete mode 100644 docs/source/execution-source.rst create mode 100644 docs/source/group.rst delete mode 100644 docs/source/init.rst create mode 100644 docs/source/project.rst diff --git a/docs/source/allocation.rst b/docs/source/allocation.rst deleted file mode 100644 index 3c8cb97..0000000 --- a/docs/source/allocation.rst +++ /dev/null @@ -1,6 +0,0 @@ -================= -Allocation Module -================= - -.. automodule:: pipeline.allocate - :members: \ No newline at end of file diff --git a/docs/source/assess-overview.rst b/docs/source/assess-overview.rst deleted file mode 100644 index 9d8368f..0000000 --- a/docs/source/assess-overview.rst +++ /dev/null @@ -1,136 +0,0 @@ -Assessor Tool -============= - -The assessor script ```assess.py``` is an all-purpose pipeline checking tool which can be used to assess: - - The current status of all datasets within a given group in the pipeline (which phase each dataset currently sits in) - - The errors/outputs associated with previous job runs. - - Specific logs from datasets which are presenting a specific type of error. 
- -An example command to run the assessor tool can be found below: -:: - - python assess.py - -Where the operation can be one of the below options: - - Progress: Get a general overview of the pipeline; how many datasets have completed or are stuck on each phase. - - Display: Display a specific type of information about the pipeline (blacklisted codes, datasets with virtual dimensions or using parquet) - - Match: Match specific attributes within the ``detail-cfg.json`` file (and save to a new ID). - - Summarise: Get an assessment of the data processed for this group - - Upgrade: Update the version of a set of kerchunk files (includes internal metadata standard updates (timestamped, reason provided). - - Cleanup: Remove cached files as part of group runs (errs, outs, repeat_ids etc.) - -1. Progress of a group ----------------------- - -To see the general status of the pipeline for a given group: -:: - - python assess.py progress - -An example output from this command can be seen below: -:: - - Group: cci_group_v1 - Total Codes: 361 - - scan : 1 [0.3 %] (Variety: 1) - - Complete : 1 - - complete : 185 [51.2%] (Variety: 1) - - complete : 185 - - unknown : 21 [5.8 %] (Variety: 1) - - no data : 21 - - blacklist : 162 [44.9%] (Variety: 7) - - NonKerchunkable : 50 - - PartialDriver : 3 - - PartialDriverFail : 5 - - ExhaustedMemoryLimit : 64 - - ExhaustedTimeLimit : 18 - - ExhaustedTimeLimit* : 1 - - ValidationMemoryLimit : 21 - -In this case there are 185 datasets that have completed the pipeline with 1 left to be scanned. The 21 unknowns have no log file so there is no information on these. This will be resolved in later versions where a `seek` function will automatically run when checking the progress, to fix gaps in the logs for missing datasets. - - -An example use case is to write out all datasets that require scanning to a new label (repeat_label): -:: - - python assess.py progress -p scan -r -W - - -The last flag ```-W``` is required when writing an output file from this program, otherwise the program will dryrun and produce no files. - -1.1. Checking errors --------------------- -Check what repeat labels are available already using: -:: - - python assess.py display -s labels - -For listing the status of all datasets from a previous repeat idL -:: - - python assess.py progress -r - - -For selecting a specific type of error (-e) and examine the full log for each example (-E) -:: - - python assess.py progress -r -e "type_of_error" -p scan -E - -Following from this, you may want to rerun the pipeline for just one type of error previously found: -:: - - python assess.py progress -r -e "type_of_error" -p scan -n -W - -.. Note:: - - If you are looking at a specific repeat ID, you can forego the phase (-p) flag, since it is expected this set would appear in the same phase anyway. - The (-W) write flag is also required for any commands that would output data to a file. If the file already exists, you will need to specify an override - level (-O or -OO) for merging or overwriting existing data (project code lists) respectively. - -2. Display options --------------------------- - -Check how many of the datasets in a group have virtual dimensions -:: - - python assess.py display -s virtuals - -3. Match Special Attributes ---------------------------- - -Find the project codes where a specific attribute in ``detail-cfg.json`` matches some given value -:: - - python assess.py match -c "links_added:False" - -4. 
Summarise data ------------------ - -Summarise the Native/Kerchunk data generated (thus far) for an existing group. -:: - - python assess.py summarise - -5. Upgrade Kerchunk version ---------------------------- - -Upgrade all kerchunk files (compute-validate stages) to a new version for a given reason. This is the 'formal' way of updating the version. -:: - - python assess.py upgrade -r -R "Reason for upgrade" -W -U "krX.X" # New version id - -6. Cleanup ----------- - -"Clean" or remove specific types of files: - - Errors/Outputs in the correct places - - "labels" i.e repeat_ids (including allocations and bands under that repeat_id) - -In the below example we will remove every created ``repeat_id`` (equivalent terminology to 'label') except for ``main``. -:: - - python assess.py cleanup -c labels diff --git a/docs/source/assess.rst b/docs/source/assess.rst deleted file mode 100644 index c508e61..0000000 --- a/docs/source/assess.rst +++ /dev/null @@ -1,5 +0,0 @@ -Assess Module -============= - -.. automodule:: assess - :members: \ No newline at end of file diff --git a/docs/source/compute.rst b/docs/source/compute.rst index d5c6fc3..d523f88 100644 --- a/docs/source/compute.rst +++ b/docs/source/compute.rst @@ -2,6 +2,6 @@ Compute Module ============== -.. automodule:: pipeline.compute +.. automodule:: padocc.phases.compute :members: :show-inheritance: \ No newline at end of file diff --git a/docs/source/execution-source.rst b/docs/source/execution-source.rst deleted file mode 100644 index 399113e..0000000 --- a/docs/source/execution-source.rst +++ /dev/null @@ -1,8 +0,0 @@ -Pipeline Execution -================== - -.. automodule:: group_run - :members: - -.. automodule:: single_run - :members: \ No newline at end of file diff --git a/docs/source/extras.rst b/docs/source/extras.rst index de6e1dc..4176c89 100644 --- a/docs/source/extras.rst +++ b/docs/source/extras.rst @@ -5,7 +5,7 @@ Padocc Utility Scripts Utilities ========= -.. automodule:: pipeline.utils +.. automodule:: padocc.core.utils :members: :show-inheritance: @@ -13,6 +13,6 @@ Utilities Logging ======= -.. automodule:: pipeline.logs +.. automodule:: pipeline.core.logs :members: :show-inheritance: \ No newline at end of file diff --git a/docs/source/group.rst b/docs/source/group.rst new file mode 100644 index 0000000..9e49bc6 --- /dev/null +++ b/docs/source/group.rst @@ -0,0 +1,9 @@ +======================================== +GroupOperation Core and Mixin Behaviours +======================================== + +.. automodule:: padocc.operations.group + :members: + +.. automodule:: padocc.operations.mixins + :members: \ No newline at end of file diff --git a/docs/source/init.rst b/docs/source/init.rst deleted file mode 100644 index 1b3bdbc..0000000 --- a/docs/source/init.rst +++ /dev/null @@ -1,6 +0,0 @@ -===================== -Initialisation Module -===================== - -.. automodule:: pipeline.init - :members: \ No newline at end of file diff --git a/docs/source/project.rst b/docs/source/project.rst new file mode 100644 index 0000000..1b6c99f --- /dev/null +++ b/docs/source/project.rst @@ -0,0 +1,9 @@ +========================================== +ProjectOperation Core and Mixin Behaviours +========================================== + +.. automodule:: padocc.core.project + :members: + +.. 
automodule:: padocc.core.mixins + :members: \ No newline at end of file diff --git a/docs/source/scan.rst b/docs/source/scan.rst index db892f7..035d35c 100644 --- a/docs/source/scan.rst +++ b/docs/source/scan.rst @@ -2,5 +2,5 @@ Scanner Module ============== -.. automodule:: pipeline.scan +.. automodule:: padocc.phases.scan :members: \ No newline at end of file diff --git a/docs/source/validate.rst b/docs/source/validate.rst index 55b0647..389412d 100644 --- a/docs/source/validate.rst +++ b/docs/source/validate.rst @@ -2,5 +2,5 @@ Validation Module ================= -.. automodule:: pipeline.validate +.. automodule:: padocc.phases.validate :members: From 50fb0b3278030ce6f0a44cdba8f11f4906e29162 Mon Sep 17 00:00:00 2001 From: dwest77 Date: Fri, 20 Dec 2024 14:43:18 +0000 Subject: [PATCH 06/35] Updated index --- docs/source/index.rst | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/docs/source/index.rst b/docs/source/index.rst index fcd1763..ee07c92 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -33,7 +33,6 @@ The pipeline consists of four central phases, with an additional phase for inges Getting Started Example CCI Water Vapour Padocc Flags/Options - Assessor Tool Overview Error Codes Developer's Guide @@ -41,19 +40,15 @@ The pipeline consists of four central phases, with an additional phase for inges :maxdepth: 1 :caption: CLI Tool Source: - Assessor Source - Control Scripts Source - .. toctree:: :maxdepth: 1 - :caption: Pipeline Source: + :caption: PADOCC Source: Filehandlers Initialisation Scanning Compute Validate - Allocations Utils From f32598c1744d01c06eec2207a8e1262c0f34b726 Mon Sep 17 00:00:00 2001 From: dwest77 Date: Fri, 20 Dec 2024 14:45:18 +0000 Subject: [PATCH 07/35] Fixed issues with docs build --- docs/source/extras.rst | 2 +- docs/source/index.rst | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/docs/source/extras.rst b/docs/source/extras.rst index 4176c89..0d04fd6 100644 --- a/docs/source/extras.rst +++ b/docs/source/extras.rst @@ -13,6 +13,6 @@ Utilities Logging ======= -.. automodule:: pipeline.core.logs +.. automodule:: padocc.core.logs :members: :show-inheritance: \ No newline at end of file diff --git a/docs/source/index.rst b/docs/source/index.rst index ee07c92..e4f50e4 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -38,14 +38,15 @@ The pipeline consists of four central phases, with an additional phase for inges .. toctree:: :maxdepth: 1 - :caption: CLI Tool Source: + :caption: Operations: .. toctree:: :maxdepth: 1 :caption: PADOCC Source: + Single Datasets + Groups of Datasets Filehandlers - Initialisation Scanning Compute Validate @@ -68,9 +69,7 @@ PADOCC was developed at the Centre for Environmental Data Analysis, supported by .. image:: _images/ceda.png :width: 300 :alt: CEDA Logo - :width: 300 .. 
image:: _images/esa.png :width: 300 :alt: ESA Logo - :width: 300 From 5359735fee180fc4f4d532c0f660a9091f1624c0 Mon Sep 17 00:00:00 2001 From: dwest77a Date: Fri, 20 Dec 2024 15:35:34 +0000 Subject: [PATCH 08/35] Added initial Shepard configurations --- padocc/operations/shepard.py | 141 ++++++++++++++++++++++ padocc/tests/data_creator/flock_conf.yaml | 1 + padocc/tests/data_creator/test-flock.shp | 16 +++ 3 files changed, 158 insertions(+) create mode 100644 padocc/operations/shepard.py create mode 100644 padocc/tests/data_creator/flock_conf.yaml create mode 100644 padocc/tests/data_creator/test-flock.shp diff --git a/padocc/operations/shepard.py b/padocc/operations/shepard.py new file mode 100644 index 0000000..a41fecd --- /dev/null +++ b/padocc/operations/shepard.py @@ -0,0 +1,141 @@ +__author__ = "Daniel Westwood" +__contact__ = "daniel.westwood@stfc.ac.uk" +__copyright__ = "Copyright 2024 United Kingdom Research and Innovation" + +""" +SHEPARD: +Serialised Handler for Enabling Padocc Aggregations via Recurrent Deployment +""" + +import os +import yaml +import argparse +from typing import Union +import glob +import json + +from padocc.core.logs import LoggedOperation +from padocc.operations.group import GroupOperation + +shepard_template = { + 'workdir': '/my/workdir', + 'group_file': '/my/group/file.csv', + 'groupID': 'my-group1', + 'substitutions':['a','b'] +} + +class ShepardOperator(LoggedOperation): + + def __init__(self, conf: Union[dict,None] = None, verbose: int = 0) -> None: + + self.conf = self._load_config(conf) + + if self.conf is None: + raise NotImplementedError( + 'Shepard use without a config file is not enabled.' + ) + + self.flock_dir = self.conf.get('flock_dir',None) + if self.flock_dir is None: + raise ValueError( + 'Missing "flock_dir" from config.' + ) + + super().__init__(label='shepard-deploy',verbose=verbose) + + # Shepard Files + # - workdir for operations. + # - path to a group file. + + def run_batch(self, batch_limit: int = 100): + + batch_limit = self.conf.get('batch_limit',None) or batch_limit + + # Initialise all groups if needed (outside of batch limit) + + flock = self._init_all_flocks() + + self.logger.info("All flocks initialised") + + def _init_all_flocks(self): + shepard_files = self.find_flocks() + missed_flocks = [] + shp_flock = [] + for idx, shp in enumerate(shepard_files): + self.logger.info(f'Instantiating flock {idx+1}: {shp}') + try: + fconf = self.open_flock(shp) + except ValueError as err: + raise err + missed_flocks.append(shp) + continue + + flock = GroupOperation( + fconf['groupID'], + fconf['workdir'], + label=f'shepard->{fconf["groupID"]}', + verbose=self._verbose, + ) + + if not flock.datasets.get(): + flock.init_from_file(fconf['group_file'], substitutions=fconf['substitutions']) + + shp_flock.append(flock) + return shp_flock + + def open_flock(self, file: str): + + if not os.path.isfile(file): + raise ValueError(f'Unable to open {file}') + + with open(file) as f: + return json.load(f) + + def find_flocks(self): + + if not os.path.isdir(self.flock_dir): + raise ValueError( + f'Flock Directory: {self.flock_dir} - inaccessible.' 
+ ) + + return glob.glob(f'{self.flock_dir}/**/*.shp', recursive=True) + + def _load_config(self, conf: str) -> Union[dict,None]: + """ + Load a conf.yaml file to a dictionary + """ + if conf is None: + return None + + if os.path.isfile(conf): + with open(conf) as f: + config = yaml.safe_load(f) + return config + else: + self.logger.error(f'Config file {conf} unreachable') + return None + +def _get_cmdline_args(): + """ + Get command line arguments passed to shepard + """ + + parser = argparse.ArgumentParser(description='Entrypoint for SHEPARD module') + parser.add_argument('--conf',type=str, help='Config file as part of deployment') + parser.add_argument('-v','--verbose', action='count', default=2, help='Set level of verbosity for logs') + + args = parser.parse_args() + + return { + 'conf': args.conf, + 'verbose': args.verbose} + +def main(): + + kwargs = _get_cmdline_args() + + shepherd = ShepardOperator(**kwargs) + shepherd.run_batch() + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/padocc/tests/data_creator/flock_conf.yaml b/padocc/tests/data_creator/flock_conf.yaml new file mode 100644 index 0000000..ab4ab75 --- /dev/null +++ b/padocc/tests/data_creator/flock_conf.yaml @@ -0,0 +1 @@ +flock_dir: "/Users/daniel.westwood/cedadev/padocc/padocc/tests/data_creator" \ No newline at end of file diff --git a/padocc/tests/data_creator/test-flock.shp b/padocc/tests/data_creator/test-flock.shp new file mode 100644 index 0000000..4cdf46f --- /dev/null +++ b/padocc/tests/data_creator/test-flock.shp @@ -0,0 +1,16 @@ +{ + "workdir":"/Users/daniel.westwood/cedadev/padocc/padocc/tests/auto_testdata_dir", + "groupID":"padocc-test-suite", + "group_file":"/Users/daniel.westwood/cedadev/padocc/padocc/tests/data_creator/Aggs.csv", + "substitutions":{ + "init_file": { + "/home/users/dwest77/cedadev/":"/Users/daniel.westwood/cedadev/" + }, + "dataset_file": { + "/home/users/dwest77/cedadev/":"/Users/daniel.westwood/cedadev/" + }, + "datasets": { + "/home/users/dwest77/cedadev/":"/Users/daniel.westwood/cedadev/" + } + } +} \ No newline at end of file From 17e239023718636b6b1c06efd3b58fe5895ed071 Mon Sep 17 00:00:00 2001 From: dwest77a Date: Fri, 20 Dec 2024 15:36:01 +0000 Subject: [PATCH 09/35] Syntax changes and added shepard entrypoint --- padocc/core/project.py | 2 +- padocc/operations/group.py | 1 - pyproject.toml | 4 +++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/padocc/core/project.py b/padocc/core/project.py index c636ebf..e6e2e7c 100644 --- a/padocc/core/project.py +++ b/padocc/core/project.py @@ -145,7 +145,7 @@ def __init__( self.phase_logs[phase] = LogFileHandler( self.dir, phase, - self.logger, + logger=self.logger, extra_path='phase_logs/', **self.fh_kwargs ) diff --git a/padocc/operations/group.py b/padocc/operations/group.py index 5c18652..6e79587 100644 --- a/padocc/operations/group.py +++ b/padocc/operations/group.py @@ -352,7 +352,6 @@ def _validate_config( dryrun=dryrun) return status - def add_project(self): pass diff --git a/pyproject.toml b/pyproject.toml index a6174a2..c621396 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,9 @@ cfapyx = "2024.11.27" myst-nb = "^1.1.2" types-pyyaml = "^6.0.12.20240917" - [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" + +[tool.poetry.scripts] +shepard_deploy = "padocc.operations.shepard:main" \ No newline at end of file From 7c7775b58d99972f50728de5a5f2209e5dcdcdea Mon Sep 17 00:00:00 2001 From: dwest77a Date: Fri, 20 Dec 2024 15:40:15 +0000 Subject: 
[PATCH 10/35] Syntax fixes --- padocc/operations/shepard.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/padocc/operations/shepard.py b/padocc/operations/shepard.py index a41fecd..0d2123d 100644 --- a/padocc/operations/shepard.py +++ b/padocc/operations/shepard.py @@ -66,10 +66,9 @@ def _init_all_flocks(self): try: fconf = self.open_flock(shp) except ValueError as err: - raise err - missed_flocks.append(shp) + missed_flocks.append((shp, err)) continue - + flock = GroupOperation( fconf['groupID'], fconf['workdir'], @@ -79,8 +78,13 @@ def _init_all_flocks(self): if not flock.datasets.get(): flock.init_from_file(fconf['group_file'], substitutions=fconf['substitutions']) + else: + self.logger.info(f'Skipped existing flock: {fconf["groupID"]}') shp_flock.append(flock) + + # Handle missed flocks here. + return shp_flock def open_flock(self, file: str): @@ -122,7 +126,7 @@ def _get_cmdline_args(): parser = argparse.ArgumentParser(description='Entrypoint for SHEPARD module') parser.add_argument('--conf',type=str, help='Config file as part of deployment') - parser.add_argument('-v','--verbose', action='count', default=2, help='Set level of verbosity for logs') + parser.add_argument('-v','--verbose', action='count', default=0, help='Set level of verbosity for logs') args = parser.parse_args() From ea2078505bbe312cbd0be7d34b6b620f2eba6e1f Mon Sep 17 00:00:00 2001 From: dwest77a Date: Mon, 23 Dec 2024 13:55:16 +0000 Subject: [PATCH 11/35] Total overhaul of filehandlers for mypy consistency --- padocc/core/filehandlers.py | 600 +++++++++++++++++++++++------------- 1 file changed, 387 insertions(+), 213 deletions(-) diff --git a/padocc/core/filehandlers.py b/padocc/core/filehandlers.py index ae64dd3..d940d66 100644 --- a/padocc/core/filehandlers.py +++ b/padocc/core/filehandlers.py @@ -9,8 +9,11 @@ import logging from typing import Iterator from typing import Optional, Union +import xarray as xr from padocc.core import LoggedOperation, FalseLogger +from .utils import format_str + class FileIOMixin(LoggedOperation): """ @@ -46,12 +49,12 @@ def __init__( label : Union[str,None] = None, fh : Optional[str] = None, logid : Optional[str] = None, - dryrun : Optional[bool] = None, + dryrun : bool = False, forceful : bool = False, verbose : int = 0 ) -> None: """ - General filehandler for PADOCC operations involving file I/O operations. + Generic filehandler for PADOCC operations involving file I/O operations. :param dir: (str) The path to the directory in which this file can be found. @@ -77,14 +80,12 @@ def __init__( :returns: None """ - self.dir = dir - self._file = filename - - self._dryrun = dryrun - self._forceful = forceful - self._value = None + self._dir: str = dir + self._file: str = filename - self._set_file() + self._dryrun: bool = dryrun + self._forceful: bool = forceful + self._extension: str = '' # All filehandlers are logged operations super().__init__( @@ -93,284 +94,318 @@ def __init__( fh=fh, logid=logid, verbose=verbose) - - def __contains__(self, item: object) -> bool: - """ - Enables checking 'if x in fh'. - """ - if self._value is None: - self._get_content() - return item in self._value - @property def filepath(self) -> str: """ - Returns the private file attribute. + Returns the full filepath attribute. 
+ """ + return f'{self._dir}/{self.file}' + + @property + def file(self) -> str: """ - return self._file + Returns the full filename attribute.""" + return f'{self._file}.{self._extension}' def file_exists(self) -> bool: """ Return true if the file is found. """ - return os.path.isfile(self._file) + return os.path.isfile(self.filepath) - def create_file(self): + def create_file(self) -> None: """ Create the file if not on dryrun. """ if not self._dryrun: - self.logger.debug(f'Creating file "{self._file}"') - os.system(f'touch {self._file}') + self.logger.debug(f'Creating file "{self.file}"') + os.system(f'touch {self.filepath}') else: - self.logger.info(f'DRYRUN: Skipped creating "{self._file}"') - - def close(self): - """ - Wrapper for _set_content method - """ - self.logger.debug(f'Saving file {self._file}') - self._set_content() + self.logger.info(f'DRYRUN: Skipped creating "{self.file}"') - def set(self, value: object, index: Union[object,None] = None): + def remove_file(self) -> None: """ - Reset the whole value of the private ``_value`` attribute. + Remove the file on the filesystem + if not on dryrun """ - self._check_value() - - if index is not None: - self._value[index] = value + if not self._dryrun: + self.logger.debug(f'Deleting file "{self.file}"') + os.system(f'rm {self.filepath}') else: - self._value = value - - def get( - self, - index: object = None, - default: object = None): - """ - Get the value of the private ``_value`` attribute. Can also get a - parameter from this item as you would with a dictionary, if possible - for the item type represented by ``_value``. - """ - self._check_value() + self.logger.info(f'DRYRUN: Skipped deleting "{self.file}"') - if index is None: - return self._value + def move_file( + self, + new_dir: str, + new_name: Union[str,None] = None, + new_extension: Union[str, None] = None + ): + + if not os.access(new_dir, os.W_OK): + raise OSError( + f'Specified directory "{new_dir}" is not writable' + ) + + old_path = str(self.filepath) + self._dir = new_dir - # Issue #1 + if new_name is not None: + self._file = new_name + + if new_extension is not None: + self._extension = new_extension try: - return self._value.get(index, default) - except AttributeError: - raise AttributeError( - f'Filehandler for {self._file} does not support getting specific items.' + os.system(f'mv {old_path} {self.filepath}') + self.logger.debug( + f'Moved file successfully from {old_path} to {self.filepath}' ) + except OSError as err: + self.__set_filepath(old_path) + raise err - def _set_file(self): - raise NotImplementedError( - 'Set file not implemented for FileIO Mixin - please use a filehandler.' - ) - - def _get_content(self): - raise NotImplementedError( - 'Get content not implemented for FileIO Mixin - please use a filehandler.' - ) - - def _check_save(self) -> bool: + def __set_filepath(self, filepath) -> None: """ - Returns true if content is able to be saved. + Private method to hard reset the filepath """ - # Only set value if value has been loaded to edit. 
- self._check_value() + components = '/'.join(filepath.split("/")) + self._dir = components[:-2] + filename = components[-1] - # Only set value if not doing a dryrun - if self._dryrun: - self.logger.info(f'DRYRUN: Skip writing file "{self.filename}"') - return None + self._file, self._extension = filename.split('.') - # Create new file as required - if not self.file_exists(): - self.create_file() +class ListIOMixin(FileIOMixin): + """ + Filehandler for string-based Lists in Padocc + """ + + def __init__( + self, + dir: str, + filename: str, + extension: Union[str,None] = None, + init_value: Union[list, None] = None, + **kwargs) -> None: + + super().__init__(dir, filename, **kwargs) + + self._value: list = init_value or [] + self._extension: str = extension or 'txt' + + if self._value is not None: + self._set_value_in_file() - # Continue with setting content to Filesystem object. - return True + def append(self, newvalue: str) -> None: + """Add a new value to the internal list""" + self._obtain_value() + + self._value.append(newvalue) - def _check_value(self): + def set(self, value: list) -> None: """ - Check if the value needs to be loaded from the file. + Reset the value as a whole for this + filehandler. """ - if self._value is None: - self._get_content() + self._value = value -class ListIOMixin(FileIOMixin): + def __contains__(self, item: str) -> bool: + """ + Check if the item value is contained in + this list.""" + self._obtain_value() + + return item in self._value def __str__(self) -> str: """String representation""" - content = self.get() - return '\n'.join(content) + return '\n'.join(self._value) + + def __repr__(self) -> str: + """Programmatic representation""" + return f"" def __len__(self) -> int: """Length of value""" - content = self.get() - self.logger.debug(f'content length: {len(content)}') - return len(content) + self.logger.debug(f'content length: {len(self._value)}') + return len(self._value) def __iter__(self) -> Iterator[str]: """Iterator for the set of values""" - # Issue #2 for i in self._value: if i is not None: yield i - def __getitem__(self, index: object): + def __getitem__(self, index: int) -> str: """ Override FileIOMixin class for getting index """ - if self._value is None: - self._get_content() - - if not isinstance(index, int): - raise ValueError( - 'List-based Filehandler is not numerically indexable.' - ) + self._obtain_value() return self._value[index] - def __setitem__(self, index: object, value: object) -> None: + def __setitem__(self, index: int, value: str) -> None: """ Enables setting items in filehandlers 'fh[0] = 1' """ - if self._value is None: - self._get_content() - - if not isinstance(index, int): - raise ValueError( - 'List-based Filehandler is not numerically indexable.' - ) + self._obtain_value() self._value[index] = value - def append(self, newvalue: object) -> None: - """Add a new value to the internal list""" - self._value.append(newvalue) - - def set(self, value: object): + def _obtain_value(self) -> None: """ - Extends the set function of the parent, creates a copy - of the input list so the original parameter is preserved. + Obtain the value for this filehandler. 
""" - super().set(list(value)) + if self._value is None: + self._obtain_value_from_file() - def _get_content(self) -> None: + def _obtain_value_from_file(self) -> None: """ - Open the file to get content if it exists + Obtain the value specifically from + the represented file """ - if self.file_exists(): - self.logger.debug('Opening existing file') - with open(self._file) as f: - content = [r.strip() for r in f.readlines()] - self._value = content + if not self.file_exists(): + self.create_file() - else: - self.logger.debug('Creating new file') + with open(self.filepath) as f: + self.value = [r.strip() for r in f.readlines()] + + def _set_value_in_file(self) -> None: + """ + On initialisation or close, set the value + in the file. + """ + if not self.file_exists(): self.create_file() - self._value = [] - def _set_content(self) -> None: - """If the content can be saved, save to the file.""" - if super()._check_save(): - with open(self._file,'w') as f: - f.write('\n'.join(self._value)) + with open(self.filepath,'w') as f: + f.write('\n'.join(self._value)) class JSONFileHandler(FileIOMixin): - description = "JSON File handler for padocc config files." + """JSON File handler for padocc config files.""" def __init__( self, dir: str, filename: str, - logger: logging.Logger | FalseLogger = None, conf: Union[dict,None] = None, + init_value: Union[dict,None] = None, **kwargs ) -> None: - self._conf = conf or {} - super().__init__(dir, filename, logger=logger, **kwargs) + super().__init__(dir, filename,**kwargs) + self._conf: dict = conf or {} + self._value: dict = init_value or {} + self.extension: str = 'json' + + if self._value is not None: + self._set_value_in_file() + + def set(self, value: dict) -> None: + """ + Set the value of the whole dictionary. + """ + self._value = value + + def __contains__(self, key: str): + """ + Check if the dict for this filehandler + contains this key.""" + self._obtain_value() + + return key in self._value.keys() def __str__(self) -> str: """String representation""" - return yaml.dump(self.get()) + self._obtain_value() + + return yaml.safe_dump(self._value,indent=2) + + def __repr__(self) -> str: + """Programmatic representation""" + return f"" def __len__(self) -> int: """Returns number of keys in this dict-like object.""" - self._check_value() + self._obtain_value() return len(self._value.keys()) def __iter__(self) -> Iterator[str]: """Iterate over set of keys.""" - self._check_value() + self._obtain_value() for i in self._value.keys(): yield i - def __getitem__(self, index: str): + def __getitem__(self, index: str) -> Union[str,dict,None]: """ - Enables indexing for filehandlers 'fh[0]' + Enables indexing for filehandlers. + Dict-based filehandlers accept string keys only. """ - if self._value is None: - self._get_content() - - if self._conf is not None: - if index in self._conf: - self._apply_conf() + self._obtain_value() if index in self._value: return self._value[index] return None + + def get( + self, + index: str, + default: Union[str,None] = None + ) -> Union[str,dict,None]: + """ + Safe method to get a value from this filehandler + """ + return self._value.get(index, default) - def __setitem__(self, index: str, value) -> None: + def __setitem__(self, index: str, value: str) -> None: """ - Enables setting items in filehandlers 'fh[0] = 1' + Enables setting items in filehandlers. + Dict-based filehandlers accept string keys only. 
""" - if self._value is None: - self._get_content() + self._obtain_value() if index in self._value: self._value[index] = value - return None + + def _obtain_value(self, index: Union[str,None] = None) -> None: + """ + Obtain the value for this filehandler. + """ + if self._value is None: + self._obtain_value_from_file() - def set(self, value: object, index: Union[object,None] = None) -> None: + if index is None: + return + + if self._conf is not None: + if index in self._conf: + self._apply_conf() + + def _obtain_value_from_file(self) -> None: """ - Wrapper to create a detached dict copy + Obtain the value specifically from + the represented file """ - super().set(dict(value), index=index) + if not self.file_exists(): + self.create_file() + return - def _set_file(self) -> None: - if '.json' not in self._file: - self._file = f'{self.dir}/{self._file}.json' - else: - self._file = f'{self.dir}/{self._file}' - - # Get/set routines for the filesystem files. - def _get_content(self) -> None: - if self.file_exists(): - try: - with open(self._file) as f: - self._value = json.load(f) - except json.decoder.JSONDecodeError: - self._value = {} - else: + with open(self.filepath) as f: + self.value = json.load(f) + + def _set_value_in_file(self) -> None: + """ + On initialisation or close, set the value + in the file. + """ + if not self.file_exists(): self.create_file() - self._value = {} - def _set_content(self) -> None: - if super()._check_save(): - self._apply_conf() - with open(self._file,'w') as f: - f.write(json.dumps(self._value)) + with open(self.filepath,'w') as f: + f.write(json.dumps(f)) def _apply_conf(self) -> None: """ @@ -384,7 +419,7 @@ def _apply_conf(self) -> None: self._conf.update(self._value) self._value = dict(self._conf) - self._conf = None + self._conf = {} class KerchunkFile(JSONFileHandler): """ @@ -400,14 +435,12 @@ def add_download_link( """ Add the download link to this Kerchunk File """ - refs = self.get() - - for key in refs.keys(): - if len(refs[key]) == 3: - if refs[key][0][0] == sub: - refs[key][0] = replace + refs[key][0] + self._obtain_value() - self.set(refs) + for key in self._value.keys(): + if len(self._value[key]) == 3: + if self._value[key][0][0] == sub: + self._value[key][0] = replace + self._value[key][0] def add_kerchunk_history(self, version_no: str) -> None: """ @@ -418,15 +451,20 @@ def add_kerchunk_history(self, version_no: str) -> None: from datetime import datetime # Get current time - attrs = self['refs'] + attrs = self.get('refs',None) + + if attrs is None or not isinstance(attrs,str): + raise ValueError( + 'Attribute "refs" not present in Kerchunk file' + ) # Format for different uses now = datetime.now() if 'history' in attrs: - if type(attrs['history']) == str: - hist = attrs['history'].split('\n') - else: - hist = attrs['history'] + hist = attrs.get('history','') + + if type(hist) == str: + hist = hist.split('\n') if 'Kerchunk' in hist[-1]: hist[-1] = 'Kerchunk file updated on ' + now.strftime("%D") @@ -439,34 +477,146 @@ def add_kerchunk_history(self, version_no: str) -> None: attrs['kerchunk_revision'] = version_no attrs['kerchunk_creation_date'] = now.strftime("%d%m%yT%H%M%S") - self.set(attrs, index='refs') + self['refs'] = attrs -class ZarrStore(FileIOMixin): +class GenericStore(LoggedOperation): """ - Filehandler for Zarr store in Padocc - enables Filesystem + Filehandler for Generic stores in Padocc - enables Filesystem operations on component files. 
""" + + def __init__( + self, + parent_dir: str, + store_name: str, + metadata_name: str = '.zattrs', + extension: str = 'zarr', + logger : Optional[Union[logging.Logger,FalseLogger]] = None, + label : Union[str,None] = None, + fh : Optional[str] = None, + logid : Optional[str] = None, + dryrun : bool = False, + forceful : bool = False, + verbose : int = 0 + ) -> None: + + self._parent_dir: str = parent_dir + self._store_name: str = store_name + self._extension: str = extension + + self._meta: JSONFileHandler = JSONFileHandler( + self.store_path, metadata_name) + + self._dryrun: bool = dryrun + self._forceful: bool = forceful + + # All filehandlers are logged operations + super().__init__( + logger, + label=label, + fh=fh, + logid=logid, + verbose=verbose) + + @property + def store_path(self) -> str: + """Assemble the store path""" + return f'{self._parent_dir}/{self._store_name}.{self._extension}' + def clear(self) -> None: + """ + Remove all components of the store""" if not self._dryrun: - os.system(f'rm -rf {self._file}') + os.system(f'rm -rf {self.store_path}') else: - self.logger.warning( - f'Unable to clear ZarrStore "{self._file}" in dryrun mode.') + self.logger.debug( + f'Skipped clearing "{self._extension}"-type ' + f'Store "{self._store_name}" in dryrun mode.' + ) - def _set_file(self): - self._file = f'{self.dir}/{self.filename}' + def open(self, engine: str = 'zarr', **open_kwargs) -> xr.Dataset: + """Open the store as a dataset (READ_ONLY)""" + return xr.open_dataset(self.store_path, engine=engine,**open_kwargs) -class TextFileHandler(ListIOMixin): + def __contains__(self, key: str) -> bool: + """ + Check if a key exists in the zattrs file""" + return key in self._meta + + def __str__(self) -> str: + """Return the string representation of the store""" + return self.__repr__() + + def __len__(self) -> int: + """Find the number of keys in zattrs""" + return len(self._meta) + + def __repr__(self) -> str: + """Programmatic representation""" + return f'' + + def __getitem__(self, index: str) -> Union[str,dict,None]: + """Get an attribute from the zarr store""" + return self._meta[index] + + def __setitem__(self, index: str, value: str) -> None: + """Set an attribute in the zarr store""" + self._meta[index] = value + +class ZarrStore(GenericStore): + """ + Filehandler for Zarr stores in PADOCC. + Enables manipulation of Zarr store on filesystem + and setting metadata attributes.""" + + def __init__( + self, + parent_dir: str, + store_name: str, + **kwargs + ) -> None: + + super().__init__(parent_dir, store_name, **kwargs) + + def __repr__(self) -> str: + """Programmatic representation""" + return f'' + + def open(self, *args, **zarr_kwargs) -> xr.Dataset: + """ + Open the ZarrStore as an xarray dataset + """ + return super().open(engine='zarr',**zarr_kwargs) + +class KerchunkStore(GenericStore): """ - Filehandler for text files. + Filehandler for Kerchunk stores using parquet + in PADOCC. Enables setting metadata attributes and + will allow combining stores in future. """ - description = "Text File handler for padocc config files." 
- def _set_file(self): - if '.txt' not in self._file: - self._file = f'{self.dir}/{self._file}.txt' - else: - self._file = f'{self.dir}/{self._file}' + def __init__( + self, + parent_dir: str, + store_name: str, + **kwargs + ) -> None: + + super().__init__( + parent_dir, store_name, + metadata_name='.zmetadata', + extension='parq', + **kwargs) + + def __repr__(self) -> str: + """Programmatic representation""" + return f'' + + def open(self, *args, **parquet_kwargs) -> xr.Dataset: + """ + Open the Parquet Store as an xarray dataset + """ + raise NotImplementedError class LogFileHandler(ListIOMixin): """Log File handler for padocc phase logs.""" @@ -477,22 +627,32 @@ def __init__( dir: str, filename: str, extra_path: str = '', - logger: Union[logging.Logger, FalseLogger, None] = None, **kwargs ) -> None: self._extra_path = extra_path - super().__init__(dir, filename, logger, **kwargs) + super().__init__(dir, filename, **kwargs) + + self._extension = 'log' - def _set_file(self) -> None: - self._file = f'{self.dir}/{self._extra_path}{self._file}.log' + @property + def file(self) -> str: + return f'{self._extra_path}{self._file}.{self._extension}' class CSVFileHandler(ListIOMixin): """CSV File handler for padocc config files""" description = "CSV File handler for padocc config files" - def _set_file(self) -> None: - self._file = f'{self.dir}/{self._file}.csv' + def __init__( + self, + dir: str, + filename: str, + **kwargs + ) -> None: + + super().__init__(dir, filename, **kwargs) + + self._extension = 'csv' def __iter__(self) -> Iterator[str]: for i in self._value: @@ -503,13 +663,27 @@ def update_status( self, phase: str, status: str, - jobid : Optional[str] = None, - dryrun: bool = False + jobid : str = '', ) -> None: - self._check_value() + """ + Update formatted status for this + log with the phase and status + + :param phase: (str) The phase for which this project is being + operated. + + :param status: (str) The status of the current run + (e.g. Success, Failed, Fatal) + + :param jobid: (str) The jobID of this run if present. 
+ """ + + if self._dryrun: + self.logger.info("Skipped updating status") + return status = status.replace(',', '.').replace('\n','.') - addition = f'{phase},{status},{datetime.now().strftime("%H:%M %D")},{jobid},{dryrun}' + addition = f'{phase},{status},{datetime.now().strftime("%H:%M %D")},{jobid}' self.append(addition) self.logger.info(f'Updated new status: {phase} - {status}') \ No newline at end of file From 2881e6e82e734e0764f0d735bf7413b3f477578a Mon Sep 17 00:00:00 2001 From: dwest77a Date: Mon, 23 Dec 2024 15:34:25 +0000 Subject: [PATCH 12/35] Updated all tests for filehandler refactors --- padocc/tests/test_compute.py | 4 ++-- padocc/tests/test_fhs.py | 29 ++++++++++++++--------------- padocc/tests/test_init.py | 3 ++- padocc/tests/test_scan.py | 2 +- padocc/tests/test_validate.py | 6 +++--- 5 files changed, 22 insertions(+), 22 deletions(-) diff --git a/padocc/tests/test_compute.py b/padocc/tests/test_compute.py index 4f4c338..da3e435 100644 --- a/padocc/tests/test_compute.py +++ b/padocc/tests/test_compute.py @@ -17,5 +17,5 @@ def test_compute_basic(self, workdir=WORKDIR): assert results['Success'] == 3 if __name__ == '__main__': - workdir = '/home/users/dwest77/cedadev/padocc/padocc/tests/auto_testdata_dir' - TestCompute().test_compute_basic(workdir=workdir) \ No newline at end of file + #workdir = '/home/users/dwest77/cedadev/padocc/padocc/tests/auto_testdata_dir' + TestCompute().test_compute_basic()#workdir=workdir) \ No newline at end of file diff --git a/padocc/tests/test_fhs.py b/padocc/tests/test_fhs.py index dcd3f35..8604599 100644 --- a/padocc/tests/test_fhs.py +++ b/padocc/tests/test_fhs.py @@ -4,7 +4,7 @@ from padocc.core.filehandlers import ( JSONFileHandler, KerchunkFile, - TextFileHandler, + ListFileHandler, LogFileHandler, CSVFileHandler @@ -13,7 +13,6 @@ WORKDIR = 'padocc/tests/auto_testdata_dir' testdict = { - 0: 'test0', 'test':None } @@ -29,14 +28,6 @@ def generic(fh, testdata, dryrun): fh.create_file() assert dryrun == (not os.path.isfile(fh.filepath)) - # Magic methods - assert 'test' in fh - assert 'real' not in fh - - assert fh[0] == 'test0' - fh[0] = 'test1' - assert fh[0] == 'test1' - if os.path.isfile(fh.filepath): os.system(f'rm -rf {fh.filepath}') @@ -55,6 +46,10 @@ def generic_list(fh, testdata): for x, item in enumerate(fh): assert item == testdata[x] + assert fh[0] == 'test0' + fh[0] = 'test1' + assert fh[0] == 'test1' + # Append fh.append('testlist') assert fh[-1] == 'testlist' @@ -69,8 +64,8 @@ def test_json_fh(self): print("Unit Tests: JSON FH") - for dryrun in [True, False]: - json_fh = JSONFileHandler(WORKDIR,'testjs.json', dryrun=dryrun) + for dryrun in [True]: + json_fh = JSONFileHandler(WORKDIR,'testjs', dryrun=dryrun, verbose=2) json_fh.set(testdict) @@ -80,6 +75,10 @@ def test_json_fh(self): # Generic assert generic(json_fh, testdict, dryrun) + # Magic methods + assert 'test' in json_fh + assert 'real' not in json_fh + print(f' - JSON FH (dryrun={dryrun}) - Complete') def test_text_fh(self): @@ -88,7 +87,7 @@ def test_text_fh(self): for dryrun in [True, False]: - text_fh = TextFileHandler(WORKDIR, 'testtx.txt', dryrun=dryrun) + text_fh = ListFileHandler(WORKDIR, 'testtx', dryrun=dryrun) text_fh.set(testlist) if dryrun: @@ -104,7 +103,7 @@ def test_csv_fh(self): for dryrun in [True, False]: - csv_fh = CSVFileHandler(WORKDIR, 'test.csv', dryrun=dryrun) + csv_fh = CSVFileHandler(WORKDIR, 'test', dryrun=dryrun) csv_fh.set(testlist) @@ -113,7 +112,7 @@ def test_csv_fh(self): assert generic(csv_fh, testlist, dryrun) - 
csv_fh.update_status('testp','tests','jid1',dryrun) + csv_fh.update_status('testp','tests','jid1') assert not len(csv_fh) == len(testlist) print(f' - CSV FH (dryrun={dryrun}) - Complete') diff --git a/padocc/tests/test_init.py b/padocc/tests/test_init.py index 4fdad98..239cf26 100644 --- a/padocc/tests/test_init.py +++ b/padocc/tests/test_init.py @@ -27,7 +27,8 @@ def test_init_basic(self, wd=WORKDIR): process = GroupOperation( groupID, workdir=workdir, - label='test_init') + label='test_init', + verbose=2) process.init_from_file(infile, substitutions=substitutions) diff --git a/padocc/tests/test_scan.py b/padocc/tests/test_scan.py index e5a3f10..4b9d6c8 100644 --- a/padocc/tests/test_scan.py +++ b/padocc/tests/test_scan.py @@ -4,7 +4,7 @@ WORKDIR = 'padocc/tests/auto_testdata_dir' class TestScan: - def test_scan_basic(self, workdir=WORKDIR, verbose=1): + def test_scan_basic(self, workdir=WORKDIR, verbose=2): groupID = 'padocc-test-suite' process = GroupOperation( diff --git a/padocc/tests/test_validate.py b/padocc/tests/test_validate.py index 5f4a67e..f768c15 100644 --- a/padocc/tests/test_validate.py +++ b/padocc/tests/test_validate.py @@ -12,11 +12,11 @@ def test_validate(self, workdir=WORKDIR): label='test_validate', verbose=1) - results = process.run('validate', forceful=True) + results = process.run('validate', forceful=True, subset_bypass=True) assert results['Fatal'] == 2 assert results['Warning'] == 1 if __name__ == '__main__': - workdir = '/home/users/dwest77/cedadev/padocc/padocc/tests/auto_testdata_dir' - TestValidate().test_validate(workdir=workdir) \ No newline at end of file + #workdir = '/home/users/dwest77/cedadev/padocc/padocc/tests/auto_testdata_dir' + TestValidate().test_validate()#workdir=workdir) \ No newline at end of file From 5b904e0b6152e037e709f49d456ae74d6ca13c5b Mon Sep 17 00:00:00 2001 From: dwest77a Date: Mon, 23 Dec 2024 15:34:39 +0000 Subject: [PATCH 13/35] Major refactoring with filehandlers --- padocc/core/filehandlers.py | 87 +++++++++++++++++++++++++++---------- 1 file changed, 64 insertions(+), 23 deletions(-) diff --git a/padocc/core/filehandlers.py b/padocc/core/filehandlers.py index d940d66..f85a227 100644 --- a/padocc/core/filehandlers.py +++ b/padocc/core/filehandlers.py @@ -175,7 +175,7 @@ def __set_filepath(self, filepath) -> None: self._file, self._extension = filename.split('.') -class ListIOMixin(FileIOMixin): +class ListFileHandler(FileIOMixin): """ Filehandler for string-based Lists in Padocc """ @@ -193,9 +193,6 @@ def __init__( self._value: list = init_value or [] self._extension: str = extension or 'txt' - if self._value is not None: - self._set_value_in_file() - def append(self, newvalue: str) -> None: """Add a new value to the internal list""" self._obtain_value() @@ -207,7 +204,7 @@ def set(self, value: list) -> None: Reset the value as a whole for this filehandler. 
""" - self._value = value + self._value = list(value) def __contains__(self, item: str) -> bool: """ @@ -219,6 +216,8 @@ def __contains__(self, item: str) -> bool: def __str__(self) -> str: """String representation""" + self._obtain_value() + return '\n'.join(self._value) def __repr__(self) -> str: @@ -227,11 +226,15 @@ def __repr__(self) -> str: def __len__(self) -> int: """Length of value""" + self._obtain_value() + self.logger.debug(f'content length: {len(self._value)}') return len(self._value) def __iter__(self) -> Iterator[str]: """Iterator for the set of values""" + self._obtain_value() + for i in self._value: if i is not None: yield i @@ -244,6 +247,14 @@ def __getitem__(self, index: int) -> str: return self._value[index] + def get(self) -> list: + """ + Get the current value + """ + self._obtain_value() + + return self._value + def __setitem__(self, index: int, value: str) -> None: """ Enables setting items in filehandlers 'fh[0] = 1' @@ -256,7 +267,7 @@ def _obtain_value(self) -> None: """ Obtain the value for this filehandler. """ - if self._value is None: + if self._value == []: self._obtain_value_from_file() def _obtain_value_from_file(self) -> None: @@ -268,19 +279,29 @@ def _obtain_value_from_file(self) -> None: self.create_file() with open(self.filepath) as f: - self.value = [r.strip() for r in f.readlines()] + self._value = [r.strip() for r in f.readlines()] def _set_value_in_file(self) -> None: """ On initialisation or close, set the value in the file. """ + if self._dryrun or self._value == []: + self.logger.debug("Skipped setting value in file") + return + if not self.file_exists(): self.create_file() with open(self.filepath,'w') as f: f.write('\n'.join(self._value)) + def close(self) -> None: + """ + Save the content of the filehandler + """ + self._set_value_in_file() + class JSONFileHandler(FileIOMixin): """JSON File handler for padocc config files.""" @@ -293,19 +314,16 @@ def __init__( **kwargs ) -> None: - super().__init__(dir, filename,**kwargs) + super().__init__(dir, filename, **kwargs) self._conf: dict = conf or {} self._value: dict = init_value or {} - self.extension: str = 'json' - - if self._value is not None: - self._set_value_in_file() + self._extension: str = 'json' def set(self, value: dict) -> None: """ Set the value of the whole dictionary. """ - self._value = value + self._value = dict(value) def __contains__(self, key: str): """ @@ -350,14 +368,27 @@ def __getitem__(self, index: str) -> Union[str,dict,None]: return None + def create_file(self) -> None: + """JSON files require entry of a single dict on creation""" + super().create_file() + + if not self._dryrun: + with open(self.filepath,'w') as f: + f.write(json.dumps({})) + def get( self, - index: str, + index: Union[str,None] = None, default: Union[str,None] = None ) -> Union[str,dict,None]: """ Safe method to get a value from this filehandler """ + self._obtain_value() + + if index is None: + return self._value + return self._value.get(index, default) def __setitem__(self, index: str, value: str) -> None: @@ -374,7 +405,7 @@ def _obtain_value(self, index: Union[str,None] = None) -> None: """ Obtain the value for this filehandler. """ - if self._value is None: + if self._value == {}: self._obtain_value_from_file() if index is None: @@ -394,18 +425,26 @@ def _obtain_value_from_file(self) -> None: return with open(self.filepath) as f: - self.value = json.load(f) + self._value = json.load(f) def _set_value_in_file(self) -> None: """ On initialisation or close, set the value in the file. 
""" + if self._dryrun or self._value == {}: + self.logger.debug("Skipped setting value in file") + return + + self._apply_conf() + if not self.file_exists(): self.create_file() with open(self.filepath,'w') as f: - f.write(json.dumps(f)) + f.write(json.dumps(self._value)) + + def _apply_conf(self) -> None: """ @@ -421,6 +460,12 @@ def _apply_conf(self) -> None: self._value = dict(self._conf) self._conf = {} + def close(self) -> None: + """ + Save the content of the filehandler + """ + self._set_value_in_file() + class KerchunkFile(JSONFileHandler): """ Filehandler for Kerchunk file, enables substitution/replacement @@ -618,7 +663,7 @@ def open(self, *args, **parquet_kwargs) -> xr.Dataset: """ raise NotImplementedError -class LogFileHandler(ListIOMixin): +class LogFileHandler(ListFileHandler): """Log File handler for padocc phase logs.""" description = "Log File handler for padocc phase logs." @@ -639,7 +684,7 @@ def __init__( def file(self) -> str: return f'{self._extra_path}{self._file}.{self._extension}' -class CSVFileHandler(ListIOMixin): +class CSVFileHandler(ListFileHandler): """CSV File handler for padocc config files""" description = "CSV File handler for padocc config files" @@ -679,10 +724,6 @@ def update_status( :param jobid: (str) The jobID of this run if present. """ - if self._dryrun: - self.logger.info("Skipped updating status") - return - status = status.replace(',', '.').replace('\n','.') addition = f'{phase},{status},{datetime.now().strftime("%H:%M %D")},{jobid}' self.append(addition) From 0a8149d88bc3bcdcb04cb731e934bff0678a3979 Mon Sep 17 00:00:00 2001 From: dwest77a Date: Mon, 23 Dec 2024 15:35:15 +0000 Subject: [PATCH 14/35] Refactorings due to filehanders --- padocc/core/errors.py | 6 ++---- padocc/core/mixins.py | 4 ++-- padocc/core/project.py | 26 ++++++++++++-------------- padocc/operations/group.py | 21 ++++++++++----------- padocc/phases/compute.py | 12 ++++++------ padocc/phases/scan.py | 6 +++--- padocc/phases/validate.py | 9 +++++++-- 7 files changed, 42 insertions(+), 42 deletions(-) diff --git a/padocc/core/errors.py b/padocc/core/errors.py index f901f0c..e34e2c4 100644 --- a/padocc/core/errors.py +++ b/padocc/core/errors.py @@ -9,8 +9,6 @@ from typing import Optional, Union -from .filehandlers import CSVFileHandler - def error_handler( err : Exception, logger: logging.Logger, @@ -18,7 +16,7 @@ def error_handler( dryrun: bool = False, subset_bypass: bool = False, jobid: Optional[str] = None, - status_fh: Optional[CSVFileHandler] = None + status_fh: Optional[object] = None ): """ @@ -50,7 +48,7 @@ def get_status(tb: list) -> str: status = get_status(tb) if status_fh is not None: - status_fh.update_status(phase, status, jobid=jobid, dryrun=dryrun) + status_fh.update_status(phase, status, jobid=jobid) if subset_bypass: logger.error(tb) diff --git a/padocc/core/mixins.py b/padocc/core/mixins.py index 7eda6e4..9378e6e 100644 --- a/padocc/core/mixins.py +++ b/padocc/core/mixins.py @@ -206,9 +206,9 @@ def outpath(self): @property def outproduct(self): if self.stage == 'complete': - return f'{self.proj_code}.{self.version_no}.{self.file_type}' + return f'{self.proj_code}.{self.version_no}' else: - vn = f'{self.version_no}a.{self.file_type}' + vn = f'{self.version_no}a' if self._is_trial: vn = f'trial-{vn}' return vn diff --git a/padocc/core/project.py b/padocc/core/project.py index e6e2e7c..808e7bf 100644 --- a/padocc/core/project.py +++ b/padocc/core/project.py @@ -14,7 +14,7 @@ from .filehandlers import ( JSONFileHandler, CSVFileHandler, - TextFileHandler, + 
ListFileHandler, LogFileHandler, KerchunkFile ) @@ -129,7 +129,7 @@ def __init__( # Project FileHandlers self.base_cfg = JSONFileHandler(self.dir, 'base-cfg', logger=self.logger, conf=file_configs['base_cfg'], **self.fh_kwargs) self.detail_cfg = JSONFileHandler(self.dir, 'detail-cfg', logger=self.logger, conf=file_configs['detail_cfg'], **self.fh_kwargs) - self.allfiles = TextFileHandler(self.dir, 'allfiles', logger=self.logger, **self.fh_kwargs) + self.allfiles = ListFileHandler(self.dir, 'allfiles', logger=self.logger, **self.fh_kwargs) # ft_kwargs <- stored in base_cfg after this point. if first_time: @@ -138,7 +138,7 @@ def __init__( self._configure_filelist() # ProjectOperation attributes - self.status_log = CSVFileHandler(self.dir, 'status_log', self.logger, **self.fh_kwargs) + self.status_log = CSVFileHandler(self.dir, 'status_log', logger=self.logger, **self.fh_kwargs) self.phase_logs = {} for phase in ['scan', 'compute', 'validate']: @@ -217,12 +217,11 @@ def run( self.save_files() return status except Exception as err: - raise err - #return error_handler( - #err, self.logger, self.phase, - #jobid=self._logid, dryrun=self._dryrun, - ##subset_bypass=subset_bypass, - #status_fh=self.status_log) + return error_handler( + err, self.logger, self.phase, + jobid=self._logid, dryrun=self._dryrun, + subset_bypass=subset_bypass, + status_fh=self.status_log) def _run(self, **kwargs): # Default project operation run. @@ -232,7 +231,7 @@ def create_new_kfile(self, product : str): self.kfile = KerchunkFile( self.dir, product, - self.logger, + logger=self.logger, **self.fh_kwargs ) @@ -268,10 +267,9 @@ def update_status( self, phase : str, status: str, - jobid : str = '', - dryrun: str = '' + jobid : str = '' ) -> None: - self.status_log.update_status(phase, status, jobid=jobid, dryrun=dryrun) + self.status_log.update_status(phase, status, jobid=jobid) def save_files(self): # Add all files here. 
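# ------------------------------------------------------------------------------
# Editorial sketch (not part of the patch): driving the refactored filehandlers
# directly, matching the keyword-only `logger=` style adopted above. The working
# directory, logger name and jobid are illustrative assumptions.
import logging
import os

from padocc.core.filehandlers import CSVFileHandler, JSONFileHandler

logger = logging.getLogger('padocc.example')
workdir = '/tmp/padocc_demo'
os.makedirs(workdir, exist_ok=True)

detail = JSONFileHandler(workdir, 'detail-cfg', logger=logger)
detail.set({'cfa': True})   # held in memory ...
detail.close()              # ... and only written to disk on close()

status = CSVFileHandler(workdir, 'status_log', logger=logger)
status.update_status('scan', 'Success', jobid='0001')
status.close()
# ------------------------------------------------------------------------------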
@@ -315,7 +313,7 @@ def _configure_filelist(self): if 'latest' in pattern: pattern = pattern.replace('latest', os.readlink(pattern)) - self.allfiles.set(glob.glob(pattern)) + self.allfiles.set(sorted(glob.glob(pattern))) def _setup_config( self, diff --git a/padocc/operations/group.py b/padocc/operations/group.py index 6e79587..b3cbe1d 100644 --- a/padocc/operations/group.py +++ b/padocc/operations/group.py @@ -18,7 +18,7 @@ ValidateOperation, ) from padocc.core.mixins import DirectoryMixin -from padocc.core.filehandlers import CSVFileHandler, TextFileHandler +from padocc.core.filehandlers import CSVFileHandler, ListFileHandler from .mixins import AllocationsMixin, InitialisationMixin, EvaluationsMixin @@ -106,7 +106,7 @@ def __init__( self.blacklist_codes = CSVFileHandler( self.groupdir, 'blacklist_codes', - self.logger, + logger=self.logger, dryrun=self._dryrun, forceful=self._forceful, ) @@ -114,7 +114,7 @@ def __init__( self.datasets = CSVFileHandler( self.groupdir, 'datasets', - self.logger, + logger=self.logger, dryrun=self._dryrun, forceful=self._forceful, ) @@ -365,16 +365,15 @@ def save_files(self): self._save_proj_codes() def _add_proj_codeset(self, name : str, newcodes : list): - self.proj_codes[name] = TextFileHandler( + self.proj_codes[name] = ListFileHandler( self.proj_codes_dir, name, - self.logger, + init_value=newcodes, + logger=self.logger, dryrun=self._dryrun, forceful=self._forceful ) - self.proj_codes[name].set(newcodes) - def check_writable(self): if not os.access(self.workdir, os.W_OK): self.logger.error('Workdir provided is not writable') @@ -487,7 +486,7 @@ def _create_job_array( sbatch_file = f'{phase}_{joblabel}.sbatch' repeat_id = f'{repeat_id}/{joblabel}' - sbatch = TextFileHandler(sbatch_dir, sbatch_file, self.logger, dryrun=self._dryrun, forceful=self._forceful) + sbatch = ListFileHandler(sbatch_dir, sbatch_file, self.logger, dryrun=self._dryrun, forceful=self._forceful) master_script = f'{source}/single_run.py' @@ -591,14 +590,14 @@ def _load_proj_codes(self): # Running for the first time self._add_proj_codeset( 'main', - self.datasets.get() + self.datasets ) for p in proj_codes: - self.proj_codes[p] = TextFileHandler( + self.proj_codes[p] = ListFileHandler( self.proj_codes_dir, p, - self.logger, + logger=self.logger, dryrun=self._dryrun, forceful=self._forceful, ) diff --git a/padocc/phases/compute.py b/padocc/phases/compute.py index 911cc4d..2aa1e8b 100644 --- a/padocc/phases/compute.py +++ b/padocc/phases/compute.py @@ -301,7 +301,7 @@ def __init__( self.temp_zattrs = JSONFileHandler( self.cache, 'temp_zattrs', - self.logger, + logger=self.logger, dryrun=self._dryrun, forceful=self._forceful ) @@ -603,7 +603,7 @@ def _dims_via_validator(self) -> tuple[list[str]]: vd.save_report( JSONFileHandler( self.dir, - 'potential_issues.json', + 'potential_issues', logger=self.logger ) ) @@ -683,7 +683,7 @@ def _run( if results is not None: self.base_cfg['data_properties'] = results self.detail_cfg['cfa'] = True - self.update_status('compute',status,jobid=self._logid, dryrun=self._dryrun) + self.update_status('compute',status,jobid=self._logid) return status def create_refs(self) -> None: @@ -709,7 +709,7 @@ def create_refs(self) -> None: t1 = datetime.now() for x, nfile in enumerate(listfiles[:self.limiter]): ref = None - CacheFile = JSONFileHandler(self.cache, f'{x}.json', + CacheFile = JSONFileHandler(self.cache, f'{x}', dryrun=self._dryrun, forceful=self._forceful, logger=self.logger) if not self._thorough: @@ -937,7 +937,7 @@ def __init__( 
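# ------------------------------------------------------------------------------
# Editorial sketch (not part of the patch): standalone use of the store handlers
# (GenericStore/ZarrStore) that back the cache created below. The product
# directory and store name are illustrative, and the store is assumed to
# already exist on disk.
from padocc.core.filehandlers import ZarrStore

store = ZarrStore('/path/to/products', 'example_product')  # /path/to/products/example_product.zarr
ds = store.open()                 # read-only xarray.Dataset (engine='zarr')
if 'history' in store:            # attribute lookups are proxied to the .zattrs handler
    print(store['history'])
# ------------------------------------------------------------------------------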
super().__init__(proj_code, workdir, stage, *kwargs) - self.tempstore = ZarrStore(self.dir, "zarrcache.zarr", self.logger, **self.fh_kwargs) + self.tempstore = ZarrStore(self.dir, "zarrcache.zarr", logger=self.logger, **self.fh_kwargs) self.preferences = preferences if self.thorough or self.forceful: @@ -951,7 +951,7 @@ def _run(self, **kwargs) -> None: Recommended way of running an operation - includes timers etc. """ status = self._run_with_timings(self.create_store) - self.update_status('compute',status,jobid=self._logid, dryrun=self._dryrun) + self.update_status('compute',status,jobid=self._logid) return status def create_store(self): diff --git a/padocc/phases/scan.py b/padocc/phases/scan.py index a245c3a..6d7b8a3 100644 --- a/padocc/phases/scan.py +++ b/padocc/phases/scan.py @@ -171,12 +171,12 @@ def _run(self, mode: str = 'kerchunk') -> None: elif mode == 'kerchunk': self._scan_kerchunk(limiter=limiter) else: - self.update_status('scan','ValueError',jobid=self._logid, dryrun=self._dryrun) + self.update_status('scan','ValueError',jobid=self._logid) raise ValueError( f'Unrecognised mode: {mode} - must be one of ["kerchunk","zarr","CFA"]' ) - self.update_status('scan','Success',jobid=self._logid, dryrun=self._dryrun) + self.update_status('scan','Success',jobid=self._logid) return 'Success' def _scan_kerchunk(self, limiter: int = None): @@ -291,7 +291,7 @@ def _summarise_json(self, identifier) -> tuple: 'forceful':self._forceful, } - fh = JSONFileHandler(self.dir, f'cache/{identifier}.json', self.logger, **fh_kwargs) + fh = JSONFileHandler(self.dir, f'cache/{identifier}', self.logger, **fh_kwargs) kdict = fh['refs'] self.logger.debug(f'Starting Analysis of references for {identifier}') diff --git a/padocc/phases/validate.py b/padocc/phases/validate.py index 84fb99e..38014cb 100644 --- a/padocc/phases/validate.py +++ b/padocc/phases/validate.py @@ -858,6 +858,8 @@ class ValidateOperation(ProjectOperation): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + self.phase = 'validate' + def _run( self, mode: str = 'kerchunk', @@ -896,7 +898,7 @@ def _run( # Save report vd.save_report() - self.update_status('validate',vd.pass_fail,jobid=self._logid, dryrun=self._dryrun) + self.update_status('validate',vd.pass_fail,jobid=self._logid) return vd.pass_fail def _open_sample(self): @@ -922,9 +924,12 @@ def _open_product(self): """ if self.cloud_format == 'kerchunk': + + self.create_new_kfile(self.outproduct) + # Kerchunk opening sequence return open_kerchunk( - self.outpath, + self.kfile.filepath, self.logger, isparq = (self.file_type == 'parq'), retry = True, From 560e007efb18c436766b6a8c022aac44b43daaa2 Mon Sep 17 00:00:00 2001 From: dwest77a Date: Mon, 23 Dec 2024 15:35:31 +0000 Subject: [PATCH 15/35] Minor edits --- docs/source/index.rst | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/source/index.rst b/docs/source/index.rst index e4f50e4..326b8d3 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -51,9 +51,7 @@ The pipeline consists of four central phases, with an additional phase for inges Compute Validate Utils - - - + Indices and Tables ================== From 05b25df67df1379e0885a4ed7a72c915323d2a24 Mon Sep 17 00:00:00 2001 From: dwest77a Date: Mon, 23 Dec 2024 17:22:07 +0000 Subject: [PATCH 16/35] Added group merge/unmerge methods - pre-release --- padocc/core/project.py | 10 +++- padocc/operations/group.py | 111 +++++++++++++++++++++++++++++++++++- padocc/operations/mixins.py | 14 +++++ 3 files changed, 129 insertions(+), 6 
deletions(-) diff --git a/padocc/core/project.py b/padocc/core/project.py index 808e7bf..bbba968 100644 --- a/padocc/core/project.py +++ b/padocc/core/project.py @@ -223,11 +223,15 @@ def run( subset_bypass=subset_bypass, status_fh=self.status_log) - def _run(self, **kwargs): + def move_to(self, new_directory: str) -> None: + """ + Move all associated files across to new directory.""" + + def _run(self, **kwargs) -> None: # Default project operation run. self.logger.info("Nothing to run with this setup!") - def create_new_kfile(self, product : str): + def create_new_kfile(self, product : str) -> None: self.kfile = KerchunkFile( self.dir, product, @@ -235,7 +239,7 @@ def create_new_kfile(self, product : str): **self.fh_kwargs ) - def create_new_kstore(self, product : str): + def create_new_kstore(self, product: str) -> None: raise NotImplementedError @property diff --git a/padocc/operations/group.py b/padocc/operations/group.py index b3cbe1d..b92ecf6 100644 --- a/padocc/operations/group.py +++ b/padocc/operations/group.py @@ -138,6 +138,114 @@ def new_inputfile(self): else: raise NotImplementedError + def merge(group_A,group_B): + """ + Merge group B into group A. + 1. Migrate all projects from B to A and reset groupID values. + 2. Combine datasets.csv + 3. Combine project codes + 4. Combine blacklists. + """ + + new_proj_dir = f'{group_A.workdir}/in_progress/{group_A.groupID}' + group_A.logger.info(f'Merging {group_B.groupID} into {group_A.groupID}') + + # Combine projects + for proj_code in group_B.proj_codes['main']: + proj_op = ProjectOperation( + proj_code, + group_B.workdir, + group_B.groupID + ) + group_A.logger.debug(f'Migrating project {proj_code}') + proj_op.move_to(new_proj_dir) + + # Datasets + group_A.datasets.set( + group_A.datasets.get() + group_B.datasets.get() + ) + group_B.datasets.remove_file() + group_A.logger.debug(f'Removed dataset file for {group_B.groupID}') + + # Blacklists + group_A.blacklist_codes.set( + group_A.blacklist_codes.get() + group_B.blacklist_codes.get() + ) + group_B.blacklist_codes.remove_file() + group_A.logger.debug(f'Removed blacklist file for {group_B.groupID}') + + # Subsets + for name, subset in group_B.proj_codes.items(): + if name not in group_A.proj_codes: + subset.move_file(group_A.groupdir) + group_A.logger.debug(f'Migrating subset {name}') + else: + group_A.proj_codes[name].set( + group_A.proj_codes[name].get() + subset.get() + ) + group_A.logger.debug(f'Merging subset {name}') + subset.remove_file() + + group_A.logger.info("Merge operation complete") + del group_B + + def unmerge(group_A, group_B, dataset_list: list): + """ + Separate elements from group_A into group_B + according to the list + 1. Migrate projects + 2. Set the datasets + 3. Set the blacklists + 4. 
Project codes (remove group B sections)""" + + group_A.logger.info( + f"Separating {len(dataset_list)} datasets from " + f"{group_A.groupID} to {group_B.groupID}") + + new_proj_dir = f'{group_B.workdir}/in_progress/{group_B.groupID}' + + # Combine projects + for proj_code in dataset_list: + proj_op = ProjectOperation( + proj_code, + group_A.workdir, + group_A.groupID + ) + + proj_op.move_to(new_proj_dir) + proj_op.groupID = group_B.groupID + + # Set datasets + group_B.datasets.set(dataset_list) + group_A.datasets.set( + [ds for ds in group_A.datasets if ds not in dataset_list] + ) + + group_A.logger.debug(f"Created datasets file for {group_B.groupID}") + + # Set blacklist + A_blacklist, B_blacklist = [],[] + for bl in group_A.blacklist_codes: + if bl in dataset_list: + B_blacklist.append(bl) + else: + A_blacklist.append(bl) + + group_A.blacklist_codes.set(A_blacklist) + group_B.blacklist_codes.set(B_blacklist) + group_A.logger.debug(f"Created blacklist file for {group_B.groupID}") + + # Combine project subsets + group_B.proj_codes['main'].set(dataset_list) + for name, subset in group_A.proj_codes.items(): + if name != 'main': + subset.set([s for s in subset if s not in dataset_list]) + group_A.logger.debug(f"Removed all datasets from all {group_A.groupID} subsets") + + + group_A.logger.info("Unmerge operation complete") + + def values(self): print(f'Group: {self.groupID}') print(f' - Workdir: {self.workdir}') @@ -352,9 +460,6 @@ def _validate_config( dryrun=dryrun) return status - def add_project(self): - pass - def _save_proj_codes(self): for pc in self.proj_codes.keys(): self.proj_codes[pc].close() diff --git a/padocc/operations/mixins.py b/padocc/operations/mixins.py index 706b69f..cbc0475 100644 --- a/padocc/operations/mixins.py +++ b/padocc/operations/mixins.py @@ -181,6 +181,20 @@ def _open_json(file): self.logger.info(f'Written as group ID: {self.groupID}') self.save_files() +class ModifiersMixin: + + def add_project(self): + """ + Add a project to this group + """ + pass + + def remove_project(self): + """ + Remove a project from this group + """ + pass + """ Replacement for assessor tool. Requires the following (public) methods: - progress (progress_check) From 3a4dbc1ca80ae9f92319395e21e4f62312a932bb Mon Sep 17 00:00:00 2001 From: dwest77a Date: Mon, 23 Dec 2024 17:23:10 +0000 Subject: [PATCH 17/35] Reordered docs pages --- docs/source/compute.rst | 7 -- docs/source/execution.rst | 192 ------------------------------ docs/source/extras.rst | 18 --- docs/source/filehandlers.rst | 8 -- docs/source/group.rst | 9 -- docs/source/pipeline-overview.rst | 45 ------- docs/source/project.rst | 9 -- docs/source/scan.rst | 6 - docs/source/validate.rst | 6 - 9 files changed, 300 deletions(-) delete mode 100644 docs/source/compute.rst delete mode 100644 docs/source/execution.rst delete mode 100644 docs/source/extras.rst delete mode 100644 docs/source/filehandlers.rst delete mode 100644 docs/source/group.rst delete mode 100644 docs/source/pipeline-overview.rst delete mode 100644 docs/source/project.rst delete mode 100644 docs/source/scan.rst delete mode 100644 docs/source/validate.rst diff --git a/docs/source/compute.rst b/docs/source/compute.rst deleted file mode 100644 index d523f88..0000000 --- a/docs/source/compute.rst +++ /dev/null @@ -1,7 +0,0 @@ -============== -Compute Module -============== - -.. 
automodule:: padocc.phases.compute - :members: - :show-inheritance: \ No newline at end of file diff --git a/docs/source/execution.rst b/docs/source/execution.rst deleted file mode 100644 index b5d396c..0000000 --- a/docs/source/execution.rst +++ /dev/null @@ -1,192 +0,0 @@ -Pipeline Flags -============== - -==================== -BypassSwitch Options -==================== - -Certain non-fatal errors may be bypassed using the Bypass flag: -:: - - Format: -b "DBSCR" - - Default: "DBSCR" # Highlighted by a '*' - - "D" - * Skip driver failures - Pipeline tries different options for NetCDF (default). - - Only need to turn this skip off if all drivers fail (KerchunkFatalDriverError). - "B" - * Skip Box compute errors. - "S" - * Skip Soft fails (NaN-only boxes in validation) (default). - "C" - * Skip calculation (data sum) errors (time array typically cannot be summed) (default). - "X" - Skip initial shape errors, by attempting XKShape tolerance method (special case.) - "R" - * Skip reporting to status_log which becomes visible with assessor. Reporting is skipped - by default in single_run.py but overridden when using group_run.py so any serial - testing does not by default report the error experienced to the status log for that project. - "F" - Skip scanning (fasttrack) and go straight to compute. Required if running compute before scan - is attempted. - -======================== -Single Dataset Operation -======================== - -Run all single-dataset processes with the ``single-run.py`` script. - -.. code-block:: python - - usage: single_run.py [-h] [-f] [-v] [-d] [-Q] [-B] [-A] [-w WORKDIR] [-g GROUPDIR] [-p PROJ_DIR] - [-t TIME_ALLOWED] [-G GROUPID] [-M MEMORY] [-s SUBSET] - [-r REPEAT_ID] [-b BYPASS] [-n NEW_VERSION] [-m MODE] [-O OVERRIDE_TYPE] - phase proj_code - - Run a pipeline step for a single dataset - - positional arguments: - phase Phase of the pipeline to initiate - proj_code Project identifier code - - options: - -h, --help show this help message and exit - -f, --forceful Force overwrite of steps if previously done - -v, --verbose Print helpful statements while running - -d, --dryrun Perform dry-run (i.e no new files/dirs created) - -Q, --quality Create refs from scratch (no loading), use all NetCDF files in validation - -B, --backtrack Backtrack to previous position, remove files that would be created in this job. - -A, --alloc-bins Use binpacking for allocations (otherwise will use banding) - - -w WORKDIR, --workdir WORKDIR - Working directory for pipeline - -g GROUPDIR, --groupdir GROUPDIR - Group directory for pipeline - -p PROJ_DIR, --proj_dir PROJ_DIR - Project directory for pipeline - -t TIME_ALLOWED, --time-allowed TIME_ALLOWED - Time limit for this job - -G GROUPID, --groupID GROUPID - Group identifier label - -M MEMORY, --memory MEMORY - Memory allocation for this job (i.e "2G" for 2GB) - -s SUBSET, --subset SUBSET - Size of subset within group - -r REPEAT_ID, --repeat_id REPEAT_ID - Repeat id (1 if first time running, _ otherwise) - -b BYPASS, --bypass-errs BYPASS - Bypass switch options: See Above - - -n NEW_VERSION, --new_version NEW_VERSION - If present, create a new version - -m MODE, --mode MODE Print or record information (log or std) - -O OVERRIDE_TYPE, --override_type OVERRIDE_TYPE - Specify cloud-format output type, overrides any determination by pipeline. - -============================= -Multi-Dataset Group Operation -============================= - -Run all multi-dataset group processes within the pipeline using the ``group_run.py`` script. - -.. 
code-block:: python - - usage: group_run.py [-h] [-S SOURCE] [-e VENVPATH] [-i INPUT] [-A] [--allow-band-increase] [-f] [-v] [-d] [-Q] [-b BYPASS] [-B] [-w WORKDIR] [-g GROUPDIR] - [-p PROJ_DIR] [-G GROUPID] [-t TIME_ALLOWED] [-M MEMORY] [-s SUBSET] [-r REPEAT_ID] [-n NEW_VERSION] [-m MODE] - phase groupID - - Run a pipeline step for a group of datasets - - positional arguments: - phase Phase of the pipeline to initiate - groupID Group identifier code - - options: - -h, --help show this help message and exit - -S SOURCE, --source SOURCE - Path to directory containing master scripts (this one) - -e VENVPATH, --environ VENVPATH - Path to virtual (e)nvironment (excludes /bin/activate) - -i INPUT, --input INPUT - input file (for init phase) - -A, --alloc-bins input file (for init phase) - - --allow-band-increase - Allow automatic banding increase relative to previous runs. - - -f, --forceful Force overwrite of steps if previously done - -v, --verbose Print helpful statements while running - -d, --dryrun Perform dry-run (i.e no new files/dirs created) - -Q, --quality Quality assured checks - thorough run - - -b BYPASS, --bypass-errs BYPASS - Bypass switch options: See Above - - -B, --backtrack Backtrack to previous position, remove files that would be created in this job. - -w WORKDIR, --workdir WORKDIR - Working directory for pipeline - -g GROUPDIR, --groupdir GROUPDIR - Group directory for pipeline - -p PROJ_DIR, --proj_dir PROJ_DIR - Project directory for pipeline - -G GROUPID, --groupID GROUPID - Group identifier label - -t TIME_ALLOWED, --time-allowed TIME_ALLOWED - Time limit for this job - -M MEMORY, --memory MEMORY - Memory allocation for this job (i.e "2G" for 2GB) - -s SUBSET, --subset SUBSET - Size of subset within group - -r REPEAT_ID, --repeat_id REPEAT_ID - Repeat id (main if first time running, _ otherwise) - -n NEW_VERSION, --new_version NEW_VERSION - If present, create a new version - -m MODE, --mode MODE Print or record information (log or std) - -======================= -Assessor Tool Operation -======================= - -Perform assessments of groups within the pipeline using the ``assess.py`` script. - -.. code-block:: python - - usage: assess.py [-h] [-B] [-R REASON] [-s OPTION] [-c CLEANUP] [-U UPGRADE] [-l] [-j JOBID] [-p PHASE] [-r REPEAT_ID] [-n NEW_ID] [-N NUMBERS] [-e ERROR] [-E] [-W] - [-O] [-w WORKDIR] [-g GROUPDIR] [-v] [-m MODE] - operation groupID - - Run a pipeline step for a single dataset - - positional arguments: - operation Operation to perform - choose from ['progress', 'blacklist', 'upgrade', 'summarise', 'display', 'cleanup', 'match', - 'status_log'] - groupID Group identifier code for the group on which to operate. - - options: - -h, --help show this help message and exit - -B, --blacklist Use when saving project codes to the blacklist - - -R REASON, --reason REASON - Provide the reason for handling project codes when saving to the blacklist or upgrading - -s OPTION, --show-opts OPTION - Show options for jobids, labels, also used in matching and status_log. - -c CLEANUP, --clean-up CLEANUP - Clean up group directory of errors/outputs/labels - -U UPGRADE, --upgrade UPGRADE - Upgrade to new version - -l, --long Show long error message (no concatenation) - -j JOBID, --jobid JOBID - Identifier of job to inspect - -p PHASE, --phase PHASE - Pipeline phase to inspect - -r REPEAT_ID, --repeat_id REPEAT_ID - Inspect an existing ID for errors - -n NEW_ID, --new_id NEW_ID - Create a new repeat ID, specify selection of codes by phase, error etc. 
- -N NUMBERS, --numbers NUMBERS - Show project code IDs for lists of codes less than the N value specified here. - -e ERROR, --error ERROR - Inspect error of a specific type - -E, --examine Examine log outputs individually. - -W, --write Write outputs to files - -O, --overwrite Force overwrite of steps if previously done - -w WORKDIR, --workdir WORKDIR - Working directory for pipeline - -g GROUPDIR, --groupdir GROUPDIR - Group directory for pipeline - -v, --verbose Print helpful statements while running - -m MODE, --mode MODE Print or record information (log or std) \ No newline at end of file diff --git a/docs/source/extras.rst b/docs/source/extras.rst deleted file mode 100644 index 0d04fd6..0000000 --- a/docs/source/extras.rst +++ /dev/null @@ -1,18 +0,0 @@ -Padocc Utility Scripts -====================== - -========= -Utilities -========= - -.. automodule:: padocc.core.utils - :members: - :show-inheritance: - -======= -Logging -======= - -.. automodule:: padocc.core.logs - :members: - :show-inheritance: \ No newline at end of file diff --git a/docs/source/filehandlers.rst b/docs/source/filehandlers.rst deleted file mode 100644 index 8f730f3..0000000 --- a/docs/source/filehandlers.rst +++ /dev/null @@ -1,8 +0,0 @@ -Padocc Filehandlers -====================== - -**A summary of the custom errors that are experienced through running the pipeline.** - -.. automodule:: padocc.core.filehandlers - :members: - :show-inheritance: \ No newline at end of file diff --git a/docs/source/group.rst b/docs/source/group.rst deleted file mode 100644 index 9e49bc6..0000000 --- a/docs/source/group.rst +++ /dev/null @@ -1,9 +0,0 @@ -======================================== -GroupOperation Core and Mixin Behaviours -======================================== - -.. automodule:: padocc.operations.group - :members: - -.. automodule:: padocc.operations.mixins - :members: \ No newline at end of file diff --git a/docs/source/pipeline-overview.rst b/docs/source/pipeline-overview.rst deleted file mode 100644 index 3d1ddc1..0000000 --- a/docs/source/pipeline-overview.rst +++ /dev/null @@ -1,45 +0,0 @@ -Overview of Pipeline Phases -=========================== - -.. image:: _images/pipeline.png - :alt: Stages of the Kerchunk Pipeline - -**Init (Initialisation) Phase** - -The pipeline takes a CSV (or similar) input file and creates the necessary directories and config files for the pipeline to being running. - -**Scan Phase** - -Second phase of the pipeline involves scanning a subset of the NetCDF/HDF/Tiff files to determine certain parameters: - -* Ensure NetCDF/HDF/Tiff files can be converted successfully using one of the available drivers: -* Calculate expected memory (for job allocation later.) -* Calculate estimated chunk sizes and other values. -* Determine file-type (JSON or Parquet) for final Kerchunk file. -* Identify Identical/Concat dims for use in **Compute** phase. -* Determine any other specific parameters for the dataset on creation and concatenation. - -**Compute Phase** - -Building the Kerchunk file for a dataset requires a multi*step process: - -* Create Kerchunk references for each archive-type file. -* Save cache of references for each file prior to concatenation. -* Perform concatenation (abort if concatenation fails, can load cache on second attempt). -* Perform metadata corrections (based on updates and removals specified at the start) -* Add Kerchunk history global attributes (creation time, pipeline version etc.) 
-* Reconfigure each chunk for remote access (replace local path with https:// download path) - -**Validation Phase** - -Kerchunk files must be validated against equivalent Xarray objects from the original NetCDF: - -* Ensure all variables present in original files are present in Kerchunk (barring exceptions) -* Ensure array shapes are consistent across Kerchunk/NetCDF -* Ensure data representations are consistent (values in array subsets) - -Several options and switches can be configured for the validation step, see the BypassSwitch class. - -**Next Steps** - -Kerchunk files that have been validated are moved to a ``complete`` directory with the project code as the name, plus the kerchunk revision `krX.X`. These can then be linked to a catalog or ingested into the CEDA archive where appropriate. diff --git a/docs/source/project.rst b/docs/source/project.rst deleted file mode 100644 index 1b6c99f..0000000 --- a/docs/source/project.rst +++ /dev/null @@ -1,9 +0,0 @@ -========================================== -ProjectOperation Core and Mixin Behaviours -========================================== - -.. automodule:: padocc.core.project - :members: - -.. automodule:: padocc.core.mixins - :members: \ No newline at end of file diff --git a/docs/source/scan.rst b/docs/source/scan.rst deleted file mode 100644 index 035d35c..0000000 --- a/docs/source/scan.rst +++ /dev/null @@ -1,6 +0,0 @@ -============== -Scanner Module -============== - -.. automodule:: padocc.phases.scan - :members: \ No newline at end of file diff --git a/docs/source/validate.rst b/docs/source/validate.rst deleted file mode 100644 index 389412d..0000000 --- a/docs/source/validate.rst +++ /dev/null @@ -1,6 +0,0 @@ -================= -Validation Module -================= - -.. automodule:: padocc.phases.validate - :members: From a83a3d6268b31b2ffe24278a1cb87008ed08ae03 Mon Sep 17 00:00:00 2001 From: dwest77a Date: Mon, 23 Dec 2024 17:23:47 +0000 Subject: [PATCH 18/35] Added introductory documentation pages --- docs/source/index.rst | 43 ++++++++++++++++------------- docs/source/inspiration.rst | 52 ++++++++++++++++++++++++++++++++++++ docs/source/introduction.rst | 50 ++++++++++++++++++++++++++++++++++ 3 files changed, 127 insertions(+), 18 deletions(-) create mode 100644 docs/source/inspiration.rst create mode 100644 docs/source/introduction.rst diff --git a/docs/source/index.rst b/docs/source/index.rst index 326b8d3..64729e4 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -14,43 +14,50 @@ Vast amounts of archival data in a variety of formats can be processed using the Currently supported input file formats: - NetCDF/HDF - - GeoTiff (**coming soon**) - - GRIB (**coming soon**) + - GeoTiff + - GRIB - MetOffice (**future**) -*padocc* is capable of generating both reference files with Kerchunk (JSON or Parquet) and cloud formats like Zarr. +*padocc* is capable of generating both reference files with Kerchunk (JSON or Parquet) and cloud formats like Zarr. +Additionally, PADOCC creates CF-compliant aggregation files as part of the standard workflow, which means you get CFA-netCDF files as standard! +You can find out more about Climate Forecast Aggregations `here _`, these files are denoted with the extension ``.nca`` and can be opened using xarray with ``engine="CFA"`` if you have the ``CFAPyX`` package installed. -The pipeline consists of four central phases, with an additional phase for ingesting/cataloging the produced Kerchunk files. 
This is not part of the code-base of the pipeline currently but could be added in a future update. +The pipeline consists of three central phases, with an additional phase for ingesting/cataloging the produced Kerchunk files. +These phases represent operations that can be applied across groups of datasets in parallel, depending on the architecture of your system. +For further information around configuring PADOCC for parallel deployment please contact `daniel.westwood@stfc.ac.uk _`. -.. image:: _images/pipeline.png - :alt: Stages of the Kerchunk Pipeline +The ingestion/cataloging phase is not currently implemented for public use but may be added in a future update. + +.. image:: _images/padocc.png + :alt: Stages of the PADOCC workflow .. toctree:: - :maxdepth: 1 + :maxdepth: 2 :caption: Contents: - Introduction + Introduction + Inspiration Getting Started - Example CCI Water Vapour - Padocc Flags/Options - Error Codes + Example Operation + A Deep Dive Developer's Guide .. toctree:: :maxdepth: 1 :caption: Operations: + The Project Operator + The Group Operator + SHEPARD + .. toctree:: :maxdepth: 1 :caption: PADOCC Source: - Single Datasets - Groups of Datasets - Filehandlers - Scanning - Compute - Validate - Utils + Projects + Groups + Filehandlers, Logs, and Utilities + phases Indices and Tables ================== diff --git a/docs/source/inspiration.rst b/docs/source/inspiration.rst new file mode 100644 index 0000000..b6a8568 --- /dev/null +++ b/docs/source/inspiration.rst @@ -0,0 +1,52 @@ +Inspiration for the Aggregation Pipeline +======================================== + +Data Archives +------------- + +The need for cloud-accessible analysis-ready data is increasing due to high demand for cloud-native applications and wider usability of data. +Current archival formats and access methods are insufficient for an increasing number of user needs, especially given the volume of data being +produced by various projects globally. + +.. image:: _images/CedaArchive0824.png + :alt: Contents of the CEDA Archive circa August 2024. + :align: center + +The CEDA-operated JASMIN data analysis facility has a current (2024) data archive of more than 30 Petabytes, with more datasets being ingested +daily. Around 25% of all datasets are in NetCDF/HDF formats which are well-optimised for HPC architecture, but do not typically perform as well +and are not as accessible for cloud-based applications. The standard NetCDF/HDF python readers for example require direct access to the source +files, so are not able to open files stored either in Object Storage (S3) or served via a download service, without first downloading the whole file. + +Distributed Data +---------------- + +The aim of distributed data aggregations is to make the access of data more effective when dealing with these vast libraries of data. +Directly accessing the platforms, like JASMIN, where the data is stored is not necessarily possible for all users, and we would like to avoid the dependence +on download services where GB/TBs of data is copied across multiple sites. Instead, the data may be accessed via a **reference/aggregation file** which provides +the instructions to fetch portions of the data, and applications reading the file are able to load data as needed rather than all at once (Lazy Loading). + +.. image:: _images/DataDistributed.png + :alt: A diagram of how the typcial Distributed Data methods operate. 
+ :align: center + +Formats which provide effective remote data access are typically referred to as **Cloud Optimised Formats** (COFs) like `Zarr `_ and `Kerchunk `_, as in the diagram above. +Zarr stores contain individual **binary-encoded** files for each chunk of data in memory. Opening a Zarr store means accessing the top-level metadata which +informs the application reader how the data is structured. Subsequent calls to load the data will then only load the appropriate memory chunks. Kerchunk +functions similarly as a pointer to chunks of data in another location, however Kerchunk only references the existing chunk structure within NetCDF files, +rather than having each chunk as a separate file. + +PADOCC supports an additional format called CFA, which takes elements of both of these methods. CFA files store references to portions of the array, rather than ranges of bytes of compressed/uncompressed data like with Kerchunk. +These references are stored in NetCDF instead of JSON metadata files, which has the advantage of lazily-loaded references from a single file. Read more about CF Aggregations `here _`. + +A workflow for data conversion +------------------------------ + +PADOCC is a tool being actively developed at CEDA to enable large-scale conversion of archival data to some of these new cloud formats, to address the issues above. +Originally created as part of the ESA Climate Change Initiative project, PADOCC is steadily growing into an essential part of the CEDA ingestion pipeline. +New datasets deposited into the CEDA archive will soon be automatically converted by PADOCC and represented as part of the growing STAC catalog collection at CEDA. +Use of the catalogs is facilitated by the `CEDA DataPoint _` package, which auto-configures for multiple different file types. + +The result of this new data architecture will be that users of CEDA data can discover and access data through our packages much faster and more efficiently than before, +without the need to learn to use many new formats. All the nuances of each dataset are handled by DataPoint, and use products created by PADOCC to facilitate fast search/access +to the data. + diff --git a/docs/source/introduction.rst b/docs/source/introduction.rst new file mode 100644 index 0000000..1684f36 --- /dev/null +++ b/docs/source/introduction.rst @@ -0,0 +1,50 @@ +Overview of Pipeline Phases +=========================== + +.. image:: _images/padocc.png + :alt: Stages of the PADOCC workflow + +**Initialisation of a Group of Datasets** + +The pipeline takes a CSV (or similar) input file from which to instantiate a ``GroupOperation``, which includes: + - creating subdirectories for all associated datasets (projects) + - creating multiple group files with information regarding this group. + +**Scan Phase** + +The first main phase of the pipeline involves scanning a subset of the native source files to determine certain parameters: + +* Ensure source files are compatible with one of the available converters for Kerchunk/Zarr etc.: +* Calculate expected memory (for job allocation later.) +* Calculate estimated chunk sizes and other values. +* Determine suggested file type, including whether to use JSON or Parquet for Kerchunk references. +* Identify Identical/Concat dims for use in **Compute** phase. +* Determine any other specific parameters for the dataset on creation and concatenation. 
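+
+As a minimal sketch of how this phase is invoked (assuming a group that has already been initialised from an input file, and that ``GroupOperation`` can be imported from the ``padocc.operations.group`` module documented elsewhere in these pages), the scan can be run in serial from Python:
+
+.. code:: python
+
+    from padocc.operations.group import GroupOperation
+
+    # Placeholder names - point these at your own group and working directory.
+    mygroup = GroupOperation(
+        'my-group',
+        workdir='path/to/pipeline/directory'
+    )
+
+    # Scan a subset of the source files and record the derived parameters.
+    mygroup.run('scan', mode='kerchunk')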
+ +**Compute Phase** + +Building the Cloud/reference product for a dataset requires a multi-step process: + +Example for Kerchunk: +* Create Kerchunk references for each archive-type file. +* Save cache of references for each file prior to concatenation. +* Perform concatenation (abort if concatenation fails, can load cache on second attempt). +* Perform metadata corrections (based on updates and removals specified at the start) +* Add Kerchunk history global attributes (creation time, pipeline version etc.) +* Reconfigure each chunk for remote access (replace local path with https:// download path) + +**Validation Phase** + +Cloud products must be validated against equivalent Xarray objects from CF Aggregations (CFA) where possible, or otherwise using the original NetCDF as separate Xarray Datasets. + +* Ensure all variables present in original files are present in the cloud products (barring exceptions where metadata has been altered/corrected) +* Ensure array shapes are consistent across the products. +* Ensure data representations are consistent (values in array subsets) + +The validation step produced a two-sectioned report that outlines validation warnings and errors with the data or metadata +around the project. See the documentation on the validation report for more details. + +**Next Steps** + +Cloud products that have been validated are moved to a ``complete`` directory with the project code as the name, plus the revision identifier `abX.X` - learn more about this in the deep dive section. +These can then be linked to a catalog or ingested into the CEDA archive where appropriate. From 4cc1123670a547d7dc2e2e5b18a5cb1792983340 Mon Sep 17 00:00:00 2001 From: dwest77a Date: Mon, 23 Dec 2024 17:42:28 +0000 Subject: [PATCH 19/35] Added basic descriptions for several sections --- docs/source/deep_dive.rst | 67 ++++++++++++++++++++++++++++++++++ docs/source/group_source.rst | 11 ++++++ docs/source/misc_source.rst | 31 ++++++++++++++++ docs/source/project_source.rst | 11 ++++++ 4 files changed, 120 insertions(+) create mode 100644 docs/source/deep_dive.rst create mode 100644 docs/source/group_source.rst create mode 100644 docs/source/misc_source.rst create mode 100644 docs/source/project_source.rst diff --git a/docs/source/deep_dive.rst b/docs/source/deep_dive.rst new file mode 100644 index 0000000..6650f60 --- /dev/null +++ b/docs/source/deep_dive.rst @@ -0,0 +1,67 @@ +=================================== +A Deeper Dive into PADOCC Mechanics +=================================== + +Revision Numbers +---------------- + +The PADOCC revision numbers for each product are auto-generated using the following rules. + * All projects begin with the revision number ``1.1``. + * The first number denotes major updates to the product, for instance where a data source file has been replaced. + * The second number denotes minor changes like alterations to attributes and metadata. + * The letters prefixed to the revision numbers identify the file type for the product. For example a zarr store has the letter ``z`` applied, while a Kerchunk (parquet) store has ``kp``. + +The Validation Report +--------------------- + +The ``ValidateDatasets`` class produces a validation report for both data and metadata validations. +This is designed to be fairly simple to interpret, while still being machine-readable. +The following headings which may be found in the report have the following meanings: + +1. 
Metadata Report (with Examples) +These are considered non-fatal errors that will need either a minor correction or can be ignored. +* ``variables.time: {'type':'missing'...}`` - The time variable is missing from the specified product. +* ``dims.all_dims: {'type':'order'}`` - The ordering of dimensions is not consistent across products. +* ``attributes {'type':'ignore'...}`` - Attributes that have been ignored. These may have already been edited. +* ``attributes {'type':'missing'...}`` - Attributes that are missing from the specified product file. +* ``attributes {'type':'not_equal'...}`` - Attributes that are not equal across products. + +2. Data Report +These are considered **fatal** errors that need a major correction or possibly a fix to the pipeline itself. +* ``size_errors`` - The size of the array is not consistent between products. +* ``dim_errors`` - Arrays have inconsistent dimensions (where not ignored). +* ``dim_size_errors`` - The dimensions are consistent for a variable but their sizes are not. +* ``data_errors`` - The data arrays do not match across products, this is the most fatal of all validation errors. +The validator should give an idea of which array comparisons failed. +* ``data_errors: {'type':'growbox_exceeded'...}`` - The variable in question could not be validated as no area could be identified that is not empty of values. + +BypassSwitch Options +-------------------- + +Certain non-fatal errors may be bypassed using the Bypass flag: +:: + + Format: -b "DBSCR" + + Default: "DBSCR" # Highlighted by a '*' + + "D" - * Skip driver failures - Pipeline tries different options for NetCDF (default). + - Only need to turn this skip off if all drivers fail (KerchunkFatalDriverError). + "B" - * Skip Box compute errors. + "S" - * Skip Soft fails (NaN-only boxes in validation) (default). + "C" - * Skip calculation (data sum) errors (time array typically cannot be summed) (default). + "X" - Skip initial shape errors, by attempting XKShape tolerance method (special case.) + "R" - * Skip reporting to status_log which becomes visible with assessor. Reporting is skipped + by default in single_run.py but overridden when using group_run.py so any serial + testing does not by default report the error experienced to the status log for that project. + "F" - Skip scanning (fasttrack) and go straight to compute. Required if running compute before scan + is attempted. + +Custom Pipeline Errors +---------------------- + +**A summary of the custom errors that are experienced through running the pipeline.** + +.. automodule:: padocc.core.errors + :members: + :show-inheritance: \ No newline at end of file diff --git a/docs/source/group_source.rst b/docs/source/group_source.rst new file mode 100644 index 0000000..7f493e6 --- /dev/null +++ b/docs/source/group_source.rst @@ -0,0 +1,11 @@ +======================================== +GroupOperation Core and Mixin Behaviours +======================================== + +Source code for group operations and mixin behaviours. + +.. automodule:: padocc.operations.group + :members: + +.. automodule:: padocc.operations.mixins + :members: \ No newline at end of file diff --git a/docs/source/misc_source.rst b/docs/source/misc_source.rst new file mode 100644 index 0000000..23e6bd9 --- /dev/null +++ b/docs/source/misc_source.rst @@ -0,0 +1,31 @@ +Padocc Filehandlers +====================== + +Filehandlers are an integral component of PADOCC on the filesystem. 
The filehandlers +connect directly to files within the pipeline directories for different groups and projects +and provide a seamless environment for fetching and saving values to these files. + +Filehandlers act like their respective data-types in most or all methods. +For example the ``JSONFileHandler`` acts like a dictionary, but with extra methods to close and save +the loaded data. Filehandlers can also be easily migrated or removed from the filesystem as part of other +processes. + +.. automodule:: padocc.core.filehandlers + :members: + :show-inheritance: + +========= +Utilities +========= + +.. automodule:: padocc.core.utils + :members: + :show-inheritance: + +======= +Logging +======= + +.. automodule:: padocc.core.logs + :members: + :show-inheritance: \ No newline at end of file diff --git a/docs/source/project_source.rst b/docs/source/project_source.rst new file mode 100644 index 0000000..2420811 --- /dev/null +++ b/docs/source/project_source.rst @@ -0,0 +1,11 @@ +========================================== +ProjectOperation Core and Mixin Behaviours +========================================== + +Source code for individual project operations and mixin behaviours. + +.. automodule:: padocc.core.project + :members: + +.. automodule:: padocc.core.mixins + :members: \ No newline at end of file From 98e7872cb24ab078539bcee58a038c8fd9aaacab Mon Sep 17 00:00:00 2001 From: dwest77a Date: Mon, 23 Dec 2024 18:02:10 +0000 Subject: [PATCH 20/35] Added brief messages about each phase --- docs/source/phases.rst | 72 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 docs/source/phases.rst diff --git a/docs/source/phases.rst b/docs/source/phases.rst new file mode 100644 index 0000000..a468d92 --- /dev/null +++ b/docs/source/phases.rst @@ -0,0 +1,72 @@ +============== +Scanner Module +============== + +A scan operation is performed across a group of datasets/projects to determine specific +properties of each project and some estimates of time/memory allocations that will be +required in later phases. + +The scan phase can be activated with the following: + +.. code:: python + + mygroup = GroupOperation( + 'my-group', + workdir='path/to/pipeline/directory' + ) + # Assuming this group has already been initialised from a file. + + mygroup.run('scan',mode='kerchunk') + +.. automodule:: padocc.phases.scan + :members: + +============== +Compute Module +============== + +Computation will either refer to outright data conversion to a new format, +or referencing using one of the Kerchunk drivers to create a reference file. +In either case the computation may be extensive and require processing in the background +or deployment and parallelisation across the group of projects. + +Computation can be executed in serial for a group with the following: + +.. code:: python + + mygroup = GroupOperation( + 'my-group', + workdir='path/to/pipeline/directory' + ) + # Assuming this group has already been initialised and scanned + + mygroup.run('compute',mode='kerchunk') + +.. automodule:: padocc.phases.compute + :members: + :show-inheritance: + +================= +Validation Module +================= + +Finally, it is advised to run the validator for all projects in a group to determine any issues +with the conversion process. Some file types or specific arrangements may produce unwanted effects +that result in differences between the original and new representations. 
This can be identified with the +validator which checks the Xarray representations and identifies differences in both data and metadata. + +.. code:: python + + mygroup = GroupOperation( + 'my-group', + workdir='path/to/pipeline/directory' + ) + # Assuming this group has already been initialised, scanned and computed + + mygroup.run('validate') + + # The validation reports will be saved to the filesystem for each project in this group + # as 'data_report.json' and 'metadata_report.json' + +.. automodule:: padocc.phases.validate + :members: From 3bbfdb000066221cbd855ae376c28cc446e2d90b Mon Sep 17 00:00:00 2001 From: dwest77a Date: Mon, 23 Dec 2024 18:08:29 +0000 Subject: [PATCH 21/35] Initial commit of shepard documentation --- docs/source/shepard.rst | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 docs/source/shepard.rst diff --git a/docs/source/shepard.rst b/docs/source/shepard.rst new file mode 100644 index 0000000..4f93322 --- /dev/null +++ b/docs/source/shepard.rst @@ -0,0 +1,16 @@ +The SHEPARD Module +================== + +The latest development in the PADOCC package is the SHEPARD Module (coming in 2025). + +SHEPARD (Serial Handler for Enabling PADOCC Aggregations via Recurrent Deployment) is a component +designed as an entrypoint script within the PADOCC environment to automate the operation of the pipeline. +Groups of datasets (called ``flocks``) can be created by producing input files and placing them in a persistent +directory accessible to a deployment of PADOCC. The deployment operates an hourly check in this directory, +and picks up any added files or any changes to existing files. The groups specified can then be automatically +run through all sections of the pipeline. + +The deployment of SHEPARD at CEDA involves a Kubernetes Pod that has access to the JASMIN filesystem as well as +the capability to deploy to JASMIN's LOTUS cluster for job submissions. The idea will be for SHEPARD to run +continuously, slowly processing large sections of the CEDA archive and creating cloud formats that can be utilised +by other packages like DataPoint (see the Inspiration tab) that provide fast access to data. \ No newline at end of file From 28cd361d6123e014419adaf8025e372860d1d631 Mon Sep 17 00:00:00 2001 From: dwest77a Date: Mon, 6 Jan 2025 16:22:27 +0000 Subject: [PATCH 22/35] Various syntax changes in documentation, merged intro and phases, updated projects --- docs/source/cci_water.rst | 77 +++---------------------------------- docs/source/deep_dive.rst | 18 ++++----- docs/source/groups.rst | 20 ++++++++++ docs/source/index.rst | 15 ++++---- docs/source/inspiration.rst | 8 ++-- docs/source/phases.rst | 66 ++++++++++++++++++++++++++----- docs/source/projects.rst | 61 +++++++++++++++++++++++++++++ docs/source/start.rst | 57 ++++++++------------------- 8 files changed, 177 insertions(+), 145 deletions(-) create mode 100644 docs/source/groups.rst create mode 100644 docs/source/projects.rst diff --git a/docs/source/cci_water.rst b/docs/source/cci_water.rst index 357f279..94ad2bd 100644 --- a/docs/source/cci_water.rst +++ b/docs/source/cci_water.rst @@ -8,9 +8,10 @@ A new *group* is created within the pipeline using the ``init`` operation as fol :: - python group_run.py init -i extensions/example_water_vapour/water_vapour.csv -v + padocc init -G -i extensions/example_water_vapour/water_vapour.csv -v .. note:: + Multiple flag options are available throughout the pipeline for more specific operations and methods. 
In the above case we have used the (-v) *verbose* flag to indicate we want to see the ``[INFO]`` messages put out by the pipeline. Adding a second (v) would also show ``[DEBUG]`` messages. Also the ``init`` phase is always run as a serial process since it just involves creating the directories and config files required by the pipeline. @@ -56,28 +57,6 @@ Ok great, we've initialised the pipeline for our new group! Here's a summary dia - validate.log - status_log.csv -For peace of mind and to check you understand the pipeline assessor tool we would suggest running this command next: - -:: - - python assess.py progress my_new_group - -Upon which your output should look something like this: - -.. code-block:: console - - Group: my_new_group - Total Codes: 4 - - Pipeline Current: - - init : 4 [100.%] (Variety: 1) - - complete : 4 - - Pipeline Complete: - - complete : 0 [0.0 %] - All 4 of our datasets were initialised successfully, no datasets are complete through the pipeline yet. The next steps are to ``scan``, ``compute``, and ``validate`` the datasets which would complete the pipeline. @@ -88,52 +67,8 @@ The next steps are to ``scan``, ``compute``, and ``validate`` the datasets which .. code-block:: console - python group_run.py scan my_new_group - python group_run.py compute my_new_group - python group_run.py validate my_new_group - -An more complex example of what you might see while running the pipeline in terms of errors encountered can be found below: - -.. code-block:: console - - Group: cci_group_v1 - Total Codes: 361 - - Pipeline Current: - - compute : 21 [5.8 %] (Variety: 2) - - complete : 20 - - KeyError 'refs' : 1 - - Pipeline Complete: - - complete : 185 [51.2%] - - blacklist : 155 [42.9%] (Variety: 8) - - NonKerchunkable : 50 - - PartialDriver : 3 - - PartialDriverFail : 5 - - ExhaustedMemoryLimit : 56 - - ExhaustedTimeLimit : 18 - - ExhaustedTimeLimit* : 1 - - ValidationMemoryLimit : 21 - - ScipyDimIssue : 1 - -In this example ``cci_group_v1`` group, 185 of the datasets have completed the pipeline, while 155 have been excluded (See blacklisting in the Assessor Tool section). -Of the remaining 21 datasets, 20 of them have completed the ``compute`` phase and now need to be run through ``validate``, but one encountered a KeyError which needs to be inspected. To view the log for this dataset we can use the command below: - -.. code-block:: console - - python assess.py progress cci_group_v1 -e "KeyError 'refs'" -p compute -E - -This will match with our ``compute``-phase error with that message, and the (-E) flag will give us the whole error log from that run. This may be enough to assess and fix the issue but otherwise, to rerun just this dataset a rerun command will be suggested by the assessor: - -.. code-block:: console - - Project Code: 201601-201612-ESACCI-L4_FIRE-BA-MSI-fv1.1 - 'refs' - Rerun suggested command: python single_run.py compute 218 -G cci_group_v1 -vv -d - -This rerun command has several flags included, the most importand here is the (-G) group flag, since we need to use the ``single_run`` script so now need to specify the group. The (-d) dryrun flag will simply mean we are not producing any output files since we may need to test and rerun several times. - - + padocc scan -G my_new_group + padocc compute -G my_new_group + padocc validate -G my_new_group +This section will be updated for the full release of v1.3 with additional content relating to the assessor tool. 
\ No newline at end of file diff --git a/docs/source/deep_dive.rst b/docs/source/deep_dive.rst index 6650f60..5ea05bf 100644 --- a/docs/source/deep_dive.rst +++ b/docs/source/deep_dive.rst @@ -6,6 +6,7 @@ Revision Numbers ---------------- The PADOCC revision numbers for each product are auto-generated using the following rules. + * All projects begin with the revision number ``1.1``. * The first number denotes major updates to the product, for instance where a data source file has been replaced. * The second number denotes minor changes like alterations to attributes and metadata. @@ -20,6 +21,7 @@ The following headings which may be found in the report have the following meani 1. Metadata Report (with Examples) These are considered non-fatal errors that will need either a minor correction or can be ignored. + * ``variables.time: {'type':'missing'...}`` - The time variable is missing from the specified product. * ``dims.all_dims: {'type':'order'}`` - The ordering of dimensions is not consistent across products. * ``attributes {'type':'ignore'...}`` - Attributes that have been ignored. These may have already been edited. @@ -28,6 +30,7 @@ These are considered non-fatal errors that will need either a minor correction o 2. Data Report These are considered **fatal** errors that need a major correction or possibly a fix to the pipeline itself. + * ``size_errors`` - The size of the array is not consistent between products. * ``dim_errors`` - Arrays have inconsistent dimensions (where not ignored). * ``dim_size_errors`` - The dimensions are consistent for a variable but their sizes are not. @@ -41,21 +44,16 @@ BypassSwitch Options Certain non-fatal errors may be bypassed using the Bypass flag: :: - Format: -b "DBSCR" + Format: -b "D" - Default: "DBSCR" # Highlighted by a '*' + Default: "D" # Highlighted by a '*' "D" - * Skip driver failures - Pipeline tries different options for NetCDF (default). - - Only need to turn this skip off if all drivers fail (KerchunkFatalDriverError). - "B" - * Skip Box compute errors. - "S" - * Skip Soft fails (NaN-only boxes in validation) (default). - "C" - * Skip calculation (data sum) errors (time array typically cannot be summed) (default). - "X" - Skip initial shape errors, by attempting XKShape tolerance method (special case.) - "R" - * Skip reporting to status_log which becomes visible with assessor. Reporting is skipped - by default in single_run.py but overridden when using group_run.py so any serial - testing does not by default report the error experienced to the status log for that project. + - Only need to turn this skip off if all drivers fail (KerchunkDriverFatalError). "F" - Skip scanning (fasttrack) and go straight to compute. Required if running compute before scan is attempted. + "L" - Skip adding links in compute (download links) - this will be required on ingest. + "S" - Skip errors when running a subset within a group. Record the error then move onto the next dataset. Custom Pipeline Errors ---------------------- diff --git a/docs/source/groups.rst b/docs/source/groups.rst new file mode 100644 index 0000000..14c3220 --- /dev/null +++ b/docs/source/groups.rst @@ -0,0 +1,20 @@ +Groups in PADOCC +================ + +The advantage of using PADOCC over other tools for creating cloud-format files is the scalability built-in, with parallelisation and deployment in mind. +PADOCC allows the creation of groups of datasets, each with N source files, that can be operated upon as a single entity. 
+The operation can be applied to all or a subset of the datasets within the group with relative ease. Here we outline some basic functionality of the ``GroupOperation``. +See the source documentation page for more detail. + +Instantiating a Group +--------------------- + +Initialisation from a File +-------------------------- + +Applying an operation +--------------------- + +Merging or Unmerging +-------------------- +**currently in development - alpha release** \ No newline at end of file diff --git a/docs/source/index.rst b/docs/source/index.rst index 64729e4..f384603 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -6,7 +6,7 @@ PADOCC - User Documentation ============================ -**padocc** (Pipeline to Aggregate Data for Optimised Cloud Capabilites) is a Python package (formerly **kerchunk-builder**) for aggregating data to enable methods of access for cloud-based applications. +**padocc** (Pipeline to Aggregate Data for Optimised Cloud Capabilites) is a Python package for aggregating data to enable methods of access for cloud-based applications. The pipeline makes it easy to generate data-aggregated access patterns in the form of Reference Files or Cloud Formats across different datasets simultaneously with validation steps to ensure the outputs are correct. @@ -20,26 +20,26 @@ Currently supported input file formats: *padocc* is capable of generating both reference files with Kerchunk (JSON or Parquet) and cloud formats like Zarr. Additionally, PADOCC creates CF-compliant aggregation files as part of the standard workflow, which means you get CFA-netCDF files as standard! -You can find out more about Climate Forecast Aggregations `here _`, these files are denoted with the extension ``.nca`` and can be opened using xarray with ``engine="CFA"`` if you have the ``CFAPyX`` package installed. +You can find out more about Climate Forecast Aggregations `here `_, these files are denoted with the extension ``.nca`` and can be opened using xarray with ``engine="CFA"`` if you have the ``CFAPyX`` package installed. The pipeline consists of three central phases, with an additional phase for ingesting/cataloging the produced Kerchunk files. These phases represent operations that can be applied across groups of datasets in parallel, depending on the architecture of your system. -For further information around configuring PADOCC for parallel deployment please contact `daniel.westwood@stfc.ac.uk _`. +For further information around configuring PADOCC for parallel deployment please contact `daniel.westwood@stfc.ac.uk `_. The ingestion/cataloging phase is not currently implemented for public use but may be added in a future update. -.. image:: _images/padocc.png +.. image:: _images/pipeline.png :alt: Stages of the PADOCC workflow .. toctree:: - :maxdepth: 2 + :maxdepth: 1 :caption: Contents: - Introduction Inspiration + Steps to Run Padocc Getting Started Example Operation - A Deep Dive + A Deep Dive Developer's Guide .. 
toctree:: @@ -57,7 +57,6 @@ The ingestion/cataloging phase is not currently implemented for public use but m Projects Groups Filehandlers, Logs, and Utilities - phases Indices and Tables ================== diff --git a/docs/source/inspiration.rst b/docs/source/inspiration.rst index b6a8568..dd31799 100644 --- a/docs/source/inspiration.rst +++ b/docs/source/inspiration.rst @@ -1,5 +1,5 @@ -Inspiration for the Aggregation Pipeline -======================================== +Inspiration for Cloud Formats and Aggregations +============================================== Data Archives ------------- @@ -36,7 +36,7 @@ functions similarly as a pointer to chunks of data in another location, however rather than having each chunk as a separate file. PADOCC supports an additional format called CFA, which takes elements of both of these methods. CFA files store references to portions of the array, rather than ranges of bytes of compressed/uncompressed data like with Kerchunk. -These references are stored in NetCDF instead of JSON metadata files, which has the advantage of lazily-loaded references from a single file. Read more about CF Aggregations `here _`. +These references are stored in NetCDF instead of JSON metadata files, which has the advantage of lazily-loaded references from a single file. Read more about CF Aggregations `here `_. A workflow for data conversion ------------------------------ @@ -44,7 +44,7 @@ A workflow for data conversion PADOCC is a tool being actively developed at CEDA to enable large-scale conversion of archival data to some of these new cloud formats, to address the issues above. Originally created as part of the ESA Climate Change Initiative project, PADOCC is steadily growing into an essential part of the CEDA ingestion pipeline. New datasets deposited into the CEDA archive will soon be automatically converted by PADOCC and represented as part of the growing STAC catalog collection at CEDA. -Use of the catalogs is facilitated by the `CEDA DataPoint _` package, which auto-configures for multiple different file types. +Use of the catalogs is facilitated by the `CEDA DataPoint `_ package, which auto-configures for multiple different file types. The result of this new data architecture will be that users of CEDA data can discover and access data through our packages much faster and more efficiently than before, without the need to learn to use many new formats. All the nuances of each dataset are handled by DataPoint, and use products created by PADOCC to facilitate fast search/access diff --git a/docs/source/phases.rst b/docs/source/phases.rst index a468d92..d165111 100644 --- a/docs/source/phases.rst +++ b/docs/source/phases.rst @@ -1,6 +1,28 @@ -============== -Scanner Module -============== +============================= +Phases of the PADOCC Pipeline +============================= + +.. image:: _images/padocc.png + :alt: Stages of the PADOCC workflow + +**Initialisation of a Group of Datasets** + + +The pipeline takes a CSV (or similar) input file from which to instantiate a ``GroupOperation``, which includes: + - creating subdirectories for all associated datasets (projects) + - creating multiple group files with information regarding this group. + +Scan +---- + +The first main phase of the pipeline involves scanning a subset of the native source files to determine certain parameters: + +* Ensure source files are compatible with one of the available converters for Kerchunk/Zarr etc.: +* Calculate expected memory (for job allocation later.) 
+* Calculate estimated chunk sizes and other values.
+* Determine suggested file type, including whether to use JSON or Parquet for Kerchunk references.
+* Identify Identical/Concat dims for use in **Compute** phase.
+* Determine any other specific parameters for the dataset on creation and concatenation.
 
 A scan operation is performed across a group of datasets/projects to determine specific
 properties of each project and some estimates of time/memory allocations that will be
@@ -21,9 +43,19 @@ The scan phase can be activated with the following:
 .. automodule:: padocc.phases.scan
    :members:
 
-==============
-Compute Module
-==============
+Compute
+-------
+
+Building the Cloud/reference product for a dataset requires a multi-step process:
+
+Example for Kerchunk:
+
+* Create Kerchunk references for each archive-type file.
+* Save cache of references for each file prior to concatenation.
+* Perform concatenation (abort if concatenation fails, can load cache on second attempt).
+* Perform metadata corrections (based on updates and removals specified at the start).
+* Add Kerchunk history global attributes (creation time, pipeline version etc.).
+* Reconfigure each chunk for remote access (replace local path with https:// download path).
 
 Computation will either refer to outright data conversion to a new format,
 or referencing using one of the Kerchunk drivers to create a reference file.
@@ -46,11 +78,19 @@ Computation can be executed in serial for a group with the following:
    :members:
    :show-inheritance:
 
-=================
-Validation Module
-=================
+Validate
+--------
+
+Cloud products must be validated against equivalent Xarray objects from CF Aggregations (CFA) where possible, or otherwise using the original NetCDF as separate Xarray Datasets.
+
+* Ensure all variables present in the original files are present in the cloud products (barring exceptions where metadata has been altered/corrected).
+* Ensure array shapes are consistent across the products.
+* Ensure data representations are consistent (values in array subsets).
 
-Finally, it is advised to run the validator for all projects in a group to determine any issues
+The validation step produces a two-section report that outlines validation warnings and errors with the data or metadata
+of the project. See the documentation on the validation report for more details.
+
+It is advised to run the validator for all projects in a group to determine any issues
 with the conversion process. Some file types or specific arrangements may produce unwanted effects
 that result in differences between the original and new representations. This can be identified with the
 validator which checks the Xarray representations and identifies differences in both data and metadata.
@@ -70,3 +110,9 @@ validator which checks the Xarray representations and identifies differences in
 
 .. automodule:: padocc.phases.validate
    :members:
+
+Next Steps
+----------
+
+Cloud products that have been validated are moved to a ``complete`` directory with the project code as the name, plus the revision identifier `abX.X` - learn more about this in the deep dive section.
+These can then be linked to a catalog or ingested into the CEDA archive where appropriate.
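+
+As an illustration of how a completed product might then be used, the sketch below opens two possible outputs with xarray. The paths, file names and revision tags are placeholders; the CFA example follows the ``engine="CFA"`` usage described for CFA-netCDF files elsewhere in this documentation (requires ``CFAPyX``), and the Kerchunk example uses the standard fsspec/xarray reference-reading pattern rather than any padocc-specific API:
+
+.. code:: python
+
+    import fsspec
+    import xarray as xr
+
+    # CFA-netCDF aggregation - placeholder path and revision.
+    ds_cfa = xr.open_dataset('complete/my_project/my_project_1.1.nca', engine='CFA')
+
+    # Kerchunk (JSON) product, opened via a reference filesystem - placeholder path.
+    mapper = fsspec.get_mapper(
+        'reference://',
+        fo='complete/my_project/my_project_kr1.1.json',
+        remote_protocol='https',
+    )
+    ds_kr = xr.open_dataset(mapper, engine='zarr', backend_kwargs={'consolidated': False})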
diff --git a/docs/source/projects.rst b/docs/source/projects.rst new file mode 100644 index 0000000..6edf7cd --- /dev/null +++ b/docs/source/projects.rst @@ -0,0 +1,61 @@ +Projects in PADOCC +================== + +To differentiate syntax of datasets/datafiles with other packages that have varying definitions of those terms, +PADOCC uses the term ``Project`` to refer to a set of files to be aggregated into a single 'Cloud Product'. + +The ``ProjectOperation`` class within PADOCC allows us to access all information about a specific dataset, including +fetching data from files within the pipeline directory. This class also inherits from several Mixin classes which +act as containers for specific behaviours for easier organisation and future debugging. + +Directory Mixin +--------------- + +The directory mixin class contains all behaviours relating to creating directories within a project (or group) in PADOCC. +This includes the inherited ability for any project to create its parent working directory and group directory if needed, as well +as a subdirectory for cached data files. The switch values ``forceful`` and ``dryrun`` are also closely tied to this +container class, as the creation of new directories may be bypassed/forced if they exist already, or bypassed completely in a dry run. + +Evaluations Mixin +----------------- + +Previously, all evaluations were handled by an assessor module (pre 1.3), but this has now been reorganised +into a mixin class for the projects themselves, meaning any project instance has the capacity for self-evaluation. The routines +grouped into this container class relate to the self analysis of details and parameters of the project and various +files: + - get last run: Determine the parameters used in the most recent operation for a project. + - get last status: Get the status of the most recent (completed) operation. + - get log contents: Examine the log contents for a specific project. + +This list will be expanded in the full release version 1.3 to include many more useful evaluators including +statistics that can be averaged across a group. + +Properties Mixin +---------------- + +A collection of dynamic properties about a specific project. The Properties Mixin class abstracts any +complications or calculations with retrieving specific parameters; some may come from multiple files, are worked out on-the-fly +or may be based on an external request. Properties currently included are: + - Outpath: The output path to a 'product', which could be a zarr store, kerchunk file etc. + - Outproduct: The name of the output product which includes the cloud format and version number. + - Revision/Version: Abstracts the construction of revision and version numbers for the project. + - Cloud Format: Kerchunk/Zarr etc. - value stored in the base config file and can be set manually for further processing. + - File Type: Extension applied to the output product, can be one of 'json' or 'parquet' for Kerchunk products. + - Source Format: Format(s) detected during scan - retrieved from the detail config file after scanning. + +The properties mixin also enables a manual adjustment of some properties, like cloud format or file type, but also enables +minor and major version increments. This will later be wrapped into an ``Updater`` module to enable easier updates to +Cloud Product data/metadata. + +The Project Operator class +-------------------------- + +The 'core' behaviour of all classes is contained in the ``ProjectOperation`` class. 
+This class has public UI methods like ``info`` and ``help`` that give general information about a project, +and list some of the other public methods available respectively. + +Key Functions: + - Acts as an access point to all information and data about a project (dataset). + - Can adjust values within key files (abstracted) by setting specific parameters of the project instance and then using ``save_files``. + - Enables quick stats gathering for use with group statistics calculations. + - Can run any process on a project from the Project Operator. \ No newline at end of file diff --git a/docs/source/start.rst b/docs/source/start.rst index b801f3e..bcff1bf 100644 --- a/docs/source/start.rst +++ b/docs/source/start.rst @@ -13,6 +13,10 @@ If you need to clone the repository, either simply clone the main branch of the git clone git@github.com:cedadev/padocc.git +.. note:: + + The instructions below are specific to version 1.3 and later. To obtain documentation for pre-1.3, please contact `daniel.westwood@stfc.ac.uk `_. + Step 1: Set up Virtual Environment ---------------------------------- @@ -22,7 +26,7 @@ Step 1 is to create a virtual environment and install the necessary packages wit python -m venv name_of_venv; source name_of_venv/bin/activate; - pip install -r requirements.txt; + pip install ./; Step 2: Environment configuration @@ -32,11 +36,10 @@ Create a config file to set necessary environment variables. (Suggested to place .. code-block:: console export WORKDIR = /path/to/kerchunk-pipeline - export SRCDIR = /gws/nopw/j04/cedaproc/kerchunk_builder/kerchunk-builder - export KVENV = $SRCDIR/kvenv + export KVENV = /path/to/virtual/environment/venv -Now you should be set up to run the pipeline properly. For any of the pipeline scripts, running ```python