Merge pull request #5 from cedadev/streamline
Streamlined and improved pipeline
dwest77a authored Jan 17, 2024
2 parents 9c0f525 + 3899181 commit b76eb7d
Showing 5 changed files with 460 additions and 339 deletions.
4 changes: 4 additions & 0 deletions group_run.py
@@ -119,6 +119,8 @@ def main(args):
sb += ' -v'
if args.bypass:
sb += ' -b'
if args.quality:
sb += ' -Q'

if args.repeat_id:
sb += f' -r {args.repeat_id}'
@@ -160,6 +162,8 @@ def main(args):
parser.add_argument('-v','--verbose',dest='verbose' , action='count', default=0, help='Print helpful statements while running')
parser.add_argument('-d','--dryrun', dest='dryrun', action='store_true', help='Perform dry-run (i.e no new files/dirs created)' )

parser.add_argument('-Q','--quality', dest='quality', action='store_true', help='Quality assured checks - thorough run')

args = parser.parse_args()

main(args)
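
The new -Q/--quality option follows the same pattern as the existing -v and -b flags in group_run.py: each argument parsed at the group level is re-encoded onto the command string sb that is handed down to the per-dataset jobs. A minimal, self-contained sketch of that forwarding pattern (the build_forwarded_flags helper is hypothetical, for illustration only):

import argparse

def build_forwarded_flags(args):
    """Re-encode parsed flags as a command-line suffix for a child job (illustrative only)."""
    sb = ''
    if args.verbose:
        sb += ' -v'
    if args.bypass:
        sb += ' -b'
    if args.quality:
        # New in this commit: request the thorough, quality-assured run
        sb += ' -Q'
    return sb

parser = argparse.ArgumentParser()
parser.add_argument('-v', '--verbose', dest='verbose', action='count', default=0)
parser.add_argument('-b', '--bypass', dest='bypass', action='store_true')
parser.add_argument('-Q', '--quality', dest='quality', action='store_true')

print(build_forwarded_flags(parser.parse_args(['-v', '-Q'])))  # prints ' -v -Q'

Downstream, the same flag appears to drive the new thorough behaviour seen in the Indexer changes below.
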
57 changes: 34 additions & 23 deletions pipeline/compute/serial_process.py
@@ -37,9 +37,10 @@ def init_logger(verbose, mode, name):
return logger

class Converter:
def __init__(self, logger):
def __init__(self, logger, bypass_errs=False):
self.logger = logger
self.success = True
self.bypass_errs = bypass_errs

def convert_to_zarr(self, nfile, ctype, **kwargs):
try:
@@ -53,8 +54,10 @@ def convert_to_zarr(self, nfile, ctype, **kwargs):
self.logger.debug(f'Extension {ctype} not valid')
return None
except Exception as err:
self.logger.debug(f'Dataset {nfile} failed using {ctype} driver - {err}')
return None
if self.bypass_errs:
pass
else:
raise err

def hdf5_to_zarr(self, nfile, **kwargs):
"""Converter for HDF5 type files"""
@@ -76,12 +79,12 @@ class Indexer(Converter):
def __init__(self,
proj_code,
cfg_file=None, detail_file=None, workdir=WORKDIR,
issave_meta=False, refresh='', forceful=False,
issave_meta=False, thorough=False, forceful=False,
verb=0, mode=None, version_no=1,
concat_msg=CONCAT_MSG):
concat_msg=CONCAT_MSG, bypass=False):
"""Initialise indexer for this dataset, set all variables and prepare for computation"""
logger = init_logger(verb, mode, 'compute-serial')
super().__init__(logger)
super().__init__(logger, bypass_errs=bypass)

self.logger.debug('Starting variable definitions')

@@ -128,16 +131,19 @@ def __init__(self,
self.filelist = f'{self.proj_dir}/allfiles.txt'

self.cache = f'{self.proj_dir}/cache/'
if refresh == '' and os.path.isfile(f'{self.cache}/temp_zattrs.json'):
if os.path.isfile(f'{self.cache}/temp_zattrs.json') and not thorough:
# Load data instead of create from scratch
self.load_refs = True
self.logger.debug('Found cached data from previous run, loading cache')


if not os.path.isdir(self.cache):
os.makedirs(self.cache)
os.makedirs(self.cache)
if thorough:
os.system(f'rm -rf {self.cache}/*')

self.combine_kwargs = {}
self.create_kwargs = {}
self.combine_kwargs = {'concat_dims':'time', 'coo_map':{'time':'cf:time'}}
self.create_kwargs = {'inline_threshold':1000}
self.pre_kwargs = {}

self.set_filelist()
@@ -228,16 +234,17 @@ def data_to_json(self, refs, zattrs):
self.logger.debug('Starting JSON-write process')

# Already have default options saved to class variables
mzz = MultiZarrToZarr(refs, concat_dims=['time'], **self.combine_kwargs).translate()
# Override global attributes

# Needs but must be fixed
if zattrs:
zattrs = self.add_kerchunk_history(zattrs)
if len(refs) > 1:
mzz = MultiZarrToZarr(refs, **self.combine_kwargs).translate()
if zattrs:
zattrs = self.add_kerchunk_history(zattrs)
else:
self.logger.debug(zattrs)
raise ValueError
mzz['refs']['.zattrs'] = json.dumps(zattrs)
else:
self.logger.debug(zattrs)
raise ValueError
mzz['refs']['.zattrs'] = json.dumps(zattrs)
mzz = refs[0]
# Override global attributes
mzz['refs'] = self.add_download_link(mzz['refs'])

with open(self.outfile,'w') as f:
@@ -363,13 +370,14 @@ def save_cache(self, refs, zattrs):
def try_all_drivers(self, nfile, **kwargs):
"""Safe creation allows for known issues and tries multiple drivers"""

extension = False

if '.' in nfile:
ctype = f'.{nfile.split(".")[-1]}'
else:
ctype = '.nc'

supported_extensions = ['ncf3','hdf5','tif']
ctype=''

self.logger.debug(f'Attempting conversion for 1 {ctype} extension')

@@ -382,12 +390,15 @@ def try_all_drivers(self, nfile, **kwargs):
if extension != ctype:
tdict = self.convert_to_zarr(nfile, extension, **kwargs)
ext_index += 1

if not tdict:
self.logger.error('Scanning failed for all drivers, file type is not Kerchunkable')
raise KerchunkDriverFatalError
else:
self.logger.info(f'Scan successful with {ctype} driver')
if extension:
self.logger.debug(f'Scan successful with {extension} driver')
else:
self.logger.debug(f'Scan successful with {ctype} driver')
return tdict

def convert_to_kerchunk(self):
@@ -431,7 +442,7 @@ def create_refs(self):
self.logger.info('Single conversions complete, starting concatenation')
self.combine_and_save(refs, zattrs)
if self.issave_meta:
self.save_meta(zattrs)
self.save_cache(refs, zattrs)
else:
self.logger.info('Issue with conversion unspecified - aborting process')
self.save_cache(refs, zattrs)
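
Two of the serial_process.py changes are worth reading together: combine_kwargs now carries real defaults (concatenate along the time dimension, decoding the time coordinate with Kerchunk's 'cf:time' mapper), and data_to_json now handles the single-file case where there is nothing to concatenate. A minimal stand-alone sketch of that branch (the combine_refs helper is hypothetical; it assumes every element of refs is a complete Kerchunk reference dict with a 'refs' key, as in the diff):

import json
from kerchunk.combine import MultiZarrToZarr

def combine_refs(refs, zattrs, outfile):
    """Concatenate per-file Kerchunk references along time and write the combined JSON."""
    combine_kwargs = {'concat_dims': 'time', 'coo_map': {'time': 'cf:time'}}

    if len(refs) > 1:
        # Several single-file reference sets: concatenate along 'time',
        # letting Kerchunk decode the coordinate via CF conventions.
        mzz = MultiZarrToZarr(refs, **combine_kwargs).translate()
    else:
        # Only one file was converted, so there is nothing to combine.
        mzz = refs[0]

    if zattrs:
        # Override the global attributes of the combined store.
        mzz['refs']['.zattrs'] = json.dumps(zattrs)

    with open(outfile, 'w') as f:
        f.write(json.dumps(mzz))

The real data_to_json additionally stamps the Kerchunk history into the attributes and rewrites the references with a download link before saving.
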
72 changes: 52 additions & 20 deletions pipeline/scan.py
@@ -26,6 +26,20 @@
logging.DEBUG
]

class FilecapExceededError(Exception):
def __init__(self, nfiles=0, verbose=0):
self.message = f'Filecap exceeded: {nfiles} files attempted'
super().__init__(self.message)
if verbose < 1:
self.__class__.__module__ = 'builtins'

class ExpectTimeoutError(Exception):
def __init__(self, required=0, current='', verbose=0):
self.message = f'Scan requires minimum {required} - current {current}'
super().__init__(self.message)
if verbose < 1:
self.__class__.__module__ = 'builtins'

def init_logger(verbose, mode, name):
"""Logger object init and configure with formatting"""
verbose = min(verbose, len(levels)-1)
@@ -66,33 +80,36 @@ def map_to_kerchunk(args, nfile, ctype, logger):
logger.info(f'Running Kerchunk reader for {nfile}')
from pipeline.compute.serial_process import Converter

quickConvert = Converter(logger, args)
quickConvert = Converter(logger, bypass_errs=args.bypass)

kwargs = {}
supported_extensions = ['ncf3','hdf5','tif']

logger.debug(f'Attempting conversion for 1 {ctype} extension')

t1 = datetime.now()
tdict = quickConvert.convert_to_zarr(nfile, ctype, **kwargs)
t_len = (datetime.now()-t1).total_seconds()
ext_index = 0
while not tdict and ext_index < len(supported_extensions)-1:
# Try the other ones
extension = supported_extensions[ext_index]
logger.debug(f'Attempting conversion for {extension} extension')
if extension != ctype:
t1 = datetime.now()
tdict = quickConvert.convert_to_zarr(nfile, extension, **kwargs)
t_len = (datetime.now()-t1).total_seconds()
ext_index += 1

if not tdict:
logger.error('Scanning failed for all drivers, file type is not Kerchunkable')
return None, None
return None, None, None
else:
logger.info(f'Scan successful with {ctype} driver')
return tdict['refs'], ctype
return tdict['refs'], ctype, t_len

def get_internals(args, testfile, ctype, logger):
"""Map to kerchunk data and perform calculations on test netcdf file."""
refs,ctype = map_to_kerchunk(args, testfile, ctype, logger)
refs, ctype, time = map_to_kerchunk(args, testfile, ctype, logger)
if not refs:
return None, None, None
logger.info(f'Starting summation process for {testfile}')
@@ -109,7 +126,7 @@ def get_internals(args, testfile, ctype, logger):
vars[chunkkey.split('/')[0]] = 1
except ValueError:
pass
return np.sum(sizes), chunks, sorted(list(vars.keys())), ctype
return np.sum(sizes), chunks, sorted(list(vars.keys())), ctype, time

def eval_sizes(files):
"""Get a list of file sizes on disk from a list of filepaths"""
@@ -122,7 +139,14 @@ def get_seconds(time_allowed):
mins, secs = time_allowed.split(':')
return int(secs) + 60*int(mins)

def perform_safe_calculations(std_vars, cpf, volms, files, logger):
def format_seconds(seconds):
"""Convert time in seconds to MM:SS"""
mins = int(seconds/60) + 1
if mins < 10:
mins = f'0{mins}'
return f'{mins}:00'

def perform_safe_calculations(std_vars, cpf, volms, files, times, logger):
kchunk_const = 167 # Bytes per Kerchunk ref (standard/typical)
if std_vars:
num_vars = len(std_vars)
@@ -165,16 +189,20 @@ def perform_safe_calculations(std_vars, cpf, volms, files, logger):
else:
addition = None

if files and len(times) > 0:
estm_time = int(np.mean(times)*len(files))
else:
estm_time = 0

return avg_cpf, num_vars, avg_chunk, spatial_res, data_represented, num_files, total_chunks, addition
return avg_cpf, num_vars, avg_chunk, spatial_res, data_represented, num_files, total_chunks, addition, estm_time

def scan_dataset(args, files, proj_dir, proj_code, logger):
"""Main process handler for scanning phase"""
logger.debug(f'Assessment for {proj_code}')

# Set up conditions, skip for small file count < 5
escape, is_varwarn, is_skipwarn = False, False, False
cpf, volms = [],[]
cpf, volms, times = [],[],[]
trial_files = 5
if len(files) < 5:
details = {'skipped':True}
@@ -185,11 +213,14 @@ def scan_dataset(args, files, proj_dir, proj_code, logger):
f.write(json.dumps(details))
logger.info(f'Skipped scanning - {proj_code}/detail-cfg.json blank file created')
return None
else:
logger.info(f'Identified {len(files)} files for scanning')

# Perform scans for sample (max 5) files
count = 0
std_vars = None
ctypes = []
filecap = min(100,len(files))
while not escape and len(cpf) < trial_files:
logger.info(f'Attempting scan for file {count+1} (min 5, max 100)')
# Add random file selector here
@@ -202,39 +233,39 @@

try:
# Measure time and ensure job will not overrun if it can be prevented.
t1 = datetime.now()
volume, chunks_per_file, vars, ctype = get_internals(args, scanfile, extension, logger)
t2 = (datetime.now() - t1).total_seconds()
if count == 0 and t2 > get_seconds(args.time_allowed)/trial_files:
logger.error(f'Time estimate exceeds allowed time for job - {t2}')
escape = True
volume, chunks_per_file, vars, ctype, time = get_internals(args, scanfile, extension, logger)
if count == 0 and time > get_seconds(args.time_allowed)/trial_files:
raise ExpectTimeoutError(required=format_seconds(time*5), current=args.time_allowed)

cpf.append(chunks_per_file)
volms.append(volume)
ctypes.append(ctype)
times.append(time)

if not std_vars:
std_vars = vars
if vars != std_vars:
logger.warning('Variables differ between files')
is_varwarn = True
logger.info(f'Data saved for file {count+1}')
except ExpectTimeoutError as err:
raise err
except Exception as e:
if args.bypass:
logger.warning(f'Skipped file {count} - {e}')
is_skipwarn = True
else:
raise e
if count >= 100:
escape = True
count += 1
if count > 100:
logger.error('Filecount Exceeded: No valid files in first 100 tried')
if count >= filecap:
escape = True
if escape:
raise FilecapExceededError(filecap)

logger.info('Scan complete, compiling outputs')
(avg_cpf, num_vars, avg_chunk,
spatial_res, data_represented, num_files,
total_chunks, addition) = perform_safe_calculations(std_vars, cpf, volms, files, logger)
total_chunks, addition, estm_time) = perform_safe_calculations(std_vars, cpf, volms, files, times, logger)

details = {
'data_represented' : format_float(data_represented, logger),
@@ -243,6 +274,7 @@
'total_chunks' : safe_format(total_chunks,'{value:.2f}'),
'estm_chunksize' : format_float(avg_chunk,logger),
'estm_spatial_res' : safe_format(spatial_res,'{value:.2f}') + ' deg',
'estm_time' : format_seconds(estm_time),
'variable_count' : num_vars,
'addition' : safe_format(addition,'{value:.3f}') + ' %',
'var_err' : is_varwarn,
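
The reworked scan also times each sample conversion (the extra value now returned by map_to_kerchunk and get_internals), extrapolates a total job time as mean(sample times) x number of files, and reports it as estm_time in detail-cfg.json; if the very first file already implies the allowed time will be exceeded, the new ExpectTimeoutError is raised rather than merely logging an error and escaping the loop as before. A rough sketch of the estimate (estimate_job_time is a hypothetical helper; format_seconds mirrors the function added above, which rounds up to whole minutes):

import numpy as np

def format_seconds(seconds):
    """Convert a duration in seconds to 'MM:SS', rounded up to the next whole minute."""
    mins = int(seconds / 60) + 1
    return f'{mins:02d}:00'

def estimate_job_time(sample_times, total_files, time_allowed_secs):
    """Extrapolate sampled per-file conversion times to the whole dataset (illustrative only)."""
    if not sample_times:
        return '00:00', False
    estm_secs = int(np.mean(sample_times) * total_files)
    will_overrun = estm_secs > time_allowed_secs
    return format_seconds(estm_secs), will_overrun

# Five sampled files averaging ~12s each, 2000 files in total, 10 minutes allowed:
print(estimate_job_time([11.2, 12.4, 12.1, 11.8, 12.5], 2000, 600))  # ('401:00', True)
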
(Diffs for the remaining 2 changed files are not rendered here.)
