#!/usr/bin/env python
"""
compliance_checker/cfutil.py
"""
import csv
import re
import warnings
from collections import defaultdict
from functools import lru_cache, partial
from importlib.resources import files
from cf_units import Unit
_UNITLESS_DB = None
_SEA_NAMES = None
VALID_LAT_UNITS = {
"degrees_north",
"degree_north",
"degree_n",
"degrees_n",
"degreen",
"degreesn",
}
VALID_LON_UNITS = {
"degrees_east",
"degree_east",
"degree_e",
"degrees_e",
"degreee",
"degreese",
}
# We can't import appendix d without getting circular imports
DIMENSIONLESS_VERTICAL_COORDINATES = {
"ocean_s_coordinate",
"ocean_s_coordinate_g1",
"ocean_s_coordinate_g2",
"atmosphere_hybrid_sigma_pressure_coordinate",
"atmosphere_hybrid_height_coordinate",
"ocean_double_sigma_coordinate",
"ocean_sigma_z_coordinate",
"ocean_sigma_coordinate",
"atmosphere_sigma_coordinate",
"atmosphere_ln_pressure_coordinate",
"atmosphere_sleve_coordinate",
}
def attr_membership(attr_val, value_set, attr_type=str, modifier_fn=lambda x: x):
"""
Helper function passed to netCDF4.Dataset.get_variables_by_attributes
Checks that `attr_val` exists, has the same type as `attr_type`,
and is contained in `value_set`
attr_val: The value of the attribute being checked
attr_type: A type object that the `attr_val` is expected to have the same
type as. If the type is not the same, a warning is issued and
the code attempts to cast `attr_val` to the expected type.
value_set: The set against which membership for `attr_val` is tested
modifier_fn: A function to apply to attr_val prior to applying the set
membership test
"""
if attr_val is None:
return False
if not isinstance(attr_val, attr_type):
warnings.warn(
f"Attribute is of type {type(attr_val)!r}, {attr_type!r} expected. Attempting to cast to expected type.",
stacklevel=2,
)
try:
# if the expected type is str, try casting to unicode type
# since str can't be instantiated
if attr_type is str:
new_attr_val = str(attr_val)
else:
new_attr_val = attr_type(attr_val)
# catch casting errors
except (ValueError, UnicodeEncodeError):
warnings.warn(f"Could not cast to type {attr_type}", stacklevel=2)
return False
else:
new_attr_val = attr_val
try:
is_in_set = modifier_fn(new_attr_val) in value_set
except Exception as e:
warnings.warn(
f"Could not apply modifier function {modifier_fn} to value: {e.msg}",
stacklevel=2,
)
return False
return is_in_set
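# Illustrative sketch (not part of the original module): attr_membership is
# typically partially applied and handed to get_variables_by_attributes, e.g.
# to find variables whose units are valid latitude units. The dataset `nc` is
# hypothetical here.
#
#   check_fn = partial(attr_membership, value_set=VALID_LAT_UNITS,
#                      modifier_fn=lambda s: s.lower())
#   lat_like = nc.get_variables_by_attributes(units=check_fn)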
@lru_cache(128)
def is_dimensionless_standard_name(standard_name_table, standard_name):
"""
Returns True if the units for the associated standard name are
dimensionless. Dimensionless standard names include those that have no
units and those whose canonical units are defined as constants in the CF
standard name table, e.g. '1' or '1e-3'.
"""
# standard_name must be string, so if it is not, it is *wrong* by default
if not isinstance(standard_name, str):
return False
found_standard_name = standard_name_table.find(
f".//entry[@id='{standard_name}']",
)
if found_standard_name is not None:
canonical_units = Unit(found_standard_name.find("canonical_units").text)
return canonical_units.is_dimensionless()
# if the standard name is not found, assume we need units for the time being
else:
return False
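# Illustrative sketch (assumptions: a local copy of the CF standard name table
# XML; the path and entry used here are hypothetical):
#
#   import xml.etree.ElementTree as ET
#   table = ET.parse("cf-standard-name-table.xml")
#   is_dimensionless_standard_name(table, "sea_water_practical_salinity")
#   # -> True when the entry's canonical_units (e.g. "1") parse as dimensionless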
def get_sea_names():
"""
Returns a dict of NODC sea names mapped to their NODC codes
source of list: https://www.ncei.noaa.gov/resources/ocean-data-format-codes
"""
global _SEA_NAMES
if _SEA_NAMES is None:
buf = {}
with open(
files("compliance_checker") / "data/seanames.csv",
) as f:
reader = csv.reader(f)
for code, sea_name in reader:
buf[sea_name] = code
_SEA_NAMES = buf
return _SEA_NAMES
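# Illustrative usage sketch: the typical call is a membership test against the
# keys, e.g. (assuming "Baltic Sea" appears in data/seanames.csv):
#
#   "Baltic Sea" in get_sea_names()   # -> True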
def is_unitless(nc, variable):
"""
Returns true if the variable is unitless
Note that units of '1' denote a dimensionless quantity (whole numbers or
parts of a whole) but still represent physical units and not the absence of units.
:param netCDF4.Dataset nc: An open netCDF dataset
:param str variable: Name of the variable
"""
units = getattr(nc.variables[variable], "units", None)
return units is None or units == ""
def is_geophysical(nc, variable):
"""
Returns true if the dataset's variable is likely a geophysical variable
:param netCDF4.Dataset nc: An open netCDF dataset
:param str variable: Name of the variable
"""
ncvar = nc.variables[variable]
if getattr(ncvar, "cf_role", None):
return False
# Check for axis
if getattr(ncvar, "axis", None):
return False
standard_name_test = getattr(ncvar, "standard_name", "")
unitless = is_unitless(nc, variable)
if not isinstance(standard_name_test, str):
warnings.warn(
f"Variable {variable} has non string standard name, Attempting cast to string",
stacklevel=2,
)
try:
standard_name = str(standard_name_test)
except ValueError:
warnings.warn(
"Unable to cast standard name to string, excluding from geophysical variables",
stacklevel=2,
)
return False
else:
standard_name = standard_name_test
# Is the standard name associated with coordinates
if standard_name in {
"time",
"latitude",
"longitude",
"height",
"depth",
"altitude",
}:
return False
if variable in get_coordinate_variables(nc):
return False
if variable in get_auxiliary_coordinate_variables(nc):
return False
if variable in get_forecast_metadata_variables(nc):
return False
# Is it dimensionless and unitless?
if len(ncvar.shape) == 0 and unitless:
return False
# Is it a QC Flag?
if "status_flag" in standard_name or hasattr(ncvar, "flag_meanings"):
return False
# Is it a §7.1 Cell Boundaries variable
if variable in get_cell_boundary_variables(nc):
return False
if variable == get_climatology_variable(nc):
return False
# Is it a string but with no defined units?
if hasattr(ncvar.dtype, "char") and ncvar.dtype.char == "S":
return False
elif ncvar.dtype is str:
return False
# Is it an instrument descriptor?
if variable in get_instrument_variables(nc):
return False
# What about a platform descriptor?
if variable in get_platform_variables(nc):
return False
# Skip count/index variables too
if hasattr(ncvar, "sample_dimension") or hasattr(ncvar, "instance_dimension"):
return False
return True
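# Illustrative sketch of the decision above, using a hypothetical dataset: a
# variable such as sea_water_temperature(time) with units "degree_C" would pass
# every exclusion test and return True, while a variable carrying axis="T",
# cf_role, flag_meanings, or a coordinate standard_name such as "latitude"
# would return False.
#
#   is_geophysical(nc, "sea_water_temperature")  # -> True (hypothetical)
#   is_geophysical(nc, "lat")                    # -> False (hypothetical)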
def get_coordinate_variables(nc):
"""
Returns a list of variable names that identify as coordinate variables.
A coordinate variable is a netCDF variable with exactly one dimension. The
name of this dimension must be equivalent to the variable name.
From CF §1.2 Terminology
It is a one-dimensional variable with the same name as its dimension [e.g.,
time(time) ], and it is defined as a numeric data type with values that are
ordered monotonically. Missing values are not allowed in coordinate
variables.
:param netCDF4.Dataset nc: An open netCDF dataset
"""
coord_vars = []
for dimension in nc.dimensions:
if dimension in nc.variables:
if nc.variables[dimension].dimensions == (dimension,):
coord_vars.append(dimension)
return coord_vars
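# Illustrative sketch with hypothetical dimensions/variables: for a dataset
# declaring time(time) and lat(time, profile), only "time" qualifies, because
# its single dimension shares its name.
#
#   get_coordinate_variables(nc)  # -> ['time']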
def get_auxiliary_coordinate_variables(nc):
"""
Returns a list of auxiliary coordinate variables
An auxiliary coordinate variable is any netCDF variable that contains
coordinate data, but is not a coordinate variable (in the sense of the term
defined by CF).
:param netCDF4.Dataset nc: An open netCDF dataset
"""
aux_vars = []
# get any variables referenced by the coordinates attribute
for ncvar in nc.get_variables_by_attributes(
coordinates=lambda x: isinstance(x, str),
):
# split the coordinates into individual variable names
referenced_variables = ncvar.coordinates.split(" ")
# if the variable names exist, add them
for referenced_variable in referenced_variables:
if (
referenced_variable in nc.variables
and referenced_variable not in aux_vars
):
aux_vars.append(referenced_variable)
# axis variables are automatically in
for variable in get_axis_variables(nc):
if variable not in aux_vars:
aux_vars.append(variable)
# Last are any variables that define the common coordinate standard names
coordinate_standard_names = [
"time",
"longitude",
"latitude",
"height",
"depth",
"altitude",
]
coordinate_standard_names += DIMENSIONLESS_VERTICAL_COORDINATES
# Some datasets like ROMS use multiple variables to define coordinates
for ncvar in nc.get_variables_by_attributes(
standard_name=lambda x: x in coordinate_standard_names,
):
if ncvar.name not in aux_vars:
aux_vars.append(ncvar.name)
# Remove any that are purely coordinate variables
ret_val = []
for aux_var in aux_vars:
if nc.variables[aux_var].dimensions == (aux_var,):
continue
ret_val.append(aux_var)
return ret_val
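# Illustrative sketch (hypothetical dataset): if a data variable carries
# coordinates = "lon lat" and lon/lat are two-dimensional, both are returned
# here rather than by get_coordinate_variables, since neither is a true
# coordinate variable.
#
#   get_auxiliary_coordinate_variables(nc)  # -> ['lon', 'lat']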
def get_forecast_metadata_variables(nc):
"""
Returns a list of variables that represent forecast reference time
metadata.
:param netCDF4.Dataset nc: An open netCDF4 Dataset.
:rtype: list
"""
forecast_metadata_standard_names = {
"forecast_period",
"forecast_reference_time",
}
forecast_metadata_variables = []
for varname in nc.variables:
standard_name = getattr(nc.variables[varname], "standard_name", None)
if standard_name in forecast_metadata_standard_names:
forecast_metadata_variables.append(varname)
return forecast_metadata_variables
def get_cell_boundary_map(nc):
"""
Returns a dictionary mapping a variable to its boundary variable. The
returned dictionary maps a string variable name to the name of the boundary
variable.
:param netCDF4.Dataset nc: netCDF dataset
"""
boundary_map = {}
for variable in nc.get_variables_by_attributes(bounds=lambda x: x is not None):
if variable.bounds in nc.variables:
boundary_map[variable.name] = variable.bounds
return boundary_map
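# Illustrative sketch (hypothetical names): for a variable declared with
# lat:bounds = "lat_bnds" where lat_bnds exists in the file, the map contains
#
#   get_cell_boundary_map(nc)  # -> {'lat': 'lat_bnds'}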
def get_cell_boundary_variables(nc):
"""
Returns a list of variable names for variables that represent cell
boundaries through the `bounds` attribute
:param netCDF4.Dataset nc: netCDF dataset
"""
boundary_variables = []
has_bounds = nc.get_variables_by_attributes(bounds=lambda x: x is not None)
for var in has_bounds:
if var.bounds in nc.variables:
boundary_variables.append(var.bounds)
return boundary_variables
def get_bounds_variables(nc):
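"""
Returns the set of netCDF variable objects referenced by a `bounds`
attribute of another variable in the dataset.
:param netCDF4.Dataset nc: An open netCDF dataset
"""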
contains_bounds = nc.get_variables_by_attributes(bounds=lambda s: s in nc.variables)
return {nc.variables[parent_var.bounds] for parent_var in contains_bounds}
def get_geophysical_variables(nc):
"""
Returns a list of variable names for the variables detected as geophysical
variables.
:param netCDF4.Dataset nc: An open netCDF dataset
"""
parameters = []
for variable in nc.variables:
if is_geophysical(nc, variable) and variable not in get_bounds_variables(nc):
parameters.append(variable)
return parameters
def get_z_variable(nc):
"""
Returns the name of the variable that defines the Z axis or height/depth
:param netCDF4.Dataset nc: netCDF dataset
"""
z_variables = get_z_variables(nc)
if not z_variables:
return None
# Priority is standard_name, units
for var in z_variables:
ncvar = nc.variables[var]
if getattr(ncvar, "standard_name", None) in ("depth", "height", "altitude"):
return var
for var in z_variables:
ncvar = nc.variables[var]
units = getattr(ncvar, "units", None)
if isinstance(units, str):
if units_convertible(units, "bar"):
return var
if units_convertible(units, "m"):
return var
return z_variables[0]
def get_z_variables(nc):
"""
Returns a list of all variables matching definitions for Z
:param netCDF4.Dataset nc: An open netCDF dataset
"""
z_variables = []
# Vertical coordinates will be identifiable by units of pressure or the
# presence of the positive attribute with a value of up/down
# optionally, the vertical type may be indicated by providing the
# standard_name attribute or axis='Z'
total_coords = get_coordinate_variables(nc) + get_auxiliary_coordinate_variables(nc)
for coord_name in total_coords:
if coord_name in z_variables:
continue
coord_var = nc.variables[coord_name]
units = getattr(coord_var, "units", None)
positive = getattr(coord_var, "positive", None)
standard_name = getattr(coord_var, "standard_name", None)
axis = getattr(coord_var, "axis", None)
# If there are no units, we can't identify it as a vertical coordinate
# by checking pressure or positive
if units is not None:
if units_convertible(units, "bar"):
z_variables.append(coord_name)
elif isinstance(positive, str):
if positive.lower() in ["up", "down"]:
z_variables.append(coord_name)
# if axis='Z' we're good
if coord_name not in z_variables and axis == "Z":
z_variables.append(coord_name)
if coord_name not in z_variables and standard_name in (
"depth",
"height",
"altitude",
):
z_variables.append(coord_name)
if (
coord_name not in z_variables
and standard_name in DIMENSIONLESS_VERTICAL_COORDINATES
):
z_variables.append(coord_name)
return z_variables
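# Illustrative sketch of the rules above (hypothetical variables): a coordinate
# with units "dbar", one with positive = "down", one with axis = "Z", and one
# with standard_name = "depth" would all be collected.
#
#   get_z_variables(nc)  # -> ['pressure', 'z', 'depth'] (hypothetical)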
def get_lat_variable(nc):
"""
Returns the first variable matching latitude
:param netCDF4.Dataset nc: An open netCDF dataset
"""
latitudes = get_latitude_variables(nc)
if latitudes:
return latitudes[0]
return None
def get_latitude_variables(nc):
"""
Returns a list of all variables matching definitions for latitude
:param netCDF4.Dataset nc: An open netCDF dataset
"""
latitude_variables = []
# standard_name takes precedence
for variable in nc.get_variables_by_attributes(standard_name="latitude"):
latitude_variables.append(variable.name)
# Then axis
for variable in nc.get_variables_by_attributes(axis="Y"):
if variable.name not in latitude_variables:
latitude_variables.append(variable.name)
check_fn = partial(
attr_membership,
value_set=VALID_LAT_UNITS,
modifier_fn=lambda s: s.lower(),
)
for variable in nc.get_variables_by_attributes(units=check_fn):
if variable.name not in latitude_variables:
latitude_variables.append(variable.name)
return latitude_variables
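# Illustrative sketch of the precedence above (hypothetical variables): a
# variable with standard_name = "latitude" is picked up first, then any with
# axis = "Y", then any whose units are a valid latitude unit such as
# "degrees_north".
#
#   get_latitude_variables(nc)  # -> ['lat', 'yc'] (hypothetical)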
def get_true_latitude_variables(nc):
"""
Returns a list of variables defining true latitude.
CF Chapter 4 refers to latitude as a coordinate variable that can also be
used in non-standard coordinate systems like rotated pole and other
projections. Chapter 5 refers to a concept of true latitude where the
variable defines latitude in a standard projection.
True latitude, for lack of a better definition, is simply latitude where
the standard_name is latitude or the units are degrees_north.
:param netCDF4.Dataset nc: An open netCDF dataset
"""
lats = get_latitude_variables(nc)
true_lats = []
for lat in lats:
standard_name = getattr(nc.variables[lat], "standard_name", None)
units = getattr(nc.variables[lat], "units", None)
if standard_name == "latitude":
true_lats.append(lat)
elif isinstance(units, str) and units.lower() in VALID_LAT_UNITS:
true_lats.append(lat)
return true_lats
def get_lon_variable(nc):
"""
Returns the variable for longitude
:param netCDF4.Dataset nc: netCDF dataset
"""
longitudes = get_longitude_variables(nc)
if longitudes:
return longitudes[0]
return None
def get_longitude_variables(nc):
"""
Returns a list of all variables matching definitions for longitude
:param netCDF4.Dataset nc: An open netCDF dataset
"""
longitude_variables = []
# standard_name takes precedence
for variable in nc.get_variables_by_attributes(standard_name="longitude"):
longitude_variables.append(variable.name)
# Then axis
for variable in nc.get_variables_by_attributes(axis="X"):
if variable.name not in longitude_variables:
longitude_variables.append(variable.name)
check_fn = partial(
attr_membership,
value_set=VALID_LON_UNITS,
modifier_fn=lambda s: s.lower(),
)
for variable in nc.get_variables_by_attributes(units=check_fn):
if variable.name not in longitude_variables:
longitude_variables.append(variable.name)
return longitude_variables
def get_true_longitude_variables(nc):
"""
Returns a list of variables defining true longitude.
CF Chapter 4 refers to longitude as a coordinate variable that can also be
used in non-standard coordinate systems like rotated pole and other
projections. Chapter 5 refers to a concept of true longitude where the
variable defines longitude in a standard projection.
True longitude, for lack of a better definition, is simply longitude where
the standard_name is longitude or the units are degrees_east.
:param netCDF4.Dataset nc: An open netCDF dataset
"""
lons = get_longitude_variables(nc)
true_lons = []
for lon in lons:
standard_name = getattr(nc.variables[lon], "standard_name", None)
units = getattr(nc.variables[lon], "units", None)
if standard_name == "longitude":
true_lons.append(lon)
elif isinstance(units, str) and units.lower() in VALID_LON_UNITS:
true_lons.append(lon)
return true_lons
def get_platform_variables(nc):
"""
Returns a list of platform variable names
:param netCDF4.Dataset nc: An open netCDF4 Dataset
"""
candidates = []
for variable in nc.variables:
platform = getattr(nc.variables[variable], "platform", "")
if platform and platform in nc.variables:
if platform not in candidates:
candidates.append(platform)
platform = getattr(nc, "platform", "")
if platform and platform in nc.variables:
if platform not in candidates:
candidates.append(platform)
return candidates
def get_instrument_variables(nc):
"""
Returns a list of instrument variables
:param netCDF4.Dataset nc: An open netCDF4 Dataset
"""
candidates = []
for variable in nc.variables:
instrument = getattr(nc.variables[variable], "instrument", "")
if instrument and instrument in nc.variables:
if instrument not in candidates:
candidates.append(instrument)
instrument = getattr(nc, "instrument", "")
if instrument and instrument in nc.variables:
if instrument not in candidates:
candidates.append(instrument)
return candidates
def get_time_variable(nc):
"""
Returns the likeliest variable to be the time coordinate variable
:param netCDF4.Dataset nc: An open netCDF4 Dataset
"""
for var in nc.variables:
if getattr(nc.variables[var], "axis", "") == "T":
return var
else:
candidates = nc.get_variables_by_attributes(standard_name="time")
if len(candidates) == 1:
return candidates[0].name
else: # Look for a coordinate variable time
for candidate in candidates:
if candidate.dimensions == (candidate.name,):
return candidate.name
# If we still haven't found the candidate
time_variables = set(get_time_variables(nc))
coordinate_variables = set(get_coordinate_variables(nc))
if len(time_variables.intersection(coordinate_variables)) == 1:
return list(time_variables.intersection(coordinate_variables))[0]
auxiliary_coordinates = set(get_auxiliary_coordinate_variables(nc))
if len(time_variables.intersection(auxiliary_coordinates)) == 1:
return list(time_variables.intersection(auxiliary_coordinates))[0]
return None
def get_time_variables(nc):
"""
Returns a set of names of variables describing the time coordinate
:param netCDF4.Dataset nc: An open netCDF4 Dataset
"""
time_variables = set()
for variable in nc.get_variables_by_attributes(standard_name="time"):
time_variables.add(variable.name)
for variable in nc.get_variables_by_attributes(axis="T"):
if variable.name not in time_variables:
time_variables.add(variable.name)
regx = r"^(?:day|d|hour|hr|h|minute|min|second|s)s? since .*$"
for variable in nc.get_variables_by_attributes(units=lambda x: isinstance(x, str)):
if re.match(regx, variable.units) and variable.name not in time_variables:
time_variables.add(variable.name)
return time_variables
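# Illustrative sketch (hypothetical variables): besides standard_name = "time"
# and axis = "T", a variable with units such as "seconds since 1970-01-01" or
# "days since 2000-01-01 00:00:00" matches the regex above and is included.
#
#   get_time_variables(nc)  # -> {'time'} (hypothetical)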
def get_axis_variables(nc):
"""
Returns a list of variables that define an axis of the dataset
:param netCDF4.Dataset nc: An open netCDF4 Dataset
"""
axis_variables = []
for ncvar in nc.get_variables_by_attributes(axis=lambda x: x is not None):
axis_variables.append(ncvar.name)
return axis_variables
def get_climatology_variable(nc):
"""
Returns the variable describing climatology bounds if it exists.
Climatology variables are similar to cell boundary variables that describe
the climatology bounds.
See Example 7.8 in CF 1.6
:param netCDF4.Dataset nc: An open netCDF4 Dataset
:rtype: str or None
"""
time = get_time_variable(nc)
# If there's no time dimension there's no climatology bounds
if not time:
return None
# Climatology variable is simply whatever time points to under the
# `climatology` attribute.
if hasattr(nc.variables[time], "climatology"):
if nc.variables[time].climatology in nc.variables:
return nc.variables[time].climatology
return None
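# Illustrative sketch (hypothetical names): for a time variable declared with
# time:climatology = "climatology_bounds" where that variable is present,
#
#   get_climatology_variable(nc)  # -> 'climatology_bounds'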
def _find_standard_name_modifier_variables(nc, return_deprecated=False):
def match_modifier_variables(standard_name_str):
if standard_name_str is None:
return False
if not return_deprecated:
matches = re.search(r"^\w+ +\w+", standard_name_str)
else:
matches = re.search(
r"^\w+ +(?:status_flag|number_of_observations)$",
standard_name_str,
)
return bool(matches)
return [
var.name
for var in nc.get_variables_by_attributes(
standard_name=match_modifier_variables,
)
]
def get_flag_variables(nc):
"""
Returns a list of variables that are defined as flag variables
:param netCDF4.Dataset nc: An open netCDF4 Dataset
"""
flag_variables = []
for name, ncvar in nc.variables.items():
standard_name = getattr(ncvar, "standard_name", None)
if isinstance(standard_name, str) and "status_flag" in standard_name:
flag_variables.append(name)
elif hasattr(ncvar, "flag_meanings"):
flag_variables.append(name)
return flag_variables
def get_grid_mapping_variables(nc):
"""
Returns a set of grid mapping variable names
:param netCDF4.Dataset nc: An open netCDF4 Dataset
"""
grid_mapping_variables = set()
for ncvar in nc.get_variables_by_attributes(grid_mapping=lambda x: x is not None):
if ncvar.grid_mapping in nc.variables:
grid_mapping_variables.add(ncvar.grid_mapping)
return grid_mapping_variables
def get_axis_map(nc, variable):
"""
Returns an axis_map dictionary that contains an axis key and the coordinate
names as values.
For example::
{'X': ['longitude'], 'Y': ['latitude'], 'T': ['time']}
The axis C is for compressed coordinates such as a reduced grid, and U is for
an unknown axis, which can sometimes be a physical quantity acting as a
continuous axis, such as temperature or density.
:param netCDF4.Dataset nc: An open netCDF dataset
:param str variable: Variable name
"""
all_coords = get_coordinate_variables(nc) + get_auxiliary_coordinate_variables(nc)
latitudes = get_latitude_variables(nc)
longitudes = get_longitude_variables(nc)
times = get_time_variables(nc)
heights = get_z_variables(nc)
coordinates = getattr(nc.variables[variable], "coordinates", None)
if not isinstance(coordinates, str):
coordinates = []
else:
coordinates = coordinates.split(" ")
# For example
# {'x': ['longitude'], 'y': ['latitude'], 't': ['time']}
axis_map = defaultdict(list)
for coord_name in all_coords:
axis = getattr(nc.variables[coord_name], "axis", None)
if not axis or axis not in ("X", "Y", "Z", "T"):
if is_compression_coordinate(nc, coord_name):
axis = "C"
elif coord_name in times:
axis = "T"
elif coord_name in longitudes:
axis = "X"
elif coord_name in latitudes:
axis = "Y"
elif coord_name in heights:
axis = "Z"
else:
axis = "U"
if coord_name in nc.variables[variable].dimensions:
if coord_name not in axis_map[axis]:
axis_map[axis].append(coord_name)
elif coord_name in coordinates:
if coord_name not in axis_map[axis]:
axis_map[axis].append(coord_name)
return axis_map
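# Illustrative sketch (hypothetical variables): for a variable
# temperature(time, lat, lon) with coordinates = "lat lon", the result would
# look like
#
#   get_axis_map(nc, "temperature")
#   # -> {'T': ['time'], 'Y': ['lat'], 'X': ['lon']}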
def is_coordinate_variable(nc, variable):
"""
Returns True if the variable is a coordinate variable
:param netCDF4.Dataset nc: An open netCDF dataset
:param str variable: Variable name
"""
if variable not in nc.variables:
return False
return nc.variables[variable].dimensions == (variable,)
def is_compression_coordinate(nc, variable):
"""
Returns True if the variable is a coordinate variable that defines a
compression scheme.
:param netCDF4.Dataset nc: An open netCDF dataset
:param str variable: Variable name
"""
# Must be a coordinate variable
if not is_coordinate_variable(nc, variable):
return False
# must have a string attribute compress
compress = getattr(nc.variables[variable], "compress", None)
if not isinstance(compress, str):
return False
if not compress:
return False
# This should never happen or be allowed
if variable in compress:
return False
# Must point to dimensions
for dim in compress.split():
if dim not in nc.dimensions:
return False
return True
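# Illustrative sketch (hypothetical names): a reduced-grid index coordinate
# declared as rgrid(rgrid) with rgrid:compress = "lat lon", where lat and lon
# are dimensions of the file, satisfies every test above.
#
#   is_compression_coordinate(nc, "rgrid")  # -> True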
def coordinate_dimension_matrix(nc):
"""
Returns a dictionary of coordinates mapped to their dimensions
:param netCDF4.Dataset nc: An open netCDF dataset
"""
retval = {}
x = get_lon_variable(nc)
if x:
retval["x"] = nc.variables[x].dimensions
y = get_lat_variable(nc)
if y:
retval["y"] = nc.variables[y].dimensions
z = get_z_variable(nc)
if z:
retval["z"] = nc.variables[z].dimensions
t = get_time_variable(nc)
if t:
retval["t"] = nc.variables[t].dimensions
return retval
def is_dataset_valid_ragged_array_repr_featureType(nc, feature_type: str):
"""
Checks whether the dataset is a valid ragged array representation of the
given featureType. See inline comments.
"""
is_compound = False
if feature_type.lower() in {"timeseriesprofile", "trajectoryprofile"}:
is_compound = True
ftype = feature_type.lower().split("profile")[0]
else:
ftype = feature_type.lower()
# regardless of if compound type or not, must have a cf_role
# variable; if compound, this will be the first part of the
# feature_type as we'll have to search for one with profile_id
# regardless; if single feature type, cf_role must match that
# featureType
cf_role_vars = nc.get_variables_by_attributes(cf_role=lambda x: x is not None)
if (
not cf_role_vars
or (len(cf_role_vars) > 1 and not is_compound)
or (len(cf_role_vars) > 2 and is_compound)
):
return False
cf_role_var = nc.get_variables_by_attributes(cf_role=f"{ftype}_id")[0]
if (
cf_role_var.cf_role.split("_id")[0].lower() != ftype
): # if cf_role_var returns None, this should raise an error?
return False
# now we'll check dimensions for singular feature types and/or
# the first half of the compound featureType
instance_dim = cf_role_var.dimensions
if len(instance_dim) != 1:
return False
# Now we check for the presence of an index variable or count variable;
# NOTE that if no index or count variables exist, we can't determine with
# certainty that this is invalid, because single-instance data sets
# are valid representations of the ragged array structures. Instead,
# if the index/count variable is present, we check that only one of
# each is present and that their dimensions are correct
index_vars = nc.get_variables_by_attributes(
instance_dimension=lambda x: x is not None,
)
count_vars = nc.get_variables_by_attributes(
sample_dimension=lambda x: x is not None,
)
# if the featureType isn't compound, shouldn't have both count and index
if index_vars and count_vars and not is_compound:
return False
# single featureType, checking for valid index variable
elif index_vars and not is_compound:
if len(index_vars) > 1:
return False
# the index variable's attr 'instance_dimension'
# must be the same as the actual instance dimension,
# which we get from the cf_role variable
if index_vars[0].instance_dimension != instance_dim[0]:
return False
# single featureType, checking for valid count variable
elif count_vars and not is_compound:
if len(count_vars) > 1:
return False
# the count variable must have the same dimensions
# as the instance variable, which has the instance
# dimension as its dimension
if count_vars[0].dimensions != instance_dim:
return False
# Now, if the featureType is compound, an index variable
# must be present for the profile variable. To verify this, we will
# check that the dimension of the index variable is the same dimension
# that is present on the variable which has the attribute cf_role=profile_id.
# The attribute of the index variable 'instance_dimension' should point to the
# name of the dimension of the cf_role variable for either timeSeries or trajectory.
# A count variable must also be present, and should have the same dimension,
# but its attribute 'sample_dimension' must refer to the dimension, which is
# DIFFERENT than the variable with the attribute cf_role=ftype, where ftype is the
# first half of the compound featureType (so either timeseries or trajectory).
# Thus, the dimension of the count variable must be the same dimension as the
# dimension that all the other geophysical variables have.
elif index_vars and count_vars and is_compound:
if len(index_vars) > 1 or len(count_vars) > 1:
return False
profile_cf_role_vars = nc.get_variables_by_attributes(cf_role="profile_id")
if len(profile_cf_role_vars) > 1:
return False
profile_cf_role_var = profile_cf_role_vars[0]
# we first check the dimension of the index variable
if index_vars[0].dimensions != profile_cf_role_var.dimensions:
return False