From 48e1d32745119982cc8d1c05e1f052c7185920db Mon Sep 17 00:00:00 2001 From: dwest77 Date: Tue, 7 Jan 2025 16:42:19 +0000 Subject: [PATCH] Various revisions --- padocc/core/filehandlers.py | 4 ++-- padocc/core/project.py | 2 +- padocc/operations/group.py | 18 +++++++++++++----- padocc/operations/mixins.py | 13 ++++++++----- padocc/phases/__init__.py | 2 +- padocc/phases/compute.py | 21 +++++++++++++++++---- padocc/phases/scan.py | 22 +++++++++++++++++++--- 7 files changed, 61 insertions(+), 21 deletions(-) diff --git a/padocc/core/filehandlers.py b/padocc/core/filehandlers.py index f85a227..0282232 100644 --- a/padocc/core/filehandlers.py +++ b/padocc/core/filehandlers.py @@ -287,7 +287,7 @@ def _set_value_in_file(self) -> None: in the file. """ if self._dryrun or self._value == []: - self.logger.debug("Skipped setting value in file") + self.logger.debug(f"Skipped setting value in {self.file}") return if not self.file_exists(): @@ -433,7 +433,7 @@ def _set_value_in_file(self) -> None: in the file. """ if self._dryrun or self._value == {}: - self.logger.debug("Skipped setting value in file") + self.logger.debug(f"Skipped setting value in {self.file}") return self._apply_conf() diff --git a/padocc/core/project.py b/padocc/core/project.py index cef36f1..a64640b 100644 --- a/padocc/core/project.py +++ b/padocc/core/project.py @@ -321,7 +321,7 @@ def _configure_filelist(self): if 'latest' in pattern: pattern = pattern.replace('latest', os.readlink(pattern)) - self.allfiles.set(sorted(glob.glob(pattern))) + self.allfiles.set(sorted(glob.glob(pattern, recursive=True))) def _setup_config( self, diff --git a/padocc/operations/group.py b/padocc/operations/group.py index 9d97cd0..e478aa1 100644 --- a/padocc/operations/group.py +++ b/padocc/operations/group.py @@ -13,7 +13,7 @@ ScanOperation, KerchunkDS, ZarrDS, - cfa_handler, + CfaDS, KNOWN_PHASES, ValidateOperation, ) @@ -25,7 +25,7 @@ COMPUTE = { 'kerchunk':KerchunkDS, 'zarr':ZarrDS, - 'cfa': cfa_handler, + 'cfa': CfaDS, } class GroupOperation( @@ -126,7 +126,15 @@ def __str__(self): def __repr__(self): return str(self) + + def __getitem__(self, index: int) -> ProjectOperation: + """ + Indexable group allows access to individual projects + """ + proj_code = self.proj_codes['main'][index] + return self.get_project(proj_code) + @property def proj_codes_dir(self): return f'{self.groupdir}/proj_codes' @@ -272,7 +280,7 @@ def run( repeat_id: str = 'main', proj_code: Optional[str] = None, subset: Optional[str] = None, - bypass: Union[BypassSwitch,None] = None, + bypass: Union[BypassSwitch, None] = None, **kwargs ) -> dict[str]: @@ -397,11 +405,11 @@ def _compute_config( self.workdir, groupID=self.groupID, logger=self.logger, - bypass=bypass + bypass=bypass, **kwargs, ) - mode = proj_op.cloud_format + mode = mode or proj_op.cloud_format if mode is None: mode = 'kerchunk' diff --git a/padocc/operations/mixins.py b/padocc/operations/mixins.py index cbc0475..76f6a80 100644 --- a/padocc/operations/mixins.py +++ b/padocc/operations/mixins.py @@ -114,8 +114,8 @@ def _init_group(self, datasets : list, substitutions: dict = None): # Group config is the contents of datasets.csv if substitutions: datasets, status = apply_substitutions('init_file',subs=substitutions, content=datasets) - if status: - self.logger.warning(status) + if status: + self.logger.warning(status) self.datasets.set(datasets) @@ -131,16 +131,19 @@ def _open_json(file): cfg_values = {} ds_values = datasets[index].split(',') - proj_code = ds_values[0] - pattern = ds_values[1] + proj_code = ds_values[0].replace(' ','') + pattern = ds_values[1].replace(' ','') if pattern.endswith('.txt') and substitutions: pattern, status = apply_substitutions('dataset_file', subs=substitutions, content=[pattern]) pattern = pattern[0] if status: self.logger.warning(status) - else: + elif pattern.endswith('.csv'): pattern = os.path.abspath(pattern) + else: + # Dont expand pattern if its not a csv + pass if substitutions: cfg_values['substitutions'] = substitutions diff --git a/padocc/phases/__init__.py b/padocc/phases/__init__.py index 6fa0a99..fee7b8e 100644 --- a/padocc/phases/__init__.py +++ b/padocc/phases/__init__.py @@ -3,7 +3,7 @@ __copyright__ = "Copyright 2024 United Kingdom Research and Innovation" from .scan import ScanOperation -from .compute import KerchunkDS, ZarrDS, ComputeOperation, cfa_handler +from .compute import KerchunkDS, ZarrDS, ComputeOperation, cfa_handler, CfaDS from .ingest import IngestOperation from .validate import ValidateOperation diff --git a/padocc/phases/compute.py b/padocc/phases/compute.py index b02a003..3e9b33f 100644 --- a/padocc/phases/compute.py +++ b/padocc/phases/compute.py @@ -287,7 +287,8 @@ def __init__( self.partial = (limiter and num_files != limiter) - self._determine_version() + # Perform this later + #self._determine_version() self.limiter = limiter if not self.limiter: @@ -343,7 +344,7 @@ def _run(self, mode: str = 'kerchunk'): self.logger.error('Nothing to do with this class - use KerchunkDS/ZarrDS instead!') raise ComputeError - def _run_with_timings(self, func): + def _run_with_timings(self, func) -> str: """ Configure all required steps for Kerchunk processing. - Check if output files already exist. @@ -668,7 +669,7 @@ def __init__( def _run( self, - **kwargs) -> None: + **kwargs) -> str: """ ``_run`` hook method called from the ``ProjectOperation.run`` which this subclass inherits. The kwargs capture the ``mode`` @@ -943,7 +944,7 @@ def __init__( self.filelist = [] self.mem_allowed = mem_allowed - def _run(self, **kwargs) -> None: + def _run(self, **kwargs) -> str: """ Recommended way of running an operation - includes timers etc. """ @@ -1053,6 +1054,18 @@ def _get_rechunk_scheme(self): return concat_dim_rechunk, dim_sizes, cpf/self.limiter, volume/self.limiter +class CfaDS(ComputeOperation): + + def _run(self, **kwargs) -> str: + """ + Integration of CFA Converter to + Padocc Operation class.""" + if cfa_handler(self): + return 'Success' + return 'Fatal' + + # Deal with setting proper values here in specific files. + if __name__ == '__main__': print('Serial Processor for Kerchunk Pipeline - run with single_run.py') \ No newline at end of file diff --git a/padocc/phases/scan.py b/padocc/phases/scan.py index 6d7b8a3..fd590c0 100644 --- a/padocc/phases/scan.py +++ b/padocc/phases/scan.py @@ -8,11 +8,13 @@ import re import logging +from typing import Union + from padocc.core import FalseLogger from padocc.core.errors import ConcatFatalError from padocc.core import ProjectOperation from padocc.core.utils import BypassSwitch -from .compute import KerchunkDS +from .compute import KerchunkDS, cfa_handler from padocc.core.filehandlers import JSONFileHandler @@ -170,6 +172,8 @@ def _run(self, mode: str = 'kerchunk') -> None: self._scan_zarr(limiter=limiter) elif mode == 'kerchunk': self._scan_kerchunk(limiter=limiter) + elif mode == 'cfa': + self._scan_cfa(limiter=limiter) else: self.update_status('scan','ValueError',jobid=self._logid) raise ValueError( @@ -179,7 +183,7 @@ def _run(self, mode: str = 'kerchunk') -> None: self.update_status('scan','Success',jobid=self._logid) return 'Success' - def _scan_kerchunk(self, limiter: int = None): + def _scan_kerchunk(self, limiter: Union[int,None] = None): """ Function to perform scanning with output Kerchunk format. """ @@ -247,7 +251,19 @@ def _scan_kerchunk(self, limiter: int = None): ctypes, escape=escape, scanned_with='kerchunk' ) - def _scan_zarr(self, limiter=None): + def _scan_cfa(self, limiter: Union[int,None] = None): + """ + Function to perform scanning with output CFA format. + """ + self.logger.info('Starting scan process for CFA cloud format') + + # Redo this processor call. + results = cfa_handler(self, file_limit=limiter) + + # Record results here + print(results) + + def _scan_zarr(self, limiter: Union[int,None] = None): """ Function to perform scanning with output Zarr format. """