@@ -432,21 +432,18 @@ def _update_fx_files(step_name, settings, variable, config_user, fx_vars):
432
432
if not fx_vars :
433
433
return
434
434
435
- fx_vars = [
436
- _get_fx_file (variable , fxvar , config_user )
437
- for fxvar in fx_vars
438
- ]
435
+ fx_vars = [_get_fx_file (variable , fxvar , config_user ) for fxvar in fx_vars ]
439
436
440
437
fx_dict = {fx_var [1 ]['short_name' ]: fx_var [0 ] for fx_var in fx_vars }
441
438
settings ['fx_variables' ] = fx_dict
442
439
logger .info ('Using fx_files: %s for variable %s during step %s' ,
443
- pformat (settings ['fx_variables' ]),
444
- variable ['short_name' ],
440
+ pformat (settings ['fx_variables' ]), variable ['short_name' ],
445
441
step_name )
446
442
447
443
448
444
def _update_fx_settings (settings , variable , config_user ):
449
445
"""Update fx settings depending on the needed method."""
446
+
450
447
# get fx variables either from user defined attribute or fixed
451
448
def _get_fx_vars_from_attribute (step_settings , step_name ):
452
449
user_fx_vars = step_settings .get ('fx_variables' )
@@ -457,8 +454,8 @@ def _get_fx_vars_from_attribute(step_settings, step_name):
457
454
user_fx_vars .append ('sftof' )
458
455
elif step_name == 'mask_landseaice' :
459
456
user_fx_vars = ['sftgif' ]
460
- elif step_name in ('area_statistics' ,
461
- 'volume_statistics' , ' zonal_statistics' ):
457
+ elif step_name in ('area_statistics' , 'volume_statistics' ,
458
+ 'zonal_statistics' ):
462
459
user_fx_vars = []
463
460
return user_fx_vars
464
461
@@ -470,8 +467,8 @@ def _get_fx_vars_from_attribute(step_settings, step_name):
470
467
for step_name , step_settings in settings .items ():
471
468
if step_name in fx_steps :
472
469
fx_vars = _get_fx_vars_from_attribute (step_settings , step_name )
473
- _update_fx_files (step_name , step_settings ,
474
- variable , config_user , fx_vars )
470
+ _update_fx_files (step_name , step_settings , variable , config_user ,
471
+ fx_vars )
475
472
476
473
477
474
def _read_attributes (filename ):
@@ -653,16 +650,12 @@ def get_matching(attributes):
653
650
return grouped_products
654
651
655
652
656
- def _get_preprocessor_products (variables ,
657
- profile ,
658
- order ,
659
- ancestor_products ,
660
- config_user ):
661
- """
662
- Get preprocessor product definitions for a set of datasets.
653
+ def _get_preprocessor_products (variables , profile , order , ancestor_products ,
654
+ config_user , name ):
655
+ """Get preprocessor product definitions for a set of datasets.
663
656
664
- It updates recipe settings as needed by various preprocessors
665
- and sets the correct ancestry.
657
+ It updates recipe settings as needed by various preprocessors and
658
+ sets the correct ancestry.
666
659
"""
667
660
products = set ()
668
661
for variable in variables :
@@ -673,7 +666,7 @@ def _get_preprocessor_products(variables,
673
666
grouped_ancestors = _match_products (ancestor_products , variables )
674
667
else :
675
668
grouped_ancestors = {}
676
-
669
+ missing_vars = set ()
677
670
for variable in variables :
678
671
settings = _get_default_settings (
679
672
variable ,
@@ -682,29 +675,26 @@ def _get_preprocessor_products(variables,
682
675
)
683
676
_apply_preprocessor_profile (settings , profile )
684
677
_update_multi_dataset_settings (variable , settings )
685
- _update_target_levels (
686
- variable = variable ,
687
- variables = variables ,
688
- settings = settings ,
689
- config_user = config_user ,
690
- )
691
- _update_extract_shape (settings , config_user )
692
- _update_weighting_settings (settings , variable )
693
- _update_fx_settings (settings = settings ,
694
- variable = variable ,
695
- config_user = config_user )
696
- _update_target_grid (
697
- variable = variable ,
698
- variables = variables ,
699
- settings = settings ,
700
- config_user = config_user ,
701
- )
702
- _update_regrid_time (variable , settings )
678
+ try :
679
+ _update_target_levels (
680
+ variable = variable ,
681
+ variables = variables ,
682
+ settings = settings ,
683
+ config_user = config_user ,
684
+ )
685
+ except RecipeError as ex :
686
+ missing_vars .add (ex .message )
687
+ _update_preproc_functions (settings , config_user , variable , variables ,
688
+ missing_vars )
703
689
ancestors = grouped_ancestors .get (variable ['filename' ])
704
690
if not ancestors :
705
- ancestors = _get_ancestors (variable , config_user )
706
- if config_user .get ('skip-nonexistent' ) and not ancestors :
707
- logger .info ("Skipping: no data found for %s" , variable )
691
+ try :
692
+ ancestors = _get_ancestors (variable , config_user )
693
+ except RecipeError as ex :
694
+ if config_user .get ('skip-nonexistent' ) and not ancestors :
695
+ logger .info ("Skipping: %s" , ex .message )
696
+ else :
697
+ missing_vars .add (ex .message )
708
698
continue
709
699
product = PreprocessorFile (
710
700
attributes = variable ,
@@ -713,6 +703,11 @@ def _get_preprocessor_products(variables,
713
703
)
714
704
products .add (product )
715
705
706
+ if missing_vars :
707
+ separator = "\n - "
708
+ raise RecipeError (f'Missing data for preprocessor { name } :{ separator } '
709
+ f'{ separator .join (sorted (missing_vars ))} ' )
710
+
716
711
_update_statistic_settings (products , order , config_user ['preproc_dir' ])
717
712
718
713
for product in products :
@@ -721,6 +716,25 @@ def _get_preprocessor_products(variables,
721
716
return products
722
717
723
718
719
+ def _update_preproc_functions (settings , config_user , variable , variables ,
720
+ missing_vars ):
721
+ _update_extract_shape (settings , config_user )
722
+ _update_weighting_settings (settings , variable )
723
+ _update_fx_settings (settings = settings ,
724
+ variable = variable ,
725
+ config_user = config_user )
726
+ try :
727
+ _update_target_grid (
728
+ variable = variable ,
729
+ variables = variables ,
730
+ settings = settings ,
731
+ config_user = config_user ,
732
+ )
733
+ except RecipeError as ex :
734
+ missing_vars .add (ex .message )
735
+ _update_regrid_time (variable , settings )
736
+
737
+
724
738
def _get_single_preprocessor_task (variables ,
725
739
profile ,
726
740
config_user ,
@@ -741,7 +755,9 @@ def _get_single_preprocessor_task(variables,
741
755
profile = profile ,
742
756
order = order ,
743
757
ancestor_products = ancestor_products ,
744
- config_user = config_user )
758
+ config_user = config_user ,
759
+ name = name ,
760
+ )
745
761
746
762
if not products :
747
763
raise RecipeError (
@@ -911,7 +927,13 @@ def __init__(self,
911
927
raw_recipe ['diagnostics' ], raw_recipe .get ('datasets' , []))
912
928
self .entity = self ._initialize_provenance (
913
929
raw_recipe .get ('documentation' , {}))
914
- self .tasks = self .initialize_tasks () if initialize_tasks else None
930
+ try :
931
+ self .tasks = self .initialize_tasks () if initialize_tasks else None
932
+ except RecipeError as ex :
933
+ logger .error (ex .message )
934
+ for task in ex .failed_tasks :
935
+ logger .error (task .message )
936
+ raise
915
937
916
938
@staticmethod
917
939
def _need_ncl (raw_diagnostics ):
@@ -975,8 +997,7 @@ def _initialize_datasets(raw_datasets):
975
997
976
998
@staticmethod
977
999
def _expand_ensemble (variables ):
978
- """
979
- Expand ensemble members to multiple datasets.
1000
+ """Expand ensemble members to multiple datasets.
980
1001
981
1002
Expansion only supports ensembles defined as strings, not lists.
982
1003
"""
@@ -1267,24 +1288,29 @@ def initialize_tasks(self):
1267
1288
tasknames_to_run = self ._cfg .get ('diagnostics' )
1268
1289
1269
1290
priority = 0
1291
+ failed_tasks = []
1270
1292
for diagnostic_name , diagnostic in self .diagnostics .items ():
1271
1293
logger .info ("Creating tasks for diagnostic %s" , diagnostic_name )
1272
1294
1273
1295
# Create preprocessor tasks
1274
1296
for variable_group in diagnostic ['preprocessor_output' ]:
1275
1297
task_name = diagnostic_name + TASKSEP + variable_group
1276
1298
logger .info ("Creating preprocessor task %s" , task_name )
1277
- task = _get_preprocessor_task (
1278
- variables = diagnostic ['preprocessor_output' ]
1279
- [variable_group ],
1280
- profiles = self ._preprocessors ,
1281
- config_user = self ._cfg ,
1282
- task_name = task_name ,
1283
- )
1284
- for task0 in task .flatten ():
1285
- task0 .priority = priority
1286
- tasks .add (task )
1287
- priority += 1
1299
+ try :
1300
+ task = _get_preprocessor_task (
1301
+ variables = diagnostic ['preprocessor_output' ]
1302
+ [variable_group ],
1303
+ profiles = self ._preprocessors ,
1304
+ config_user = self ._cfg ,
1305
+ task_name = task_name ,
1306
+ )
1307
+ except RecipeError as ex :
1308
+ failed_tasks .append (ex )
1309
+ else :
1310
+ for task0 in task .flatten ():
1311
+ task0 .priority = priority
1312
+ tasks .add (task )
1313
+ priority += 1
1288
1314
1289
1315
# Create diagnostic tasks
1290
1316
for script_name , script_cfg in diagnostic ['scripts' ].items ():
@@ -1299,7 +1325,10 @@ def initialize_tasks(self):
1299
1325
task .priority = priority
1300
1326
tasks .add (task )
1301
1327
priority += 1
1302
-
1328
+ if failed_tasks :
1329
+ ex = RecipeError ('Could not create all tasks' )
1330
+ ex .failed_tasks .extend (failed_tasks )
1331
+ raise ex
1303
1332
check .tasks_valid (tasks )
1304
1333
1305
1334
# Resolve diagnostic ancestors
0 commit comments