Merge pull request #2205 from devitocodes/misc-issues
misc: various Dimension internal fixes
mloubout authored Sep 20, 2023
2 parents 5eaad80 + 97f1cc8 commit 237a4a8
Showing 21 changed files with 204 additions and 71 deletions.
7 changes: 6 additions & 1 deletion devito/builtins/initializers.py
@@ -77,7 +77,12 @@ def assign(f, rhs=0, options=None, name='assign', assign_halo=False, **kwargs):
symbolic_max=d.symbolic_max + h.right)
eqs = [eq.xreplace(subs) for eq in eqs]

dv.Operator(eqs, name=name, **kwargs)()
op = dv.Operator(eqs, name=name, **kwargs)
try:
op()
except ValueError:
# Corner case such as assign(u, v) with v a Buffered TimeFunction
op(time_M=f._time_size)


def smooth(f, g, axis=None):
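
A minimal sketch of the corner case this fallback guards against, assuming the public devito API (grid shape and data values are illustrative only):

    from devito import Grid, TimeFunction, assign

    grid = Grid(shape=(4, 4))
    u = TimeFunction(name='u', grid=grid)  # buffered: save=None keeps only a few time slots
    v = TimeFunction(name='v', grid=grid)
    v.data[:] = 1.

    # With both functions buffered, the generated Operator cannot derive `time_M`
    # on its own and used to raise ValueError; the new except branch retries with
    # time_M=f._time_size so the copy still runs
    assign(u, v)
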
5 changes: 3 additions & 2 deletions devito/core/autotuning.py
@@ -209,8 +209,9 @@ def init_time_bounds(stepper, at_args, args):
else:
at_args[dim.max_name] = at_args[dim.min_name] + options['squeezer']
if dim.size_name in args:
# May need to shrink to avoid OOB accesses
at_args[dim.max_name] = min(at_args[dim.max_name], args[dim.max_name])
if not isinstance(args[dim.size_name], range):
# May need to shrink to avoid OOB accesses
at_args[dim.max_name] = min(at_args[dim.max_name], args[dim.max_name])
if at_args[dim.min_name] > at_args[dim.max_name]:
warning("too few time iterations; skipping")
return False
6 changes: 6 additions & 0 deletions devito/ir/stree/algorithms.py
@@ -147,6 +147,12 @@ def preprocess(clusters, options=None, **kwargs):
found = []
for c1 in list(queue):
distributed_aindices = c1.halo_scheme.distributed_aindices
h_indices = set().union(*[d._defines for d in c1.halo_scheme.loc_indices])

# Skip if the halo exchange would end up outside
# its iteration space
if h_indices and not h_indices & dims:
continue

diff = dims - distributed_aindices
intersection = dims & distributed_aindices
4 changes: 4 additions & 0 deletions devito/mpi/halo_scheme.py
@@ -361,6 +361,10 @@ def distributed(self):
def distributed_aindices(self):
return set().union(*[i.dims for i in self.fmapper.values()])

@cached_property
def loc_indices(self):
return set().union(*[i.loc_indices.keys() for i in self.fmapper.values()])

@cached_property
def arguments(self):
return self.dimensions | set(flatten(self.honored.values()))
57 changes: 39 additions & 18 deletions devito/operations/interpolators.py
@@ -155,13 +155,31 @@ def _rdim(self):
-self.r+1, self.r, 2*self.r, parent)
for d in self._gdims]

return DimensionTuple(*dims, getters=self._gdims)
# Make radius dimension conditional to avoid OOB
rdims = []
pos = self.sfunction._position_map.values()

for (d, rd, p) in zip(self._gdims, dims, pos):
# Add conditional to avoid OOB
lb = sympy.And(rd + p >= d.symbolic_min - self.r, evaluate=False)
ub = sympy.And(rd + p <= d.symbolic_max + self.r, evaluate=False)
cond = sympy.And(lb, ub, evaluate=False)
rdims.append(ConditionalDimension(rd.name, rd, condition=cond,
indirect=True))

return DimensionTuple(*rdims, getters=self._gdims)

def _augment_implicit_dims(self, implicit_dims, extras=None):
if extras is not None:
extra = set([i for v in extras for i in v.dimensions]) - set(self._gdims)
extra = tuple(extra)
else:
extra = tuple()

def _augment_implicit_dims(self, implicit_dims):
if self.sfunction._sparse_position == -1:
return self.sfunction.dimensions + as_tuple(implicit_dims)
return self.sfunction.dimensions + as_tuple(implicit_dims) + extra
else:
return as_tuple(implicit_dims) + self.sfunction.dimensions
return as_tuple(implicit_dims) + self.sfunction.dimensions + extra

def _coeff_temps(self, implicit_dims):
return []
@@ -177,24 +195,17 @@ def _interp_idx(self, variables, implicit_dims=None):
mapper = {}
pos = self.sfunction._position_map.values()

for ((di, d), rd, p) in zip(enumerate(self._gdims), self._rdim, pos):
# Add conditional to avoid OOB
lb = sympy.And(rd + p >= d.symbolic_min - self.r, evaluate=False)
ub = sympy.And(rd + p <= d.symbolic_max + self.r, evaluate=False)
cond = sympy.And(lb, ub, evaluate=False)
mapper[d] = ConditionalDimension(rd.name, rd, condition=cond, indirect=True)

# Temporaries for the position
temps = self._positions(implicit_dims)

# Coefficient symbol expression
temps.extend(self._coeff_temps(implicit_dims))

# Substitution mapper for variables
mapper = self._rdim._getters
idx_subs = {v: v.subs({k: c + p
for ((k, c), p) in zip(mapper.items(), pos)})
for v in variables}
idx_subs.update(dict(zip(self._rdim, mapper.values())))

return idx_subs, temps

@@ -247,8 +258,6 @@ def _interpolate(self, expr, increment=False, self_subs={}, implicit_dims=None):
interpolation expression, but that should be honored when constructing
the operator.
"""
implicit_dims = self._augment_implicit_dims(implicit_dims)

# Derivatives must be evaluated before the introduction of indirect accesses
try:
_expr = expr.evaluate
@@ -258,6 +267,9 @@ def _interpolate(self, expr, increment=False, self_subs={}, implicit_dims=None):

variables = list(retrieve_function_carriers(_expr))

# Implicit dimensions
implicit_dims = self._augment_implicit_dims(implicit_dims)

# List of indirection indices for all adjacent grid points
idx_subs, temps = self._interp_idx(variables, implicit_dims=implicit_dims)

@@ -290,11 +302,10 @@ def _inject(self, field, expr, implicit_dims=None):
injection expression, but that should be honored when constructing
the operator.
"""
implicit_dims = self._augment_implicit_dims(implicit_dims) + self._rdim

# Make iterable to support inject((u, v), expr=expr)
# or inject((u, v), expr=(expr1, expr2))
fields, exprs = as_tuple(field), as_tuple(expr)

# Provide either one expr per field or one expr for all fields
if len(fields) > 1:
if len(exprs) == 1:
@@ -310,6 +321,14 @@ def _inject(self, field, expr, implicit_dims=None):
_exprs = exprs

variables = list(v for e in _exprs for v in retrieve_function_carriers(e))

# Implicit dimensions
implicit_dims = self._augment_implicit_dims(implicit_dims, variables)
# Move all temporaries inside inner loop to improve parallelism
# Can only be done for inject, as interpolation needs a temporary
# summing temp that wouldn't allow collapsing
implicit_dims = implicit_dims + tuple(r.parent for r in self._rdim)

variables = variables + list(fields)

# List of indirection indices for all adjacent grid points
@@ -380,5 +399,7 @@ def interpolation_coeffs(self):
@property
def _weights(self):
ddim, cdim = self.interpolation_coeffs.dimensions[1:]
return Mul(*[self.interpolation_coeffs.subs({ddim: ri, cdim: rd-rd.symbolic_min})
for (ri, rd) in enumerate(self._rdim)])
mappers = [{ddim: ri, cdim: rd-rd.parent.symbolic_min}
for (ri, rd) in enumerate(self._rdim)]
return Mul(*[self.interpolation_coeffs.subs(mapper)
for mapper in mappers])
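
At the user level, the conditional radius dimensions built in _rdim matter when a sparse point sits on (or near) the edge of the physical domain, where the surrounding interpolation/injection stencil would otherwise read or write out of bounds. A hedged sketch of such a setup, assuming the public devito sparse-function API (names and sizes are illustrative):

    import numpy as np
    from devito import Grid, TimeFunction, SparseTimeFunction, Eq, Operator

    grid = Grid(shape=(11, 11), extent=(10., 10.))
    u = TimeFunction(name='u', grid=grid, space_order=2)

    # A single source placed on the domain corner: its injection stencil
    # touches grid points outside the domain unless guarded
    src = SparseTimeFunction(name='src', grid=grid, npoint=1, nt=5,
                             coordinates=np.array([[0., 0.]]))

    op = Operator([Eq(u.forward, u + 1)] + src.inject(field=u.forward, expr=src))
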
11 changes: 9 additions & 2 deletions devito/operator/operator.py
@@ -24,7 +24,7 @@
from devito.symbolics import estimate_cost
from devito.tools import (DAG, OrderedSet, Signer, ReducerMap, as_tuple, flatten,
filter_sorted, frozendict, is_integer, split, timed_pass,
timed_region)
timed_region, contains_val)
from devito.types import Grid, Evaluable

__all__ = ['Operator']
@@ -526,6 +526,7 @@ def _prepare_arguments(self, autotune=None, **kwargs):
edges = [(i, i.parent) for i in self.dimensions
if i.is_Derived and i.parent in set(nodes)]
toposort = DAG(nodes, edges).topological_sort()

futures = {}
for d in reversed(toposort):
if set(d._arg_names).intersection(kwargs):
@@ -560,18 +561,24 @@ def _prepare_arguments(self, autotune=None, **kwargs):
# a TimeFunction `usave(t_sub, x, y)`, an override for `fact` is
# supplied w/o overriding `usave`; that's legal
pass
elif is_integer(args[k]) and args[k] not in as_tuple(v):
elif is_integer(args[k]) and not contains_val(args[k], v):
raise ValueError("Default `%s` is incompatible with other args as "
"`%s=%s`, while `%s=%s` is expected. Perhaps you "
"forgot to override `%s`?" %
(p, k, v, k, args[k], p))

args = kwargs['args'] = args.reduce_all()

# DiscreteFunctions may be created from CartesianDiscretizations, which in
# turn could be Grids or SubDomains. Both may provide arguments
discretizations = {getattr(kwargs[p.name], 'grid', None) for p in overrides}
discretizations.update({getattr(p, 'grid', None) for p in defaults})
discretizations.discard(None)
# Remove subgrids if multiple grids
if len(discretizations) > 1:
discretizations = {g for g in discretizations
if not any(d.is_Derived for d in g.dimensions)}

for i in discretizations:
args.update(i._arg_values(**kwargs))

8 changes: 8 additions & 0 deletions devito/passes/iet/langbase.py
@@ -214,6 +214,9 @@ def DeviceIteration(self):
def Prodder(self):
return self.lang.Prodder

def _device_pointers(self, *args, **kwargs):
return {}


class DeviceAwareMixin(object):

@@ -325,6 +328,11 @@ def _(iet):

return _initialize(iet)

def _device_pointers(self, iet):
functions = FindSymbols().visit(iet)
devfuncs = [f for f in functions if f.is_Array and f._mem_local]
return set(devfuncs)

def _is_offloadable(self, iet):
"""
True if the IET computation is offloadable to device, False otherwise.
18 changes: 8 additions & 10 deletions devito/passes/iet/parpragma.py
@@ -295,15 +295,13 @@ def _select_candidates(self, candidates):
except TypeError:
pass

# At least one inner loop (nested) or
# we do not collapse most inner loop if it is an atomic reduction
if not i.is_ParallelAtomic or nested:
collapsable.append(i)
collapsable.append(i)

# Give a score to this candidate, based on the number of fully-parallel
# Iterations and their position (i.e. outermost to innermost) in the nest
score = (
int(root.is_ParallelNoAtomic),
len(self._device_pointers(root)), # Outermost offloadable
int(len([i for i in collapsable if i.is_ParallelNoAtomic]) >= 1),
int(len([i for i in collapsable if i.is_ParallelRelaxed]) >= 1),
-(n0 + 1) # The outermost, the better
@@ -377,9 +375,14 @@ def _make_partree(self, candidates, nthreads=None):
ncollapsed=ncollapsed, nthreads=nthreads,
**root.args)
prefix = []
elif nthreads is not None:
body = self.HostIteration(schedule='static',
parallel=nthreads is not self.nthreads_nested,
ncollapsed=ncollapsed, nthreads=nthreads,
**root.args)
prefix = []
else:
# pragma ... for ... schedule(..., expr)
assert nthreads is None
nthreads = self.nthreads_nonaffine
chunk_size = Symbol(name='chunk_size')
body = self.HostIteration(ncollapsed=ncollapsed, chunk_size=chunk_size,
@@ -428,11 +431,6 @@ def _make_nested_partree(self, partree):
if self.nhyperthreads <= self.nested:
return partree

# Loop nest with atomic reductions are more likely to have less latency
# keep outer loop parallel
if partree.root.is_ParallelAtomic:
return partree

# Note: there might be multiple sub-trees amenable to nested parallelism,
# hence we loop over all of them
#
12 changes: 12 additions & 0 deletions devito/tools/data_structures.py
@@ -110,6 +110,7 @@ def unique(self, key):
Key for which to retrieve a unique value.
"""
candidates = self.getall(key)
candidates = [c for c in candidates if c is not None]

def compare_to_first(v):
first = candidates[0]
@@ -122,12 +123,23 @@ def compare_to_first(v):
return first in v
elif isinstance(first, Set):
return v in first
elif isinstance(v, range):
if isinstance(first, range):
return first.stop > v.start or v.stop > first.start
else:
return first >= v.start and first < v.stop
elif isinstance(first, range):
return v >= first.start and v < first.stop
else:
return first == v

if len(candidates) == 1:
return candidates[0]
elif all(map(compare_to_first, candidates)):
# return first non-range
for c in candidates:
if not isinstance(c, range):
return c
return candidates[0]
else:
raise ValueError("Unable to find unique value for key %s, candidates: %s"
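
For illustration, a hedged sketch of the merge rule these new branches aim for (an assumption drawn from the diff, not a test from the repository): an exact value and a range of admissible values are treated as compatible when the value falls inside the range, and the concrete non-range candidate is the one returned.

    # Hypothetical candidates gathered for the same runtime argument
    candidates = [5, range(0, 10)]

    exact = [c for c in candidates if not isinstance(c, range)]
    ranges = [c for c in candidates if isinstance(c, range)]

    # Compatible: every exact value lies inside every range
    assert all(r.start <= e < r.stop for e in exact for r in ranges)

    # The "unique" value preferred is the first non-range candidate
    unique = exact[0] if exact else candidates[0]
    assert unique == 5
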
9 changes: 8 additions & 1 deletion devito/tools/utils.py
@@ -12,7 +12,7 @@
'roundm', 'powerset', 'invert', 'flatten', 'single_or', 'filter_ordered',
'as_mapper', 'filter_sorted', 'pprint', 'sweep', 'all_equal', 'as_list',
'indices_to_slices', 'indices_to_sections', 'transitive_closure',
'humanbytes']
'humanbytes', 'contains_val']


def prod(iterable, initial=1):
@@ -75,6 +75,13 @@ def is_integer(value):
return isinstance(value, (int, np.integer, sympy.Integer))


def contains_val(val, items):
try:
return val in items
except TypeError:
return val == items


def generator():
"""
Return a function ``f`` that generates integer numbers starting at 0
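
The helper simply falls back to an equality test when membership testing is unsupported. A small usage sketch (values are illustrative; the import assumes devito.tools re-exports utils, as it does for the other helpers in __all__):

    from devito.tools import contains_val

    contains_val(3, range(0, 10))   # True: `in` works on the container
    contains_val(3, (1, 2, 3))      # True
    contains_val(3, 3)              # True: falls back to scalar equality
    contains_val(3, 4)              # False

This is what allows the argument check in Operator._prepare_arguments above to validate a default against either a concrete expected value or a range of admissible values.
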
4 changes: 3 additions & 1 deletion devito/types/basic.py
@@ -838,13 +838,15 @@ def __new__(cls, *args, **kwargs):
newobj._dimensions = dimensions
newobj._shape = cls.__shape_setup__(**kwargs)
newobj._dtype = cls.__dtype_setup__(**kwargs)
newobj.__init_finalize__(*args, **kwargs)

# All objects created off an existing AbstractFunction `f` (e.g.,
# via .func, or .subs, such as `f(x + 1)`) keep a reference to `f`
# through the `function` field
newobj.function = function or newobj

# Initialization
newobj.__init_finalize__(*args, **kwargs)

return newobj

def __init__(self, *args, **kwargs):
2 changes: 1 addition & 1 deletion devito/types/dense.py
@@ -94,7 +94,7 @@ def __init_finalize__(self, *args, function=None, **kwargs):
# a reference to the user-provided buffer
self._initializer = None
if len(initializer) > 0:
self.data_with_halo[:] = initializer
self.data_with_halo[:] = initializer[:]
else:
# This is a corner case -- we might get here, for example, when
# running with MPI and some processes get 0-size arrays after