Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update UFSDA ATM ens for new COM directory structure #1538

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions jobs/JGLOBAL_ATMENS_ANALYSIS_FINALIZE
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,17 @@ source "${HOMEgfs}/ush/jjob_header.sh" -e "atmensanlfinal" -c "base atmensanl at
##############################################
# Set variables used in the script
##############################################
GDATE=$(date +%Y%m%d%H -d "${PDY} ${cyc} - ${assim_freq} hours")
GDUMP="gdas"
GDUMP_ENS="enkf${GDUMP}"

##############################################
# Begin JOB SPECIFIC work
##############################################
# Generate COM variable from template
MEMDIR='ensstat' RUN=${GDUMP_ENS} YMD=${PDY} HH=${cyc} generate_com -rx \
COM_ATMOS_ANALYSIS_ENS:COM_ATMOS_ANALYSIS_TMPL

export COMOUT=${COMOUT:-${ROTDIR}/${RUN}.${PDY}/${cyc}}
mkdir -p "${COMOUT}"

# COMIN_GES and COMIN_GES_ENS are used in script
export COMIN_GES="${ROTDIR}/${GDUMP}.${GDATE:0:8}/${GDATE:8:2}/atmos"
export COMIN_GES_ENS="${ROTDIR}/enkf${GDUMP}.${GDATE:0:8}/${GDATE:8:2}"
mkdir -m 755 -p "${COM_ATMOS_ANALYSIS_ENS}"

###############################################################
# Run relevant script
Expand Down
13 changes: 7 additions & 6 deletions jobs/JGLOBAL_ATMENS_ANALYSIS_INITIALIZE
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,20 @@ source "${HOMEgfs}/ush/jjob_header.sh" -e "atmensanlinit" -c "base atmensanl atm
##############################################
# Set variables used in the script
##############################################
# shellcheck disable=SC2153
GDATE=$(date +%Y%m%d%H -d "${PDY} ${cyc} - ${assim_freq} hours")
gPDY=${GDATE:0:8}
gcyc=${GDATE:8:2}
GDUMP="gdas"


##############################################
# Begin JOB SPECIFIC work
##############################################
export COMOUT=${COMOUT:-${ROTDIR}/${RUN}.${PDY}/${cyc}}
mkdir -p "${COMOUT}"
# Generate COM variables from templates
RUN=${GDUMP} YMD=${PDY} HH=${cyc} generate_com -rx COM_OBS

# COMIN_GES and COMIN_GES_ENS are used in script
export COMIN_GES="${ROTDIR}/${GDUMP}.${GDATE:0:8}/${GDATE:8:2}/atmos"
export COMIN_GES_ENS="${ROTDIR}/enkf${GDUMP}.${GDATE:0:8}/${GDATE:8:2}"
RUN=${GDUMP} YMD=${gPDY} HH=${gcyc} generate_com -rx \
COM_ATMOS_ANALYSIS_PREV:COM_ATMOS_ANALYSIS_TMPL

###############################################################
# Run relevant script
Expand Down
3 changes: 0 additions & 3 deletions jobs/JGLOBAL_ATMENS_ANALYSIS_RUN
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ source "${HOMEgfs}/ush/jjob_header.sh" -e "atmensanlrun" -c "base atmensanl atme
# Begin JOB SPECIFIC work
##############################################

export COMOUT=${COMOUT:-${ROTDIR}/${RUN}.${PDY}/${cyc}}
mkdir -p "${COMOUT}"

###############################################################
# Run relevant script

Expand Down
1 change: 1 addition & 0 deletions ush/python/pygfs/task/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pygw.template import Template, TemplateConstants
from pygw.logger import logit
from pygw.task import Task
from pygw.timetools import to_fv3time, to_YMD
aerorahul marked this conversation as resolved.
Show resolved Hide resolved

logger = getLogger(__name__.split('.')[-1])

Expand Down
185 changes: 112 additions & 73 deletions ush/python/pygfs/task/atmens_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@

from pygw.attrdict import AttrDict
from pygw.file_utils import FileHandler
from pygw.timetools import add_to_datetime, to_fv3time, to_timedelta, to_YMDH
from pygw.timetools import add_to_datetime, to_fv3time, to_timedelta, to_YMDH, to_YMD
from pygw.fsutils import rm_p, chdir
from pygw.yaml_file import parse_yamltmpl, parse_j2yaml, save_as_yaml
from pygw.logger import logit
from pygw.executable import Executable
from pygw.exceptions import WorkflowException
from pygw.template import Template, TemplateConstants
from pygfs.task.analysis import Analysis

logger = getLogger(__name__.split('.')[-1])
Expand Down Expand Up @@ -45,8 +46,6 @@ def __init__(self, config):
'npz_anl': self.config.LEVS - 1,
'ATM_WINDOW_BEGIN': _window_begin,
'ATM_WINDOW_LENGTH': f"PT{self.config.assim_freq}H",
'comin_ges_atm': self.config.COMIN_GES,
'comin_ges_atmens': self.config.COMIN_GES_ENS,
'OPREFIX': f"{self.config.EUPD_CYC}.t{self.runtime_config.cyc:02d}z.", # TODO: CDUMP is being replaced by RUN
'APREFIX': f"{self.runtime_config.CDUMP}.t{self.runtime_config.cyc:02d}z.", # TODO: CDUMP is being replaced by RUN
'GPREFIX': f"gdas.t{self.runtime_config.previous_cycle.hour:02d}z.",
Expand Down Expand Up @@ -79,6 +78,27 @@ def initialize(self: Analysis) -> None:
"""
super().initialize()

# Make member directories in DATA for background and in DATA and ROTDIR for analysis files
# create template dictionary for output member analysis directories
template_inc = self.task_config.COM_ATMOS_ANALYSIS_TMPL
tmpl_inc_dict = {
'ROTDIR': self.task_config.ROTDIR,
'RUN': self.task_config.RUN,
'YMD': to_YMD(self.task_config.current_cycle),
'HH': self.task_config.current_cycle.strftime('%H')
}
dirlist = []
for imem in range(1, self.task_config.NMEM_ENKF + 1):
dirlist.append(os.path.join(self.task_config.DATA, 'bkg', f'mem{imem:03d}'))
dirlist.append(os.path.join(self.task_config.DATA, 'anl', f'mem{imem:03d}'))

# create output directory path for member analysis
tmpl_inc_dict['MEMDIR'] = f"mem{imem:03d}"
incdir = Template.substitute_structure(template_inc, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_inc_dict.get)
dirlist.append(incdir)

FileHandler({'mkdir': dirlist}).sync()

# stage CRTM fix files
crtm_fix_list_path = os.path.join(self.task_config.HOMEgfs, 'parm', 'parm_gdas', 'atm_crtm_coeff.yaml')
logger.debug(f"Staging CRTM fix files from {crtm_fix_list_path}")
Expand All @@ -92,7 +112,7 @@ def initialize(self: Analysis) -> None:
FileHandler(jedi_fix_list).sync()

# stage backgrounds
FileHandler(self.get_bkg_dict(AttrDict(self.task_config))).sync()
FileHandler(self.get_bkg_dict()).sync()

# generate ensemble da YAML file
logger.debug(f"Generate ensemble da YAML file: {self.task_config.fv3jedi_yaml}")
Expand All @@ -108,13 +128,6 @@ def initialize(self: Analysis) -> None:
]
FileHandler({'mkdir': newdirs}).sync()

# Make directories for member analysis files
anldir = []
for imem in range(1, self.task_config.NMEM_ENKF + 1):
memchar = f"mem{imem:03d}"
anldir.append(os.path.join(self.task_config.DATA, 'anl', f'mem{imem:03d}'))
FileHandler({'mkdir': anldir}).sync()

@logit(logger)
def execute(self: Analysis) -> None:
"""Execute a global atmens analysis
Expand Down Expand Up @@ -169,7 +182,7 @@ def finalize(self: Analysis) -> None:
"""
# ---- tar up diags
# path of output tar statfile
atmensstat = os.path.join(self.task_config.COMOUT, f"{self.task_config.APREFIX}atmensstat")
atmensstat = os.path.join(self.task_config.COM_ATMOS_ANALYSIS_ENS, f"{self.task_config.APREFIX}atmensstat")

# get list of diag files to put in tarball
diags = glob.glob(os.path.join(self.task_config.DATA, 'diags', 'diag*nc4'))
Expand All @@ -190,12 +203,12 @@ def finalize(self: Analysis) -> None:
archive.add(diaggzip, arcname=os.path.basename(diaggzip))

# copy full YAML from executable to ROTDIR
logger.info(f"Copying {self.task_config.fv3jedi_yaml} to {self.task_config.COMOUT}")
logger.info(f"Copying {self.task_config.fv3jedi_yaml} to {self.task_config.COM_ATMOS_ANALYSIS_ENS}")
src = os.path.join(self.task_config.DATA, f"{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.atmens.yaml")
dest = os.path.join(self.task_config.COMOUT, f"{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.atmens.yaml")
dest = os.path.join(self.task_config.COM_ATMOS_ANALYSIS_ENS, f"{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.atmens.yaml")
logger.debug(f"Copying {src} to {dest}")
yaml_copy = {
'mkdir': [self.task_config.COMOUT],
'mkdir': [self.task_config.COM_ATMOS_ANALYSIS_ENS],
'copy': [[src, dest]]
}
FileHandler(yaml_copy).sync()
Expand All @@ -207,55 +220,6 @@ def finalize(self: Analysis) -> None:
def clean(self):
super().clean()

@logit(logger)
def get_bkg_dict(self, task_config: Dict[str, Any]) -> Dict[str, List[str]]:
    """Assemble the FileHandler instructions for staging ensemble backgrounds

    For each ensemble member (1 through NMEM_ENKF) this collects the member's
    working directory under DATA/bkg and pairs every needed restart file
    (coupler.res plus the tiled fv_core, fv_srf_wnd, fv_tracer, phy_data and
    sfc_data files) in <comin_ges_atmens>/memNNN/atmos/RESTART with its
    destination in that working directory.

    Parameters
    ----------
    task_config: Dict
        configuration for the task; read here by attribute access for
        NMEM_ENKF, DATA, comin_ges_atmens, current_cycle and ntiles

    Returns
    ----------
    bkg_dict: Dict
        dictionary with a 'mkdir' list of member directories and a 'copy'
        list of [source, destination] pairs, for use by FileHandler
    """
    # NOTE: backgrounds are currently FV3 RESTART files, assumed to be fh006
    tiled_ftypes = ['fv_core.res', 'fv_srf_wnd.res', 'fv_tracer.res', 'phy_data', 'sfc_data']
    member_dirs = []
    copy_pairs = []

    for member in range(1, task_config.NMEM_ENKF + 1):
        memdir = f"mem{member:03d}"

        # working directory that will receive this member's restart files
        member_dirs.append(os.path.join(task_config.DATA, 'bkg', memdir))

        # source and destination directories for this member
        src_dir = os.path.join(task_config.comin_ges_atmens, memdir, 'atmos/RESTART')
        dest_dir = os.path.join(task_config.DATA, 'bkg', memdir)

        # the coupler file is not tiled
        coupler = f'{to_fv3time(task_config.current_cycle)}.coupler.res'
        copy_pairs.append([os.path.join(src_dir, coupler), os.path.join(dest_dir, coupler)])

        # one file per tile for each of the tiled restart types
        for ftype in tiled_ftypes:
            # NOTE(review): the timestamp here is read from self.task_config while the
            # coupler file above uses the task_config argument -- confirm both agree
            stamp = to_fv3time(self.task_config.current_cycle)
            for tile in range(1, task_config.ntiles + 1):
                tiled = f'{stamp}.{ftype}.tile{tile}.nc'
                copy_pairs.append([os.path.join(src_dir, tiled), os.path.join(dest_dir, tiled)])

    return {
        'mkdir': member_dirs,
        'copy': copy_pairs,
    }

@logit(logger)
def jedi2fv3inc(self: Analysis) -> None:
"""Generate UFS model readable analysis increment
Expand Down Expand Up @@ -285,21 +249,37 @@ def jedi2fv3inc(self: Analysis) -> None:
# Reference the python script which does the actual work
incpy = os.path.join(self.task_config.HOMEgfs, 'ush/jediinc2fv3.py')

# create template dictionaries
template_inc = self.task_config.COM_ATMOS_ANALYSIS_TMPL
tmpl_inc_dict = {
'ROTDIR': self.task_config.ROTDIR,
'RUN': self.task_config.RUN,
'YMD': to_YMD(self.task_config.current_cycle),
'HH': self.task_config.current_cycle.strftime('%H')
}

template_ges = self.task_config.COM_ATMOS_HISTORY_TMPL
tmpl_ges_dict = {
'ROTDIR': self.task_config.ROTDIR,
'RUN': self.task_config.RUN,
'YMD': to_YMD(self.task_config.previous_cycle),
'HH': self.task_config.previous_cycle.strftime('%H')
}

# loop over ensemble members
for imem in range(1, self.task_config.NMEM_ENKF + 1):
memchar = f"mem{imem:03d}"

# make output directory for member increment
incdir = [
os.path.join(self.task_config.COMOUT, memchar, 'atmos')
]
FileHandler({'mkdir': incdir}).sync()
# create output path for member analysis increment
tmpl_inc_dict['MEMDIR'] = memchar
incdir = Template.substitute_structure(template_inc, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_inc_dict.get)

# rewrite UFS-DA atmens increments
atmges_fv3 = os.path.join(self.task_config.COMIN_GES_ENS, memchar, 'atmos',
f"{self.task_config.CDUMP}.t{self.runtime_config.previous_cycle.hour:02d}z.atmf006.nc")
tmpl_ges_dict['MEMDIR'] = memchar
gesdir = Template.substitute_structure(template_ges, TemplateConstants.DOLLAR_CURLY_BRACE, tmpl_ges_dict.get)
atmges_fv3 = os.path.join(gesdir, f"{self.task_config.CDUMP}.t{self.task_config.previous_cycle.hour:02d}z.atmf006.nc")
atminc_jedi = os.path.join(self.task_config.DATA, 'anl', memchar, f'atminc.{cdate_inc}z.nc4')
atminc_fv3 = os.path.join(self.task_config.COMOUT, memchar, 'atmos',
f"{self.task_config.CDUMP}.t{self.runtime_config.cyc:02d}z.atminc.nc")
atminc_fv3 = os.path.join(incdir, f"{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.atminc.nc")

# Execute incpy to create the UFS model atm increment file
# TODO: use MPMD or parallelize with mpi4py
Expand All @@ -310,3 +290,62 @@ def jedi2fv3inc(self: Analysis) -> None:
cmd.add_default_arg(atminc_fv3)
logger.debug(f"Executing {cmd}")
cmd(output='stdout', error='stderr')

@logit(logger)
def get_bkg_dict(self: Analysis) -> Dict[str, List[str]]:
    """Assemble the FileHandler instructions for staging ensemble backgrounds

    For each ensemble member (1 through NMEM_ENKF) the member's restart
    directory is resolved from COM_ATMOS_RESTART_TMPL using ROTDIR, RUN and
    the previous cycle's date and hour. Every needed restart file
    (coupler.res plus the tiled fv_core, fv_srf_wnd, fv_tracer, phy_data and
    sfc_data files, all stamped with the current cycle time) is then paired
    with its destination under DATA/bkg/memNNN.

    Parameters
    ----------
    None

    Returns
    ----------
    bkg_dict: Dict
        dictionary with a 'mkdir' list of the resolved member restart
        directories and a 'copy' list of [source, destination] pairs,
        for use by FileHandler
    """
    # NOTE: backgrounds are currently FV3 restart files, assumed to be fh006;
    # this should get simpler once history files are used instead
    tiled_ftypes = ['fv_core.res', 'fv_srf_wnd.res', 'fv_tracer.res', 'phy_data', 'sfc_data']
    restart_dirs = []
    copy_pairs = []

    # substitution values for the restart template; MEMDIR is filled in per member
    restart_tmpl = self.task_config.COM_ATMOS_RESTART_TMPL
    restart_keys = {
        'ROTDIR': self.task_config.ROTDIR,
        'RUN': self.task_config.RUN,
        'YMD': to_YMD(self.task_config.previous_cycle),
        'HH': self.task_config.previous_cycle.strftime('%H'),
        'MEMDIR': None
    }

    for member in range(1, self.task_config.NMEM_ENKF + 1):
        memdir = f"mem{member:03d}"

        # resolve this member's restart (source) directory from the template
        restart_keys['MEMDIR'] = memdir
        src_dir = Template.substitute_structure(restart_tmpl, TemplateConstants.DOLLAR_CURLY_BRACE, restart_keys.get)
        restart_dirs.append(src_dir)

        # destination directory in the run area
        dest_dir = os.path.join(self.task_config.DATA, 'bkg', memdir)

        # all staged files carry the current cycle time as a prefix
        stamp = to_fv3time(self.task_config.current_cycle)

        # the coupler file is not tiled; every other type has one file per tile
        names = [f'{stamp}.coupler.res']
        for ftype in tiled_ftypes:
            names.extend(f'{stamp}.{ftype}.tile{tile}.nc' for tile in range(1, self.task_config.ntiles + 1))

        copy_pairs.extend([os.path.join(src_dir, name), os.path.join(dest_dir, name)] for name in names)

    # NOTE(review): 'mkdir' receives the restart *source* directories, not the
    # DATA/bkg member directories -- confirm this is intentional
    return {
        'mkdir': restart_dirs,
        'copy': copy_pairs,
    }