Skip to content

Commit

Permalink
Merge pull request #40 from cedadev/valid_0412
Browse files Browse the repository at this point in the history
Valid 0412
  • Loading branch information
dwest77a authored Dec 4, 2024
2 parents af48b71 + c91d609 commit 9673689
Show file tree
Hide file tree
Showing 47 changed files with 706 additions and 788 deletions.
11 changes: 11 additions & 0 deletions padocc/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
__author__ = "Daniel Westwood"
__contact__ = "daniel.westwood@stfc.ac.uk"
__copyright__ = "Copyright 2023 United Kingdom Research and Innovation"

## PADOCC CLI for entrypoint scripts

def main() -> None:
    """Entry point for PADOCC CLI scripts (currently a no-op stub)."""
    pass

if __name__ == '__main__':
    main()
3 changes: 3 additions & 0 deletions padocc/core/filehandlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def __str__(self) -> str:
def __len__(self) -> int:
    """Return the number of entries currently held in this filehandler."""
    content = self.get()
    size = len(content)
    # Log emits the same 'content length: N' message as before.
    self.logger.debug(f'content length: {size}')
    return size

def __iter__(self) -> Generator[str, None, None]:
Expand Down Expand Up @@ -238,11 +239,13 @@ def _get_content(self) -> None:
Open the file to get content if it exists
"""
if self.file_exists():
self.logger.debug('Opening existing file')
with open(self._file) as f:
content = [r.strip() for r in f.readlines()]
self._value = content

else:
self.logger.debug('Creating new file')
self.create_file()
self._value = []

Expand Down
14 changes: 14 additions & 0 deletions padocc/core/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,17 @@ def _rerun_command(self):
Setup for running this specific component interactively.
"""
return ''

class PropertiesMixin:
    """Mixin of derived, read-only properties for project operators.

    Assumes the host class provides ``detail_cfg`` (a mapping-like
    filehandler supporting ``__getitem__``) — TODO confirm against host.
    """

    @property
    def isparq(self) -> bool:
        """
        Return True if the project is configured to use parquet.
        """
        # 'type' holds the configured output format; 'parq' denotes parquet.
        return (self.detail_cfg['type'] == 'parq')

    @property
    def cloud_format(self) -> bool:
        # NOTE(review): always returns None despite the ``-> bool`` annotation;
        # presumably a stub to be implemented/overridden — confirm intended type.
        return None
82 changes: 63 additions & 19 deletions padocc/core/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import logging

from .errors import error_handler
from .utils import extract_file, BypassSwitch, apply_substitutions
from .utils import extract_file, BypassSwitch, apply_substitutions, phases
from .logs import reset_file_handler

from .mixins import DirectoryMixin, EvaluationsMixin
Expand Down Expand Up @@ -101,6 +101,11 @@ def __init__(
fh=fh,
logid=logid,
verbose=verbose)

if not os.path.isdir(self.groupdir):
raise ValueError(
f'The group "{groupID}" has not been initialised - not present in the working directory'
)

self.proj_code = proj_code

Expand Down Expand Up @@ -149,14 +154,41 @@ def __init__(
self._outfile = None

def __str__(self):
    """Informal representation: the project code with its parent group.

    A merge left two consecutive ``return`` statements here (the second was
    unreachable); kept the newer, more informative message that includes
    the project code.
    """
    return f'<PADOCC Project: {self.proj_code} ({self.groupID})>'

def __repr__(self):
    """Formal representation mirrors ``__str__``."""
    return str(self)

def info(self, fn=print):
    """
    Summarise this project (code, phase, file count, version) via ``fn``.
    """
    # Header includes the group only when one is set.
    if self.groupID is not None:
        header = f'{self.proj_code} ({self.groupID}):'
    else:
        header = f'{self.proj_code}:'
    fn(header)
    for line in (
        f' > Phase: {self._get_phase()}',
        f' > Files: {len(self.allfiles)}',
        f' > Version: {self.get_version()}',
    ):
        fn(line)

def help(self, fn=print):
    """
    List the public methods and properties of the project operator via ``fn``.
    """
    lines = [
        str(self),
        ' > project.info() - Get some information about this project',
        ' > project.get_version() - Get the version number for the output product',
        ' > project.save_files() - Save all open files related to this project',
        'Properties:',
        ' > project.proj_code - code for this project.',
        ' > project.groupID - group to which this project belongs.',
        ' > project.dir - directory containing the projects files.',
        ' > project.cfa_path - path to the CFA file.',
        ' > project.outfile - path to the output product (Kerchunk/Zarr)',
    ]
    for line in lines:
        fn(line)

def run(
self,
mode: str = None,
mode: str = 'kerchunk',
subset_bypass: bool = False,
forceful : bool = None,
thorough : bool = None,
Expand All @@ -181,12 +213,12 @@ def run(
self.save_files()
return status
except Exception as err:

return error_handler(
err, self.logger, self.phase,
jobid=self._logid, dryrun=self._dryrun,
subset_bypass=subset_bypass,
status_fh=self.status_log)
print(err)
#return error_handler(
#err, self.logger, self.phase,
#jobid=self._logid, dryrun=self._dryrun,
##subset_bypass=subset_bypass,
#status_fh=self.status_log)

def _run(self, **kwargs):
# Default project operation run.
Expand All @@ -209,6 +241,13 @@ def get_version(self):
"""
return self.detail_cfg['version_no'] or 1

@property
def dir(self):
    """Directory containing this project's files (under ``in_progress``)."""
    # Projects without a group live under the 'general' subdirectory.
    subgroup = self.groupID if self.groupID else 'general'
    return f'{self.workdir}/in_progress/{subgroup}/{self.proj_code}'

@property
def cfa_path(self):
    """Path to this project's CFA (``.nca``) file inside the project dir."""
    filename = f'{self.proj_code}.nca'
    return f'{self.dir}/{filename}'
Expand All @@ -225,9 +264,6 @@ def outfile(self):
def outfile(self, value : str):
self._outfile = value

def __str__(self):
return self.proj_code

def dir_exists(self, checkdir : str = None):
if not checkdir:
checkdir = self.dir
Expand Down Expand Up @@ -258,6 +294,21 @@ def save_files(self):
self.allfiles.close()
self.status_log.close()

def _get_phase(self):
    """
    Get the most advanced phase this project has completed successfully.
    """
    # NOTE(review): assumes each status_log row indexes as [status, phase]
    # — confirm against the status_log filehandler's row type.
    best = 0
    for record in self.status_log:
        # Only successful runs count towards progression.
        if record[0] != 'Success':
            continue
        index = phases.index(record[1])
        if index > best:
            best = index
    return phases[best]

def _configure_filelist(self):
pattern = self.base_cfg['pattern']

Expand Down Expand Up @@ -303,13 +354,6 @@ def _setup_config(
config['substitutions'] = substitutions
self.base_cfg.set(config)

@property
def dir(self):
if self.groupID:
return f'{self.workdir}/in_progress/{self.groupID}/{self.proj_code}'
else:
return f'{self.workdir}/in_progress/general/{self.proj_code}'

def _create_dirs(self, first_time : bool = None):
if not self.dir_exists():
if self._dryrun:
Expand Down
7 changes: 7 additions & 0 deletions padocc/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@
'validate':'30:00' # From CMIP experiments - no reliable prediction mechanism possible
}

# Ordered list of pipeline phases: position encodes progression order
# (a later index means further through the pipeline), so list.index()
# comparisons on this list are meaningful.
phases = [
    'scan',
    'compute',
    'validate',
    'catalog'
]

class BypassSwitch:
"""Class to represent all bypass switches throughout the pipeline.
Requires a switch string which is used to enable/disable specific pipeline
Expand Down
3 changes: 2 additions & 1 deletion padocc/operations/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ def _compute_config(
self,
proj_code,
mode=None,
subset_bypass=False,
**kwargs
) -> None:
"""
Expand Down Expand Up @@ -306,7 +307,7 @@ def _compute_config(
version_no=version,
**kwargs
)
status = proj_op.run()
status = proj_op.run(subset_bypass=subset_bypass)
proj_op.save_files()
return status

Expand Down
2 changes: 2 additions & 0 deletions padocc/operations/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ def _open_json(file):
pattern = pattern[0]
if status:
self.logger.warning(status)
else:
pattern = os.path.abspath(pattern)

if substitutions:
cfg_values['substitutions'] = substitutions
Expand Down
21 changes: 13 additions & 8 deletions padocc/phases/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def _convert_kerchunk(self, nfile: str, ctype, **kwargs) -> None:
def _hdf5_to_zarr(self, nfile: str, **kwargs) -> dict:
    """Wrapper for converting NetCDF4/HDF5 type files to Kerchunk.

    :param nfile: path to a single NetCDF4/HDF5 file.
    :returns: kerchunk reference dict from ``SingleHdf5ToZarr.translate()``.
    """
    from kerchunk.hdf import SingleHdf5ToZarr
    # A merge left two consecutive identical return statements here
    # (the second unreachable); collapsed to a single return.
    return SingleHdf5ToZarr(nfile, **kwargs).translate()

def _ncf3_to_zarr(self, nfile: str, **kwargs) -> dict:
"""Wrapper for converting NetCDF3 type files to Kerchunk"""
Expand Down Expand Up @@ -193,6 +193,7 @@ def __init__(
limiter : int = None,
skip_concat : bool = False,
new_version : bool = None,
label : str = 'compute',
**kwargs
) -> None:
"""
Expand Down Expand Up @@ -233,6 +234,7 @@ def __init__(
workdir,
groupID=groupID,
thorough=thorough,
label=label,
**kwargs)

self.logger.debug('Starting variable definitions')
Expand All @@ -243,7 +245,8 @@ def __init__(
self.skip_concat = skip_concat

self.stage = stage
self._identify_mode()
self.mode = self.detail_cfg['mode'] or 'kerchunk'
self.fmt = self.detail_cfg['type'] or 'JSON'

self.validate_time = None
self.concat_time = None
Expand Down Expand Up @@ -284,14 +287,20 @@ def __init__(
self.temp_zattrs.set({})

self.combine_kwargs = {} # Now using concat_dims and identical dims finders.
self.create_kwargs = {'inline_threshold':1}
self.create_kwargs = {'inline_threshold':0}
self.pre_kwargs = {}

self.special_attrs = {}
self.var_shapes = {}

self.logger.debug('Finished all setup steps')

def help(self, fn=print):
    """Extend the parent help listing with compute-specific options."""
    super().help(fn=fn)
    extra = (
        '',
        'Compute Options:',
        ' > project.run() - Run compute for this project',
    )
    for line in extra:
        fn(line)

def _run(self, mode: str = 'kerchunk'):
"""
Default _run hook for compute operations. A user should aim to use the
Expand Down Expand Up @@ -597,20 +606,16 @@ def _determine_dim_specs(self, objs: list) -> None:
# Calculate Partial Validation Estimate here
t1 = datetime.now()
self.logger.info("Determining concatenation dimensions")
print()
self._find_concat_dims(objs)
if self.combine_kwargs['concat_dims'] == []:
self.logger.info("No concatenation dimensions available - virtual dimension will be constructed.")
else:
self.logger.info(f"Found {self.combine_kwargs['concat_dims']} concatenation dimensions.")
print()

# Identical (Variables) Dimensions
self.logger.info("Determining identical variables")
print()
self._find_identical_dims(objs)
self.logger.info(f"Found {self.combine_kwargs['identical_dims']} identical variables.")
print()

# This one only happens for two files so don't need to take a mean
self.validate_time = (datetime.now()-t1).total_seconds()
Expand Down Expand Up @@ -749,7 +754,7 @@ def _combine_and_save(self, refs: dict) -> None:
])

t1 = datetime.now()
if self.fmt == 'json':
if self.fmt == 'JSON':
self.logger.info('Concatenating to JSON format Kerchunk file')
self._data_to_json(refs)
else:
Expand Down
19 changes: 14 additions & 5 deletions padocc/phases/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def __init__(
proj_code : str,
workdir : str,
groupID : str = None,
label : str = None,
label : str = 'scan',
**kwargs,
) -> None:

Expand All @@ -139,7 +139,13 @@ def __init__(
label = 'scan-operation'

super().__init__(
proj_code, workdir, groupID=groupID, **kwargs)
proj_code, workdir, groupID=groupID, label=label,**kwargs)

def help(self, fn=print):
    """Extend the parent help listing with scan-specific options."""
    super().help(fn=fn)
    extra = (
        '',
        'Scan Options:',
        ' > project.run() - Run a scan for this project',
    )
    for line in extra:
        fn(line)

def _run(self, mode: str = 'kerchunk') -> None:
    """
    Main process handler for scanning phase.

    Skips scanning entirely for very small datasets (<3 files); otherwise
    scans a subset of the files using the requested mode.

    :param mode: (str) scan mode, one of 'kerchunk' or 'zarr'.

    :raises ValueError: for an unrecognised mode (status log updated first).
    """
    # File count derived from the allfiles filehandler.
    nfiles = len(self.allfiles)

    if nfiles < 3:
        # Too few files to justify a subset scan - compute handles these directly.
        self.detail_cfg = {'skipped':True}
        # A merge left the older duplicate of this log message in place; kept
        # only the newer message that reports the file count.
        self.logger.info(f'Skip scanning phase (only found {nfiles} files) >> proceed directly to compute')
        return None

    # Create all files in mini-kerchunk set here. Then try an assessment.
    # Scan between 2 and 100 files, roughly 5% of the dataset.
    limiter = min(100, max(2, int(nfiles/20)))

    self.logger.info(f'Determined {limiter} files to scan (out of {nfiles})')
    self.logger.debug(f'Using {mode} scan operations')

    if mode == 'zarr':
        self._scan_zarr(limiter=limiter)
    elif mode == 'kerchunk':
        self._scan_kerchunk(limiter=limiter)
    else:
        # A merge left dead code here: a logger.error + return 'Failed' made
        # the newer update_status/raise unreachable. Kept the newer behaviour:
        # record the failure in the status log, then raise.
        self.update_status('scan','ValueError',jobid=self._logid, dryrun=self._dryrun)
        raise ValueError(
            f'Unrecognised mode: {mode} - must be one of ["kerchunk","zarr","CFA"]'
        )

    self.update_status('scan','Success',jobid=self._logid, dryrun=self._dryrun)
    # NOTE(review): returns 'Success' despite the -> None annotation; kept
    # for caller compatibility — confirm intended return contract.
    return 'Success'
Expand Down
13 changes: 0 additions & 13 deletions padocc/phases/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,6 @@
from ujson import JSONDecodeError
from dask.distributed import LocalCluster

class CloudValidator:
    """
    Encapsulate all validation testing into a single class.

    Instantiate for a specific project; the object could then contain all
    project info (from detail-cfg) opened only once, plus a copy of the
    total datasets (from native and cloud sources). Subselections can be
    passed between class methods along with a variable index (class
    variables: variable list, dimension list etc.). A class logger
    attribute avoids passing a logger between functions, and the bypass
    switch is contained here with all switches.
    """
    def __init__(self):
        # NOTE(review): placeholder only - no validation state is set up yet.
        pass

## 1. Array Selection Tools

def find_dimensions(dimlen: int, divisions: int) -> int:
Expand Down
Loading

0 comments on commit 9673689

Please sign in to comment.