@@ -437,21 +437,18 @@ def _update_fx_files(step_name, settings, variable, config_user, fx_vars):
437
437
if not fx_vars :
438
438
return
439
439
440
- fx_vars = [
441
- _get_fx_file (variable , fxvar , config_user )
442
- for fxvar in fx_vars
443
- ]
440
+ fx_vars = [_get_fx_file (variable , fxvar , config_user ) for fxvar in fx_vars ]
444
441
445
442
fx_dict = {fx_var [1 ]['short_name' ]: fx_var [0 ] for fx_var in fx_vars }
446
443
settings ['fx_variables' ] = fx_dict
447
444
logger .info ('Using fx_files: %s for variable %s during step %s' ,
448
- pformat (settings ['fx_variables' ]),
449
- variable ['short_name' ],
445
+ pformat (settings ['fx_variables' ]), variable ['short_name' ],
450
446
step_name )
451
447
452
448
453
449
def _update_fx_settings (settings , variable , config_user ):
454
450
"""Update fx settings depending on the needed method."""
451
+
455
452
# get fx variables either from user defined attribute or fixed
456
453
def _get_fx_vars_from_attribute (step_settings , step_name ):
457
454
user_fx_vars = step_settings .get ('fx_variables' )
@@ -462,8 +459,8 @@ def _get_fx_vars_from_attribute(step_settings, step_name):
462
459
user_fx_vars .append ('sftof' )
463
460
elif step_name == 'mask_landseaice' :
464
461
user_fx_vars = ['sftgif' ]
465
- elif step_name in ('area_statistics' ,
466
- 'volume_statistics' , ' zonal_statistics' ):
462
+ elif step_name in ('area_statistics' , 'volume_statistics' ,
463
+ 'zonal_statistics' ):
467
464
user_fx_vars = []
468
465
return user_fx_vars
469
466
@@ -475,8 +472,8 @@ def _get_fx_vars_from_attribute(step_settings, step_name):
475
472
for step_name , step_settings in settings .items ():
476
473
if step_name in fx_steps :
477
474
fx_vars = _get_fx_vars_from_attribute (step_settings , step_name )
478
- _update_fx_files (step_name , step_settings ,
479
- variable , config_user , fx_vars )
475
+ _update_fx_files (step_name , step_settings , variable , config_user ,
476
+ fx_vars )
480
477
481
478
482
479
def _read_attributes (filename ):
@@ -658,16 +655,12 @@ def get_matching(attributes):
658
655
return grouped_products
659
656
660
657
661
- def _get_preprocessor_products (variables ,
662
- profile ,
663
- order ,
664
- ancestor_products ,
665
- config_user ):
666
- """
667
- Get preprocessor product definitions for a set of datasets.
658
+ def _get_preprocessor_products (variables , profile , order , ancestor_products ,
659
+ config_user , name ):
660
+ """Get preprocessor product definitions for a set of datasets.
668
661
669
- It updates recipe settings as needed by various preprocessors
670
- and sets the correct ancestry.
662
+ It updates recipe settings as needed by various preprocessors and
663
+ sets the correct ancestry.
671
664
"""
672
665
products = set ()
673
666
for variable in variables :
@@ -678,7 +671,7 @@ def _get_preprocessor_products(variables,
678
671
grouped_ancestors = _match_products (ancestor_products , variables )
679
672
else :
680
673
grouped_ancestors = {}
681
-
674
+ missing_vars = set ()
682
675
for variable in variables :
683
676
settings = _get_default_settings (
684
677
variable ,
@@ -687,29 +680,26 @@ def _get_preprocessor_products(variables,
687
680
)
688
681
_apply_preprocessor_profile (settings , profile )
689
682
_update_multi_dataset_settings (variable , settings )
690
- _update_target_levels (
691
- variable = variable ,
692
- variables = variables ,
693
- settings = settings ,
694
- config_user = config_user ,
695
- )
696
- _update_extract_shape (settings , config_user )
697
- _update_weighting_settings (settings , variable )
698
- _update_fx_settings (settings = settings ,
699
- variable = variable ,
700
- config_user = config_user )
701
- _update_target_grid (
702
- variable = variable ,
703
- variables = variables ,
704
- settings = settings ,
705
- config_user = config_user ,
706
- )
707
- _update_regrid_time (variable , settings )
683
+ try :
684
+ _update_target_levels (
685
+ variable = variable ,
686
+ variables = variables ,
687
+ settings = settings ,
688
+ config_user = config_user ,
689
+ )
690
+ except RecipeError as ex :
691
+ missing_vars .add (ex .message )
692
+ _update_preproc_functions (settings , config_user , variable , variables ,
693
+ missing_vars )
708
694
ancestors = grouped_ancestors .get (variable ['filename' ])
709
695
if not ancestors :
710
- ancestors = _get_ancestors (variable , config_user )
711
- if config_user .get ('skip-nonexistent' ) and not ancestors :
712
- logger .info ("Skipping: no data found for %s" , variable )
696
+ try :
697
+ ancestors = _get_ancestors (variable , config_user )
698
+ except RecipeError as ex :
699
+ if config_user .get ('skip-nonexistent' ) and not ancestors :
700
+ logger .info ("Skipping: %s" , ex .message )
701
+ else :
702
+ missing_vars .add (ex .message )
713
703
continue
714
704
product = PreprocessorFile (
715
705
attributes = variable ,
@@ -718,6 +708,11 @@ def _get_preprocessor_products(variables,
718
708
)
719
709
products .add (product )
720
710
711
+ if missing_vars :
712
+ separator = "\n - "
713
+ raise RecipeError (f'Missing data for preprocessor { name } :{ separator } '
714
+ f'{ separator .join (sorted (missing_vars ))} ' )
715
+
721
716
_update_statistic_settings (products , order , config_user ['preproc_dir' ])
722
717
723
718
for product in products :
@@ -726,6 +721,25 @@ def _get_preprocessor_products(variables,
726
721
return products
727
722
728
723
724
+ def _update_preproc_functions (settings , config_user , variable , variables ,
725
+ missing_vars ):
726
+ _update_extract_shape (settings , config_user )
727
+ _update_weighting_settings (settings , variable )
728
+ _update_fx_settings (settings = settings ,
729
+ variable = variable ,
730
+ config_user = config_user )
731
+ try :
732
+ _update_target_grid (
733
+ variable = variable ,
734
+ variables = variables ,
735
+ settings = settings ,
736
+ config_user = config_user ,
737
+ )
738
+ except RecipeError as ex :
739
+ missing_vars .add (ex .message )
740
+ _update_regrid_time (variable , settings )
741
+
742
+
729
743
def _get_single_preprocessor_task (variables ,
730
744
profile ,
731
745
config_user ,
@@ -746,7 +760,9 @@ def _get_single_preprocessor_task(variables,
746
760
profile = profile ,
747
761
order = order ,
748
762
ancestor_products = ancestor_products ,
749
- config_user = config_user )
763
+ config_user = config_user ,
764
+ name = name ,
765
+ )
750
766
751
767
if not products :
752
768
raise RecipeError (
@@ -916,7 +932,13 @@ def __init__(self,
916
932
raw_recipe ['diagnostics' ], raw_recipe .get ('datasets' , []))
917
933
self .entity = self ._initialize_provenance (
918
934
raw_recipe .get ('documentation' , {}))
919
- self .tasks = self .initialize_tasks () if initialize_tasks else None
935
+ try :
936
+ self .tasks = self .initialize_tasks () if initialize_tasks else None
937
+ except RecipeError as ex :
938
+ logger .error (ex .message )
939
+ for task in ex .failed_tasks :
940
+ logger .error (task .message )
941
+ raise
920
942
921
943
@staticmethod
922
944
def _need_ncl (raw_diagnostics ):
@@ -980,8 +1002,7 @@ def _initialize_datasets(raw_datasets):
980
1002
981
1003
@staticmethod
982
1004
def _expand_ensemble (variables ):
983
- """
984
- Expand ensemble members to multiple datasets.
1005
+ """Expand ensemble members to multiple datasets.
985
1006
986
1007
Expansion only supports ensembles defined as strings, not lists.
987
1008
"""
@@ -1272,24 +1293,29 @@ def initialize_tasks(self):
1272
1293
tasknames_to_run = self ._cfg .get ('diagnostics' )
1273
1294
1274
1295
priority = 0
1296
+ failed_tasks = []
1275
1297
for diagnostic_name , diagnostic in self .diagnostics .items ():
1276
1298
logger .info ("Creating tasks for diagnostic %s" , diagnostic_name )
1277
1299
1278
1300
# Create preprocessor tasks
1279
1301
for variable_group in diagnostic ['preprocessor_output' ]:
1280
1302
task_name = diagnostic_name + TASKSEP + variable_group
1281
1303
logger .info ("Creating preprocessor task %s" , task_name )
1282
- task = _get_preprocessor_task (
1283
- variables = diagnostic ['preprocessor_output' ]
1284
- [variable_group ],
1285
- profiles = self ._preprocessors ,
1286
- config_user = self ._cfg ,
1287
- task_name = task_name ,
1288
- )
1289
- for task0 in task .flatten ():
1290
- task0 .priority = priority
1291
- tasks .add (task )
1292
- priority += 1
1304
+ try :
1305
+ task = _get_preprocessor_task (
1306
+ variables = diagnostic ['preprocessor_output' ]
1307
+ [variable_group ],
1308
+ profiles = self ._preprocessors ,
1309
+ config_user = self ._cfg ,
1310
+ task_name = task_name ,
1311
+ )
1312
+ except RecipeError as ex :
1313
+ failed_tasks .append (ex )
1314
+ else :
1315
+ for task0 in task .flatten ():
1316
+ task0 .priority = priority
1317
+ tasks .add (task )
1318
+ priority += 1
1293
1319
1294
1320
# Create diagnostic tasks
1295
1321
for script_name , script_cfg in diagnostic ['scripts' ].items ():
@@ -1304,7 +1330,10 @@ def initialize_tasks(self):
1304
1330
task .priority = priority
1305
1331
tasks .add (task )
1306
1332
priority += 1
1307
-
1333
+ if failed_tasks :
1334
+ ex = RecipeError ('Could not create all tasks' )
1335
+ ex .failed_tasks .extend (failed_tasks )
1336
+ raise ex
1308
1337
check .tasks_valid (tasks )
1309
1338
1310
1339
# Resolve diagnostic ancestors
0 commit comments