diff --git a/workflow/pygw b/workflow/pygw new file mode 120000 index 0000000000..dfa1d9a164 --- /dev/null +++ b/workflow/pygw @@ -0,0 +1 @@ +../ush/python/pygw/src/pygw \ No newline at end of file diff --git a/workflow/rocoto/workflow_tasks.py b/workflow/rocoto/workflow_tasks.py index 5ec1dbb39c..edb35af513 100644 --- a/workflow/rocoto/workflow_tasks.py +++ b/workflow/rocoto/workflow_tasks.py @@ -310,10 +310,6 @@ def aerosol_init(self): interval = self._base['INTERVAL'] offset = f'-{interval}' - # Previous cycle - dep_dict = {'type': 'cycleexist', 'offset': offset} - deps.append(rocoto.add_dependency(dep_dict)) - # Files from previous cycle files = [f'@Y@m@d.@H0000.fv_core.res.nc'] + \ [f'@Y@m@d.@H0000.fv_core.res.tile{tile}.nc' for tile in range(1, self.n_tiles + 1)] + \ @@ -326,8 +322,10 @@ def aerosol_init(self): dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + cycledef = 'gfs_seq' resources = self.get_resource('aerosol_init') - task = create_wf_task('aerosol_init', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + task = create_wf_task('aerosol_init', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies, + cycledef=cycledef) return task @@ -387,8 +385,6 @@ def analdiag(self): deps = [] dep_dict = {'type': 'task', 'name': f'{self.cdump}anal'} deps.append(rocoto.add_dependency(dep_dict)) - dep_dict = {'type': 'cycleexist', 'offset': '-06:00:00'} - deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) resources = self.get_resource('analdiag') @@ -446,8 +442,6 @@ def atmanalpost(self): deps = [] dep_dict = {'type': 'task', 'name': f'{self.cdump}atmanalrun'} deps.append(rocoto.add_dependency(dep_dict)) - dep_dict = {'type': 'cycleexist', 'offset': '-06:00:00'} - deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) resources = self.get_resource('atmanalpost') @@ -459,7 +453,6 @@ def aeroanlinit(self): suffix = self._base["SUFFIX"] dump_suffix = self._base["DUMP_SUFFIX"] - gfs_cyc = self._base["gfs_cyc"] dmpdir = self._base["DMPDIR"] deps = [] @@ -494,8 +487,6 @@ def aeroanlfinal(self): deps = [] dep_dict = {'type': 'task', 'name': f'{self.cdump}aeroanlrun'} deps.append(rocoto.add_dependency(dep_dict)) - dep_dict = {'type': 'cycleexist', 'offset': '-06:00:00'} - deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) resources = self.get_resource('aeroanlfinal') @@ -508,8 +499,6 @@ def gldas(self): deps = [] dep_dict = {'type': 'task', 'name': f'{self.cdump}sfcanl'} deps.append(rocoto.add_dependency(dep_dict)) - dep_dict = {'type': 'cycleexist', 'offset': '-06:00:00'} - deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) resources = self.get_resource('gldas') @@ -604,8 +593,11 @@ def _fcst_cycled(self): dependencies.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='or', dep=dependencies) + cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump + resources = self.get_resource('fcst') - task = create_wf_task('fcst', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + task = create_wf_task('fcst', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies, + cycledef=cycledef) return task @@ -679,9 +671,11 @@ def _get_postgroups(cdump, config, add_anl=False): varval1, varval2, varval3 = _get_postgroups(self.cdump, self._configs[task_name], add_anl=add_anl_to_post) vardict = {varname2: varval2, varname3: varval3} + cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump + resources = self.get_resource(task_name) task = create_wf_task(task_name, resources, cdump=self.cdump, envar=postenvars, dependency=dependencies, - metatask=task_name, varname=varname1, varval=varval1, vardict=vardict) + metatask=task_name, varname=varname1, varval=varval1, vardict=vardict, cycledef=cycledef) return task @@ -913,8 +907,11 @@ def vrfy(self): deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep=deps) + cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump + resources = self.get_resource('vrfy') - task = create_wf_task('vrfy', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + task = create_wf_task('vrfy', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies, + cycledef=cycledef) return task @@ -961,8 +958,11 @@ def arch(self): deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump + resources = self.get_resource('arch') - task = create_wf_task('arch', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + task = create_wf_task('arch', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies, + cycledef=cycledef) return task @@ -1171,9 +1171,10 @@ def efcs(self): groups = self._get_hybgroups(self._base['NMEM_ENKF'], self._configs['efcs']['NMEM_EFCSGRP']) + cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump resources = self.get_resource('efcs') task = create_wf_task('efcs', resources, cdump=self.cdump, envar=efcsenvars, dependency=dependencies, - metatask='efmn', varname='grp', varval=groups) + metatask='efmn', varname='grp', varval=groups, cycledef=cycledef) return task @@ -1188,8 +1189,11 @@ def echgres(self): deps.append(rocoto.add_dependency(dep_dict)) dependencies = rocoto.create_dependency(dep_condition='and', dep=deps) + cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump + resources = self.get_resource('echgres') - task = create_wf_task('echgres', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies) + task = create_wf_task('echgres', resources, cdump=self.cdump, envar=self.envars, dependency=dependencies, + cycledef=cycledef) return task @@ -1231,9 +1235,11 @@ def _get_eposgroups(epos): varval1, varval2, varval3 = _get_eposgroups(self._configs['epos']) vardict = {varname2: varval2, varname3: varval3} + cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump + resources = self.get_resource('epos') task = create_wf_task('epos', resources, cdump=self.cdump, envar=eposenvars, dependency=dependencies, - metatask='epmn', varname=varname1, varval=varval1, vardict=vardict) + metatask='epmn', varname=varname1, varval=varval1, vardict=vardict, cycledef=cycledef) return task @@ -1251,9 +1257,11 @@ def earc(self): groups = self._get_hybgroups(self._base['NMEM_ENKF'], self._configs['earc']['NMEM_EARCGRP'], start_index=0) + cycledef = 'gdas_half,gdas' if self.cdump in ['gdas'] else self.cdump + resources = self.get_resource('earc') task = create_wf_task('earc', resources, cdump=self.cdump, envar=earcenvars, dependency=dependencies, - metatask='eamn', varname='grp', varval=groups) + metatask='eamn', varname='grp', varval=groups, cycledef=cycledef) return task diff --git a/workflow/rocoto/workflow_xml.py b/workflow/rocoto/workflow_xml.py index 440ff93db5..52ff86db2c 100644 --- a/workflow/rocoto/workflow_xml.py +++ b/workflow/rocoto/workflow_xml.py @@ -3,6 +3,7 @@ import os from distutils.spawn import find_executable from datetime import datetime +from pygw.timetools import to_timedelta from collections import OrderedDict from applications import AppConfig from rocoto.workflow_tasks import get_wf_tasks @@ -110,29 +111,41 @@ def _get_cycledefs(self): return cycledefs def _get_cycledefs_cycled(self): - sdate = self._base['SDATE'].strftime('%Y%m%d%H%M') - edate = self._base['EDATE'].strftime('%Y%m%d%H%M') + sdate = self._base['SDATE'] + edate = self._base['EDATE'] interval = self._base.get('INTERVAL', '06:00:00') - strings = [f'\t{sdate} {edate} {interval}\n'] + strings = [] + strings.append(f'\t{sdate.strftime("%Y%m%d%H%M")} {sdate.strftime("%Y%m%d%H%M")} {interval}') + sdate = sdate + to_timedelta(interval) + strings.append(f'\t{sdate.strftime("%Y%m%d%H%M")} {edate.strftime("%Y%m%d%H%M")} {interval}') if self._app_config.gfs_cyc != 0: - sdate_gfs = self._base['SDATE_GFS'].strftime('%Y%m%d%H%M') - edate_gfs = self._base['EDATE_GFS'].strftime('%Y%m%d%H%M') + sdate_gfs = self._base['SDATE_GFS'] + edate_gfs = self._base['EDATE_GFS'] interval_gfs = self._base['INTERVAL_GFS'] - strings.append(f'\t{sdate_gfs} {edate_gfs} {interval_gfs}') - strings.append('') - strings.append('') + strings.append(f'\t{sdate_gfs.strftime("%Y%m%d%H%M")} {edate_gfs.strftime("%Y%m%d%H%M")} {interval_gfs}') + + sdate_gfs = sdate_gfs + to_timedelta(interval_gfs) + if sdate_gfs <= edate_gfs: + strings.append(f'\t{sdate_gfs.strftime("%Y%m%d%H%M")} {edate_gfs.strftime("%Y%m%d%H%M")} {interval_gfs}') + + strings.append('') + strings.append('') return '\n'.join(strings) def _get_cycledefs_forecast_only(self): - sdate = self._base['SDATE'].strftime('%Y%m%d%H%M') - edate = self._base['EDATE'].strftime('%Y%m%d%H%M') + sdate = self._base['SDATE'] + edate = self._base['EDATE'] interval = self._base.get('INTERVAL_GFS', '24:00:00') - cdump = self._base['CDUMP'] - strings = f'\t{sdate} {edate} {interval}\n\n' + strings = [] + strings.append(f'\t{sdate.strftime("%Y%m%d%H%M")} {edate.strftime("%Y%m%d%H%M")} {interval}') + + sdate = sdate + to_timedelta(interval) + if sdate <= edate: + strings.append(f'\t{sdate.strftime("%Y%m%d%H%M")} {edate.strftime("%Y%m%d%H%M")} {interval}') - return strings + return '\n'.join(strings) @staticmethod def _get_workflow_footer():