-
Notifications
You must be signed in to change notification settings - Fork 39
/
Copy pathtoz.py
192 lines (149 loc) · 6.5 KB
/
toz.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
"""Derivation of variable `toz`."""
import cf_units
import iris
import numba
import numpy as np
from scipy import constants
from ._baseclass import DerivedVariableBase
# Constants
# Each physical constant is stored as a plain number plus a separate unit
# object, because the derivation below multiplies cubes by the bare numbers
# and assigns the resulting units by hand.

# Avogadro constant: numerical value and unit string taken from
# scipy.constants.
AVOGADRO_CONST = constants.value('Avogadro constant')
AVOGADRO_CONST_UNIT = constants.unit('Avogadro constant')
# Gravitational acceleration (value expressed in STANDARD_GRAVITY_UNIT).
STANDARD_GRAVITY = 9.81
STANDARD_GRAVITY_UNIT = cf_units.Unit('m s^-2')
# NOTE(review): presumably the (rounded) mean molar mass of air - confirm.
MW_AIR = 29
MW_AIR_UNIT = cf_units.Unit('g mol^-1')
# NOTE(review): presumably the molar mass of ozone (O3) - confirm.
MW_O3 = 48
MW_O3_UNIT = cf_units.Unit('g mol^-1')
# Dobson unit expressed as a count per area (2.69e20 m^-2); the total column
# is converted to this unit before being labelled 'DU'.
DOBSON_UNIT = cf_units.Unit('2.69e20 m^-2')
class DerivedVariable(DerivedVariableBase):
    """Derivation of variable `toz`."""

    @staticmethod
    def required(project):
        """Declare the variables needed for derivation.

        CMIP6 provides the ozone mole fraction as `o3`; every other project
        is asked for `tro3`. Surface pressure (`ps`) is always required.
        """
        ozone_short_name = 'o3' if project == 'CMIP6' else 'tro3'
        return [{'short_name': ozone_short_name}, {'short_name': 'ps'}]

    @staticmethod
    def calculate(cubes):
        """Compute total column ozone.

        Note
        ----
        The surface pressure is used as a lower integration bound. A fixed
        upper integration bound of 0 Pa is used.

        """
        ozone = cubes.extract_strict(
            iris.Constraint(name='mole_fraction_of_ozone_in_air'))
        surface_pressure = cubes.extract_strict(
            iris.Constraint(name='surface_air_pressure'))

        layer_widths = _pressure_level_widths(ozone,
                                              surface_pressure,
                                              top_limit=0.0)

        # Ozone mass per area in every layer; the factors are applied one at
        # a time in the same left-to-right order as a single chained
        # expression would use.
        column = ozone * layer_widths
        column = column / STANDARD_GRAVITY
        column = column * MW_O3
        column = column / MW_AIR

        # Integrate vertically and set the units explicitly, since the
        # scaling factors above are plain numbers without units.
        column = column.collapsed('air_pressure', iris.analysis.SUM)
        column.units = (ozone.units * layer_widths.units /
                        STANDARD_GRAVITY_UNIT * MW_O3_UNIT / MW_AIR_UNIT)

        # Convert from kg m^-2 to Dobson unit (2.69e20 m^-2 )
        column = column / MW_O3
        column = column * AVOGADRO_CONST
        column.units = column.units / MW_O3_UNIT * AVOGADRO_CONST_UNIT
        column.convert_units(DOBSON_UNIT)
        column.units = 'DU'

        return column
# Helper functions
def _pressure_level_widths(tro3_cube, ps_cube, top_limit=0.0):
    """Build a cube holding the width of every pressure level.

    The 2D surface pressure field serves as the lower bound of the column
    and `top_limit` as its upper bound.

    Parameters
    ----------
    tro3_cube : iris.cube.Cube
        `Cube` containing `mole_fraction_of_ozone_in_air`.
    ps_cube : iris.cube.Cube
        `Cube` containing `surface_air_pressure`.
    top_limit : float
        Pressure in Pa.

    Returns
    -------
    iris.cube.Cube
        `Cube` of same shape as `tro3_cube` containing pressure level widths.

    """
    widths = _apply_pressure_level_widths(
        _create_pressure_array(tro3_cube, ps_cube, top_limit))

    # Reuse the ozone cube as a template so that all coordinates carry over,
    # then relabel it and give it the units of the surface pressure.
    widths_cube = tro3_cube.copy(data=widths)
    widths_cube.rename('pressure level widths')
    widths_cube.units = ps_cube.units
    return widths_cube
def _create_pressure_array(tro3_cube, ps_cube, top_limit):
    """Create an array filled with the `air_pressure` coord values.

    The array is created from the `tro3_cube` with the same dimensions
    as `tro3_cube`. This array is then sandwiched with a 2D array
    containing the surface pressure and a 2D array containing the top
    pressure limit.

    The indexing below requires `tro3_cube` to be 4D with `air_pressure`
    on axis 1, and `ps_cube` to be 3D with its axes matching axes 0, 2 and
    3 of `tro3_cube`.

    Parameters
    ----------
    tro3_cube : iris.cube.Cube
        `Cube` with an `air_pressure` coordinate along axis 1.
    ps_cube : iris.cube.Cube
        `Cube` containing the surface pressure.
    top_limit : float
        Pressure used as the upper boundary of every column.

    Returns
    -------
    numpy.ndarray
        Array with two more entries along axis 1 than `tro3_cube`: the
        surface pressure first, then the pressure levels (NaN where the
        level pressure exceeds the surface pressure), then `top_limit`.

    """
    shape = tro3_cube.shape

    # Create 4D array filled with pressure level values
    p_levels = tro3_cube.coord('air_pressure').points.astype(np.float32)
    p_4d_array = iris.util.broadcast_to_shape(p_levels, shape, [1])

    # Create 4d array filled with surface pressure values
    ps_4d_array = iris.util.broadcast_to_shape(ps_cube.data, shape, [0, 2, 3])

    # Set pressure levels below the surface pressure to NaN.
    # `np.nan` is the canonical spelling; the `np.NaN` alias was removed in
    # NumPy 2.0.
    pressure_4d = np.where((ps_4d_array - p_4d_array) < 0, np.nan, p_4d_array)

    # Make top_limit last pressure level
    top_limit_array = np.full(ps_cube.shape, top_limit, dtype=np.float32)
    data = top_limit_array[:, np.newaxis, :, :]
    pressure_4d = np.concatenate((pressure_4d, data), axis=1)

    # Make surface pressure the first pressure level
    data = ps_cube.data[:, np.newaxis, :, :]
    pressure_4d = np.concatenate((data, pressure_4d), axis=1)

    return pressure_4d
def _apply_pressure_level_widths(array, air_pressure_axis=1):
    """Compute pressure level widths along one axis of `array`.

    Every 1D pressure column taken along `air_pressure_axis` is passed to
    `_p_level_widths` and replaced by the 1D array of level widths that it
    returns.
    """
    widths = np.apply_along_axis(_p_level_widths, air_pressure_axis, array)
    return widths
@numba.jit()  # ~10x faster
def _p_level_widths(array):
    """Create pressure level widths.

    The array with pressure levels is assumed to be monotonic and the
    values are decreasing.

    The first element is the lower boundary (surface pressure), the last
    value is the upper boundary. Thicknesses are only calculated for the
    values between these boundaries, the returned array, therefore,
    contains two elements less.

    A level whose value is NaN gets a width of 0. The first level that
    follows a NaN level (or the very first level) extends down to the
    surface pressure; the last level extends up to the upper boundary. All
    other boundaries lie halfway between neighbouring levels.

    Raises
    ------
    ValueError
        If a computed distance to a neighbouring boundary is negative,
        i.e. the pressure increases along the array.

    >>> _p_level_widths(np.array([1020, 1000, 700, 500, 5]))
    array([170., 250., 595.], dtype=float32)

    >>> _p_level_widths(np.array([990, np.nan, 700, 500, 5]))
    array([  0., 390., 595.], dtype=float32)
    """
    surface_pressure = array[0]
    top_limit = array[-1]
    array = array[1:-1]

    # `np.nan` is used throughout: the `np.NAN` alias was removed in
    # NumPy 2.0.
    p_level_widths = np.full(array.shape, np.nan, dtype=np.float32)

    last_pressure_level = len(array) - 1
    for i, val in enumerate(array):
        # numba would otherwise initialize it to 0 and
        # hide bugs that would occur in raw Python
        bounds_width = np.nan
        if np.isnan(val):
            # Masked level: contributes nothing to the column
            bounds_width = 0.0
        else:
            # Distance to lower bound
            if i == 0 or np.isnan(array[i - 1]):
                # First pressure level with value
                dist_to_lower_bound = surface_pressure - val
            else:
                dist_to_lower_bound = 0.5 * (array[i - 1] - val)

            # Distance to upper bound
            if i == last_pressure_level:  # last pressure level
                dist_to_upper_bound = val - top_limit
            else:
                dist_to_upper_bound = 0.5 * (val - array[i + 1])

            # Check monotonicity - all distances must be >= 0
            if dist_to_lower_bound < 0.0 or dist_to_upper_bound < 0.0:
                raise ValueError("Pressure level value increased with "
                                 "height.")

            bounds_width = dist_to_lower_bound + dist_to_upper_bound

        p_level_widths[i] = bounds_width
    return p_level_widths