From 1d87f6957aee0dd404941e75dbf3f280691dd0fd Mon Sep 17 00:00:00 2001 From: mloubout Date: Wed, 8 Nov 2023 15:05:06 -0500 Subject: [PATCH] mpi: patch missing dimensions for halospot --- devito/ir/iet/algorithms.py | 5 +++- devito/ir/stree/algorithms.py | 47 ++++++++++++++++++++++++++---- devito/mpi/halo_scheme.py | 4 +++ devito/operations/interpolators.py | 2 +- devito/passes/iet/mpi.py | 3 ++ examples/seismic/tti/operators.py | 12 +++----- tests/test_interpolation.py | 4 +-- tests/test_mpi.py | 42 +++++++++++++++++++++++--- 8 files changed, 98 insertions(+), 21 deletions(-) diff --git a/devito/ir/iet/algorithms.py b/devito/ir/iet/algorithms.py index 59f7237fcaa..ec6381698e3 100644 --- a/devito/ir/iet/algorithms.py +++ b/devito/ir/iet/algorithms.py @@ -40,7 +40,10 @@ def iet_build(stree): nsections += 1 elif i.is_Halo: - body = HaloSpot(queues.pop(i), i.halo_scheme) + try: + body = HaloSpot(queues.pop(i), i.halo_scheme) + except KeyError: + body = HaloSpot(None, i.halo_scheme) elif i.is_Sync: body = SyncSpot(i.sync_ops, body=queues.pop(i, None)) diff --git a/devito/ir/stree/algorithms.py b/devito/ir/stree/algorithms.py index b679ef0d3a1..15291dda313 100644 --- a/devito/ir/stree/algorithms.py +++ b/devito/ir/stree/algorithms.py @@ -81,10 +81,17 @@ def stree_build(clusters, profiler=None, **kwargs): tip = augment_whole_subtree(c, tip, mapper, it) # Attach NodeHalo if necessary - for it, v in mapper.items(): - if needs_nodehalo(it.dim, c.halo_scheme): - v.bottom.parent = NodeHalo(c.halo_scheme, v.bottom.parent) - break + if c.exprs: + for it, v in mapper.items(): + if needs_nodehalo(it.dim, c.halo_scheme): + v.bottom.parent = NodeHalo(c.halo_scheme, v.bottom.parent) + break + else: + for it, v in reversed(mapper.items()): + if needs_nodehalo_dim(it.dim, c.halo_scheme): + v.bottom.children = (*v.bottom.children, + NodeHalo(c.halo_scheme, v.bottom.parent)) + break # Add in NodeExprs exprs = [] @@ -182,7 +189,33 @@ def preprocess(clusters, options=None, **kwargs): processed.append(c.rebuild(exprs=[], ispace=ispace, syncs=syncs)) halo_scheme = HaloScheme.union([c1.halo_scheme for c1 in found]) - processed.append(c.rebuild(halo_scheme=halo_scheme)) + itdims = c.ispace.itdims + if halo_scheme: + # We make sure that the halo scheme placement is valid for this cluster + # Mainly if a cluster has iteration dimension {t, d, f} + # and the halo_scheme has distributed_aindices {d} and + # loc_dirs {f} then the haloscheme needs to be lifted into its own + # {t, f} cluster because it needs to be inside an {f} loop while it would + # be placed outside the {d} loop and therefore outside the {f} loop if + # attached to the cluster. + dindex = max([itdims.index(d) for d in halo_scheme.loc_dirs + if d in itdims] + [0]) + aindex = max([itdims.index(d) for d in halo_scheme.distributed_aindices + if d in itdims] + [0]) + if dindex > aindex: + hispace = c.ispace.project(itdims[:dindex]) + else: + hispace = None + else: + hispace = None + + if hispace and options['mpi']: + processed.append(c.rebuild(exprs=[], + ispace=hispace, + halo_scheme=halo_scheme)) + processed.append(c.rebuild(halo_scheme=None)) + else: + processed.append(c.rebuild(halo_scheme=halo_scheme)) # Sanity check! try: @@ -229,6 +262,10 @@ def needs_nodehalo(d, hs): return d and hs and d._defines.intersection(hs.distributed_aindices) +def needs_nodehalo_dim(d, hs): + return d and hs and d._defines.intersection(hs.loc_indices) + + def reuse_section(candidate, section): try: if not section or candidate.siblings[-1] is not section: diff --git a/devito/mpi/halo_scheme.py b/devito/mpi/halo_scheme.py index 970e84633dc..4a5a8b26fcb 100644 --- a/devito/mpi/halo_scheme.py +++ b/devito/mpi/halo_scheme.py @@ -365,6 +365,10 @@ def distributed_aindices(self): def loc_indices(self): return set().union(*[i.loc_indices.keys() for i in self.fmapper.values()]) + @cached_property + def loc_dirs(self): + return set().union(*[i.loc_dirs.keys() for i in self.fmapper.values()]) + @cached_property def arguments(self): return self.dimensions | set(flatten(self.honored.values())) diff --git a/devito/operations/interpolators.py b/devito/operations/interpolators.py index bee8bc0de48..98e0105e8aa 100644 --- a/devito/operations/interpolators.py +++ b/devito/operations/interpolators.py @@ -195,7 +195,7 @@ def _augment_implicit_dims(self, implicit_dims, extras=None): idims = self.sfunction.dimensions + as_tuple(implicit_dims) + extra else: idims = extra + as_tuple(implicit_dims) + self.sfunction.dimensions - return tuple(filter_ordered(idims)) + return tuple(idims) def _coeff_temps(self, implicit_dims): return [] diff --git a/devito/passes/iet/mpi.py b/devito/passes/iet/mpi.py index 00d96213aa4..34367367111 100644 --- a/devito/passes/iet/mpi.py +++ b/devito/passes/iet/mpi.py @@ -91,6 +91,9 @@ def rule1(dep, candidates, loc_dims): for q in d._defines]) for n, i in enumerate(iters): + if i not in scopes: + continue + candidates = [i.dim._defines for i in iters[n:]] all_candidates = set().union(*candidates) diff --git a/examples/seismic/tti/operators.py b/examples/seismic/tti/operators.py index b6e8687c041..87a00f4cbac 100644 --- a/examples/seismic/tti/operators.py +++ b/examples/seismic/tti/operators.py @@ -551,8 +551,7 @@ def ForwardOperator(model, geometry, space_order=4, # Source and receivers expr = src * dt / m if kernel == 'staggered' else src * dt**2 / m - stencils += src.inject(field=u.forward, expr=expr) - stencils += src.inject(field=v.forward, expr=expr) + stencils += src.inject(field=(u.forward, v.forward), expr=expr) stencils += rec.interpolate(expr=u + v) # Substitute spacing terms to reduce flops @@ -601,8 +600,7 @@ def AdjointOperator(model, geometry, space_order=4, # Construct expression to inject receiver values expr = rec * dt / m if kernel == 'staggered' else rec * dt**2 / m - stencils += rec.inject(field=p.backward, expr=expr) - stencils += rec.inject(field=r.backward, expr=expr) + stencils += rec.inject(field=(p.backward, r.backward), expr=expr) # Create interpolation expression for the adjoint-source stencils += srca.interpolate(expr=p + r) @@ -661,8 +659,7 @@ def JacobianOperator(model, geometry, space_order=4, eqn2 = FD_kernel(model, du, dv, space_order, qu=lin_usrc, qv=lin_vsrc) # Construct expression to inject source values, injecting at u0(t+dt)/v0(t+dt) - src_term = src.inject(field=u0.forward, expr=src * dt**2 / m) - src_term += src.inject(field=v0.forward, expr=src * dt**2 / m) + src_term = src.inject(field=(u0.forward, v0.forward), expr=src * dt**2 / m) # Create interpolation expression for receivers, extracting at du(t)+dv(t) rec_term = rec.interpolate(expr=du + dv) @@ -716,8 +713,7 @@ def JacobianAdjOperator(model, geometry, space_order=4, dm_update = Inc(dm, - (u0 * du.dt2 + v0 * dv.dt2)) # Add expression for receiver injection - rec_term = rec.inject(field=du.backward, expr=rec * dt**2 / m) - rec_term += rec.inject(field=dv.backward, expr=rec * dt**2 / m) + rec_term = rec.inject(field=(du.backward, dv.backward), expr=rec * dt**2 / m) # Substitute spacing terms to reduce flops return Operator(eqn + rec_term + [dm_update], subs=model.spacing_map, diff --git a/tests/test_interpolation.py b/tests/test_interpolation.py index efde8674024..506a3b88def 100644 --- a/tests/test_interpolation.py +++ b/tests/test_interpolation.py @@ -703,8 +703,8 @@ class SparseFirst(SparseFunction): ds = DefaultDimension("ps", default_value=3) grid = Grid((11, 11)) dims = grid.dimensions - s = SparseFirst(name="s", grid=grid, npoint=2, dimensions=(dr, ds), shape=(2, 3)) - s.coordinates.data[:] = [[.5, .5], [.2, .2]] + s = SparseFirst(name="s", grid=grid, npoint=2, dimensions=(dr, ds), shape=(2, 3), + coordinates=[[.5, .5], [.2, .2]]) # Check dimensions and shape are correctly initialized assert s.indices[s._sparse_position] == dr diff --git a/tests/test_mpi.py b/tests/test_mpi.py index 25f85d29229..08a86d0ef38 100644 --- a/tests/test_mpi.py +++ b/tests/test_mpi.py @@ -601,6 +601,42 @@ def test_precomputed_sparse(self, r): Operator(sf1.interpolate(u))() assert np.all(sf1.data == 4) + @pytest.mark.parallel(mode=1) + def test_sparse_first(self): + """ + Tests custom sprase function with sparse dimension as first index. + """ + + class SparseFirst(SparseFunction): + """ Custom sparse class with the sparse dimension as the first one""" + _sparse_position = 0 + + dr = Dimension("cd") + ds = DefaultDimension("ps", default_value=3) + grid = Grid((11, 11)) + dims = grid.dimensions + s = SparseFirst(name="s", grid=grid, npoint=2, dimensions=(dr, ds), shape=(2, 3), + coordinates=[[.5, .5], [.2, .2]]) + + # Check dimensions and shape are correctly initialized + assert s.indices[s._sparse_position] == dr + assert s.shape == (2, 3) + assert s.coordinates.indices[0] == dr + + # Operator + u = TimeFunction(name="u", grid=grid, time_order=1) + fs = Function(name="fs", grid=grid, dimensions=(*dims, ds), shape=(11, 11, 3)) + + eqs = [Eq(u.forward, u+1), Eq(fs, u)] + # No time dependence so need the implicit dim + rec = s.interpolate(expr=s+fs, implicit_dims=grid.stepping_dim) + op = Operator(eqs + rec) + + op(time_M=10) + print + expected = 10*11/2 # n (n+1)/2 + assert np.allclose(s.data, expected) + @pytest.mark.parallel(mode=4) def test_no_grid_dim_slow(self): shape = (12, 13, 14) @@ -624,11 +660,10 @@ class CoordSlowSparseFunction(SparseFunction): rec_eq = s.interpolate(expr=u) op = Operator([Eq(u, 1)] + rec_eq) - print(op) op.apply() assert np.all(s.data == 1) - @pytest.mark.parallel(mode=4) + @pytest.mark.parallel(mode=1) def test_no_grid_dim_slow_time(self): shape = (12, 13, 14) nfreq = 5 @@ -651,8 +686,7 @@ class CoordSlowSparseFunction(SparseTimeFunction): rec_eq = s.interpolate(expr=u) op = Operator([Eq(u, 1)] + rec_eq) - print(op) - op.apply() + op.apply(time_M=5) assert np.all(s.data == 1)