Skip to content


Fix #331: Add REPP masking support
Browse files Browse the repository at this point in the history
This commit does not actually check if characterization has changed
because it fails if any mask is present on an operation where
characterization would change, so a check would be redundant.
  • Loading branch information
goodmami committed Sep 11, 2021
1 parent 1916074 commit 95d66e8
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 79 deletions.
2 changes: 1 addition & 1 deletion
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

### Added

* REPP mask (`=`) operator with basic masking support ([#331])
* REPP mask (`=`) operator and full masking support ([#331])

### Changed

Expand Down
254 changes: 178 additions & 76 deletions delphin/
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from sre_parse import parse_template
from pathlib import Path
from array import array
from itertools import takewhile
import warnings
import logging

Expand Down Expand Up @@ -53,6 +54,11 @@
_CMap = array

# Mask values
_MASK_B = 1 # start of mask
_MASK_I = 2 # inside mask
_MASK_O = 0 # not masked

class REPPError(PyDelphinException):
"""Raised when there is an error in tokenizing with REPP."""
Expand Down Expand Up @@ -144,9 +150,7 @@ def __init__(self, pattern: str, replacement: str):
self.pattern = pattern
self.replacement = replacement
self._re = _compile(pattern)
tracked, untracked = _get_segments(replacement, self._re)
self._tracked = tracked
self._untracked = untracked
self._tracked, self._untracked = _get_segments(replacement, self._re)

def __str__(self):
return f'!{self.pattern}\t\t{self.replacement}'
Expand All @@ -159,6 +163,7 @@ def _apply(
) -> Iterator[REPPStep]:
logger.debug(' %s', self)

applied = False
ms = list(self._re.finditer(s))

if ms:
Expand All @@ -167,88 +172,46 @@ def _apply(
parts: List[str] = []
smap = array('i', [0])
emap = array('i', [0])
_msk = array('i', [0])
new_mask = array('i', [_MASK_O])

for m in ms:
start = m.start()

if any(mask[(start+1):(m.end()+1)]):
sub, _smap, _emap, _mask, delta, blocked = _process_match(
m, mask, shift, self._tracked, self._untracked
if blocked:
applied = True

start = m.start()

# copy up to point of match
if pos < start:
_copy_part(s[pos:start], shift, parts, smap, emap)

if self._tracked or self._untracked:
for literal, start, end, tracked in self._itersegments(m):
litlen = len(literal)
if tracked:
_copy_part(literal, shift, parts, smap, emap)
width = end - start
_insert_part(literal, width, shift, parts,
smap, emap)
_msk.extend([0] * litlen)
shift += width - litlen
# the replacement is empty (match is deleted)
shift += m.end() - start

shift += delta
pos = m.end()

if pos < len(s):
_copy_part(s[pos:], shift, parts, smap, emap)

emap.append(shift - 1)
mask = new_mask
o = ''.join(parts)
applied = True

o = s
smap = _zeromap(o)
emap = _zeromap(o)
_msk = mask
applied = False

yield REPPStep(s, o, self, applied, smap, emap, _msk)

def _itersegments(
self, m: Match[str]
) -> Iterator[Tuple[str, int, int, bool]]:
"""Yield tuples of (replacement, start, end, tracked)."""
start = m.start()

# first yield segments that might be trackable
tracked = self._tracked
if tracked:
spans = {group: m.span(group)
for _, group in tracked
if group is not None}
end = m.start(1) # if literal before tracked group
for literal, group in tracked:
if literal is None:
assert group is not None
start, end = spans[group]
yield ( or '', start, end, True)
start = end
if group + 1 in spans:
end = spans[group + 1][0]
assert literal is not None
yield (literal, start, end, False)

# then group all remaining segments together
remaining: List[Optional[str]] = [ if grp is not None else lit
for lit, grp in self._untracked
if remaining:
# in some cases can return None, so replace with ''
literal = ''.join(segment or '' for segment in remaining)
yield (literal, start, m.end(), False)
yield REPPStep(s, o, self, applied, smap, emap, mask)

class _REPPMask(_REPPOperation):
Expand Down Expand Up @@ -278,9 +241,9 @@ def _apply(
newmask = array('i', mask) # make a copy
for m in self._re.finditer(s):
start = m.start() + 1
newmask[start] = max(1, mask[start])
newmask[start] = max(_MASK_B, mask[start])
for i in range(start + 1, m.end() + 1):
newmask[i] = 2
newmask[i] = _MASK_I
yield REPPStep(s, s, self, True, _zeromap(s), _zeromap(s), newmask)

Expand Down Expand Up @@ -717,25 +680,164 @@ def _copy_part(
emap: _CMap
) -> None:
smap.extend([shift] * len(s))
emap.extend([shift] * len(s))
map_part = [shift] * len(s)

def _insert_part(
s: str,
w: int,
width: int,
shift: int,
parts: List[str],
smap: _CMap,
emap: _CMap
) -> None:
a = shift
b = a - len(s)
smap.extend(range(a, b, -1))
a = shift + w - 1
b = a - len(s)
emap.extend(range(a, b, -1))
endshift = shift - len(s)
_width = width - 1
smap.extend(range(shift, endshift, -1))
emap.extend(range(shift + _width, endshift + _width, -1))

def _process_match(
m: Match[str],
prev_mask: _CMap,
shift: int,
) -> Tuple[str, _CMap, _CMap, _CMap, int, bool]:
parts: List[str] = []
smap = array('i', [])
emap = array('i', [])
mask = array('i', [])
delta = 0
blocked = False

if tracked or untracked:
start = m.start()
# for literals, the end is the start of the next backreference
end = next((m.start(g) for _, g in tracked if g), m.end())

for literal, group in tracked:
if literal is None:
literal = or ''
_copy_part(literal, shift + delta, parts, smap, emap)
end = m.start(group+1) if group < m.lastindex else m.end()
# block if overlap with mask
if any(prev_mask[start+1:end+1]):
blocked = True
width = end - start
litlen = len(literal)
_insert_part(literal, width, shift + delta, parts, smap, emap)
mask.extend([_MASK_O] * litlen)
delta += width - litlen

start = end

if untracked:
# block if untracked overlaps with mask, including backreferences
if (any(prev_mask[start+1:m.end()+1])
or any(any(prev_mask[m.start(grp)+1:m.end(grp)+1])
for lit, grp in untracked if grp)):
blocked = True
# untracked segments can be collapsed into one substring
literal = ''.join( or '' if literal is None else literal
for literal, group in untracked
width = m.end() - start
litlen = len(literal)
_insert_part(literal, width, shift+delta, parts, smap, emap)
mask.extend([_MASK_O] * litlen)
delta += width - litlen
# the replacement is empty (match is deleted)
delta = m.end() - m.start()

substring = ''.join(parts)
if not blocked:
blocked = _check_mask(substring, m, smap, emap, mask, prev_mask)

return substring, smap, emap, mask, delta, blocked

def _check_mask(
s: str,
m: Match[str],
smap: _CMap,
emap: _CMap,
mask: _CMap,
prev_mask: _CMap
) -> bool:
"""Returns True if any masked material has changed.
There are three contexts for mask checks:
1. The first character in the match is a mask continuation
2. The first character after the end of the match is a mask continuation
3. The whole mask is contained within the match
For (1) and (2), the mask must not change nor move around. For (1)
it cannot change but it can move.
start, end = m.span()
mstart, mend = start + 1, end + 1
orig =
# no masked material; no problem
if not any(prev_mask[mstart:mend]):
return False
# whether the final mask is fixed depends on the next mask value;
# check now as the _make_mask_info() function won't see it
end_fixed = prev_mask[mend] == _MASK_I
# prev/next left, middle, right
pl, pm, pr = _make_mask_info(orig, prev_mask[mstart:mend], end_fixed)
nl, nm, nr = _make_mask_info(s, mask, end_fixed)

# check fixed start/end masks
if pl != nl or pr != nr:
return True
# other masks just need to be present in equal number
for substr in set(pm).union(nm):
if pm.get(substr, 0) != nm.get(substr, 0):
return True

return False

def _make_mask_info(
s: str,
mask: _CMap,
end_fixed: bool
) -> Tuple[str, Dict[str, int], str]:
if not mask:
return '', {}, ''
left = s[:_get_mask_len(mask, 0)] if mask[0] == _MASK_I else ''
right = s[-_get_mask_len(mask[::-1], 0) or len(s):] if end_fixed else ''
middle: Dict[str, int] = {}
i = len(left)
j = len(s) - len(right)
while i < j:
if mask[i] == _MASK_B:
mlen = _get_mask_len(mask, i+1) + 1
substr = s[i:mlen]
if substr not in middle:
middle[substr] = 1
middle[substr] += 1
i += mlen
i += 1
return left, middle, right

def _get_mask_len(mask: _CMap, i: int):
return sum(1 for _ in takewhile(lambda v: v == _MASK_I, mask[i:]))

def _tokenize(result: REPPResult, pattern: str) -> List[Tuple[int, int, str]]:
Expand Down
20 changes: 18 additions & 2 deletions tests/
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ def test_basic_external_group_active():
'b': r.from_string(r'!b c')},
x = _r.apply('baba')
assert x.string == 'bbbb'
assert x.startmap.tolist() == [1, 0, 0, 0, 0, 0]
Expand Down Expand Up @@ -261,7 +260,8 @@ def test_mask_basic():
'!~ \n'
'!- \n'
'!(?<=[0-9])~*(?=[0-9]) =\n')
'!(?<=[0-9])~*(?=[0-9]) =\n'
'!8[-+]9 \n')
assert rpp.apply('0-a').string == '0 a' # mask doesn't match
assert rpp.apply('0_a').string == '0 a' # mask doesn't match
assert rpp.apply('0-2').string == '0-2' # mask matches
Expand All @@ -272,6 +272,22 @@ def test_mask_basic():
assert rpp.apply('0-23-4').string == '0-2=3-4'
# mask is intact after shifting
assert rpp.apply('~0-1').string == ' 0-1'
# mask may not be deleted
assert rpp.apply('8-9 8+9').string == '8-9 '

def test_mask_captures():
rpp = r.from_string('=[0-9]-[0-9]\n'
'!([0-9])([0-9]) \\1 \\2\n'
r'!([^=]+)=(.*)$ \2=\1')
assert rpp.apply('123-4').string == '1 23-4' # mask not captured
assert rpp.apply('12-3').string == '1 2-3' # mask partially captured
assert rpp.apply('01-23-45').string == '0 1-2 3-4 5' # adjacent masks
assert rpp.apply('1-2=3-4').string == '1-2=3-4' # no swapping

rpp = r.from_string('=[0-9]\n'
'!([0-9A-Fa-f]) \\1\\1\n')
assert rpp.apply('1a').string == '1aa' # no duplication of masks

def test_trace():
Expand Down

0 comments on commit 95d66e8

Please sign in to comment.