Skip to content

Commit

Permalink
Various revisions
Browse files Browse the repository at this point in the history
  • Loading branch information
dwest77a committed Jan 7, 2025
1 parent 99b059e commit 48e1d32
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 21 deletions.
4 changes: 2 additions & 2 deletions padocc/core/filehandlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ def _set_value_in_file(self) -> None:
in the file.
"""
if self._dryrun or self._value == []:
self.logger.debug("Skipped setting value in file")
self.logger.debug(f"Skipped setting value in {self.file}")
return

if not self.file_exists():
Expand Down Expand Up @@ -433,7 +433,7 @@ def _set_value_in_file(self) -> None:
in the file.
"""
if self._dryrun or self._value == {}:
self.logger.debug("Skipped setting value in file")
self.logger.debug(f"Skipped setting value in {self.file}")
return

self._apply_conf()
Expand Down
2 changes: 1 addition & 1 deletion padocc/core/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ def _configure_filelist(self):
if 'latest' in pattern:
pattern = pattern.replace('latest', os.readlink(pattern))

self.allfiles.set(sorted(glob.glob(pattern)))
self.allfiles.set(sorted(glob.glob(pattern, recursive=True)))

def _setup_config(
self,
Expand Down
18 changes: 13 additions & 5 deletions padocc/operations/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
ScanOperation,
KerchunkDS,
ZarrDS,
cfa_handler,
CfaDS,
KNOWN_PHASES,
ValidateOperation,
)
Expand All @@ -25,7 +25,7 @@
COMPUTE = {
'kerchunk':KerchunkDS,
'zarr':ZarrDS,
'cfa': cfa_handler,
'cfa': CfaDS,
}

class GroupOperation(
Expand Down Expand Up @@ -126,7 +126,15 @@ def __str__(self):

def __repr__(self):
return str(self)

def __getitem__(self, index: int) -> ProjectOperation:
    """
    Fetch a single project from this group by position.

    ``index`` is applied to the ``'main'`` entry of ``self.proj_codes``;
    the project code found there is handed to ``self.get_project`` and
    whatever that returns is returned to the caller.
    """
    main_codes = self.proj_codes['main']
    selected_code = main_codes[index]
    return self.get_project(selected_code)

@property
def proj_codes_dir(self):
return f'{self.groupdir}/proj_codes'
Expand Down Expand Up @@ -272,7 +280,7 @@ def run(
repeat_id: str = 'main',
proj_code: Optional[str] = None,
subset: Optional[str] = None,
bypass: Union[BypassSwitch,None] = None,
bypass: Union[BypassSwitch, None] = None,
**kwargs
) -> dict[str]:

Expand Down Expand Up @@ -397,11 +405,11 @@ def _compute_config(
self.workdir,
groupID=self.groupID,
logger=self.logger,
bypass=bypass
bypass=bypass,
**kwargs,
)

mode = proj_op.cloud_format
mode = mode or proj_op.cloud_format
if mode is None:
mode = 'kerchunk'

Expand Down
13 changes: 8 additions & 5 deletions padocc/operations/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ def _init_group(self, datasets : list, substitutions: dict = None):
# Group config is the contents of datasets.csv
if substitutions:
datasets, status = apply_substitutions('init_file',subs=substitutions, content=datasets)
if status:
self.logger.warning(status)
if status:
self.logger.warning(status)

self.datasets.set(datasets)

Expand All @@ -131,16 +131,19 @@ def _open_json(file):
cfg_values = {}
ds_values = datasets[index].split(',')

proj_code = ds_values[0]
pattern = ds_values[1]
proj_code = ds_values[0].replace(' ','')
pattern = ds_values[1].replace(' ','')

if pattern.endswith('.txt') and substitutions:
pattern, status = apply_substitutions('dataset_file', subs=substitutions, content=[pattern])
pattern = pattern[0]
if status:
self.logger.warning(status)
else:
elif pattern.endswith('.csv'):
pattern = os.path.abspath(pattern)
else:
# Don't expand the pattern if it's not a csv
pass

if substitutions:
cfg_values['substitutions'] = substitutions
Expand Down
2 changes: 1 addition & 1 deletion padocc/phases/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
__copyright__ = "Copyright 2024 United Kingdom Research and Innovation"

from .scan import ScanOperation
from .compute import KerchunkDS, ZarrDS, ComputeOperation, cfa_handler
from .compute import KerchunkDS, ZarrDS, ComputeOperation, cfa_handler, CfaDS
from .ingest import IngestOperation
from .validate import ValidateOperation

Expand Down
21 changes: 17 additions & 4 deletions padocc/phases/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@ def __init__(

self.partial = (limiter and num_files != limiter)

self._determine_version()
# Perform this later
#self._determine_version()

self.limiter = limiter
if not self.limiter:
Expand Down Expand Up @@ -343,7 +344,7 @@ def _run(self, mode: str = 'kerchunk'):
self.logger.error('Nothing to do with this class - use KerchunkDS/ZarrDS instead!')
raise ComputeError

def _run_with_timings(self, func):
def _run_with_timings(self, func) -> str:
"""
Configure all required steps for Kerchunk processing.
- Check if output files already exist.
Expand Down Expand Up @@ -668,7 +669,7 @@ def __init__(

def _run(
self,
**kwargs) -> None:
**kwargs) -> str:
"""
``_run`` hook method called from the ``ProjectOperation.run``
which this subclass inherits. The kwargs capture the ``mode``
Expand Down Expand Up @@ -943,7 +944,7 @@ def __init__(
self.filelist = []
self.mem_allowed = mem_allowed

def _run(self, **kwargs) -> None:
def _run(self, **kwargs) -> str:
"""
Recommended way of running an operation - includes timers etc.
"""
Expand Down Expand Up @@ -1053,6 +1054,18 @@ def _get_rechunk_scheme(self):

return concat_dim_rechunk, dim_sizes, cpf/self.limiter, volume/self.limiter

class CfaDS(ComputeOperation):
    """
    Compute operation variant that produces its output via ``cfa_handler``.
    """

    def _run(self, **kwargs) -> str:
        """
        Integration of the CFA converter into the Padocc operation class.

        Keyword arguments are accepted but not used by this method.
        Returns ``'Success'`` when ``cfa_handler(self)`` gives a truthy
        result and ``'Fatal'`` otherwise.
        """
        return 'Success' if cfa_handler(self) else 'Fatal'

# Deal with setting proper values here in specific files.

if __name__ == '__main__':
    # Only reached when this module is executed directly; the message
    # points the user at single_run.py instead.
    print('Serial Processor for Kerchunk Pipeline - run with single_run.py')

22 changes: 19 additions & 3 deletions padocc/phases/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
import re
import logging

from typing import Union

from padocc.core import FalseLogger
from padocc.core.errors import ConcatFatalError
from padocc.core import ProjectOperation
from padocc.core.utils import BypassSwitch
from .compute import KerchunkDS
from .compute import KerchunkDS, cfa_handler

from padocc.core.filehandlers import JSONFileHandler

Expand Down Expand Up @@ -170,6 +172,8 @@ def _run(self, mode: str = 'kerchunk') -> None:
self._scan_zarr(limiter=limiter)
elif mode == 'kerchunk':
self._scan_kerchunk(limiter=limiter)
elif mode == 'cfa':
self._scan_cfa(limiter=limiter)
else:
self.update_status('scan','ValueError',jobid=self._logid)
raise ValueError(
Expand All @@ -179,7 +183,7 @@ def _run(self, mode: str = 'kerchunk') -> None:
self.update_status('scan','Success',jobid=self._logid)
return 'Success'

def _scan_kerchunk(self, limiter: int = None):
def _scan_kerchunk(self, limiter: Union[int,None] = None):
"""
Function to perform scanning with output Kerchunk format.
"""
Expand Down Expand Up @@ -247,7 +251,19 @@ def _scan_kerchunk(self, limiter: int = None):
ctypes, escape=escape, scanned_with='kerchunk'
)

def _scan_zarr(self, limiter=None):
def _scan_cfa(self, limiter: Union[int,None] = None) -> None:
    """
    Function to perform scanning with output CFA format.

    :param limiter:     (int) Forwarded to ``cfa_handler`` as its
        ``file_limit`` argument; ``None`` is passed through unchanged.
    """
    self.logger.info('Starting scan process for CFA cloud format')

    # Redo this processor call.
    results = cfa_handler(self, file_limit=limiter)

    # Results are not yet recorded anywhere. Report them through the
    # operation's logger rather than a bare print(), so the output follows
    # the same logging configuration as the rest of the scan phase.
    self.logger.info(f'CFA scan results: {results}')

def _scan_zarr(self, limiter: Union[int,None] = None):
"""
Function to perform scanning with output Zarr format.
"""
Expand Down

0 comments on commit 48e1d32

Please sign in to comment.