Skip to content

Commit

Permalink
Add dn_escape and parse_dn filters
Browse files Browse the repository at this point in the history
Adds filters that can be used to escape values for use inside a DN
attribute value and to parse a DN string into a more structured object.
These filters are useful with the `microsoft.ad.ldap` inventory plugin
as well as when forming values like the `path` or other DN attributes.
  • Loading branch information
jborean93 committed Nov 24, 2023
1 parent 38ae305 commit dcaba1c
Show file tree
Hide file tree
Showing 9 changed files with 558 additions and 18 deletions.
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@
"[powershell]": {
"editor.formatOnSave": true,
},
"[python]": {
"editor.formatOnSave": false
}
}
3 changes: 2 additions & 1 deletion docs/docsite/rst/guide_ldap_inventory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ The following filters can be used as an easy way to further convert the coerced
* :ref:`microsoft.ad.as_datetime <ansible_collections.microsoft.ad.as_datetime_filter>`
* :ref:`microsoft.ad.as_guid <ansible_collections.microsoft.ad.as_guid_filter>`
* :ref:`microsoft.ad.as_sid <ansible_collections.microsoft.ad.as_sid_filter>`
* :ref:`microsoft.ad.parse_dn <ansible_collections.microsoft.ad.parse_dn_filter>`

An example of these filters being used in the ``attributes`` option can be seen below:

Expand Down Expand Up @@ -409,7 +410,7 @@ The ``raw`` value contains the raw base64 encoded value as stored in AD. The ``t

* ``encrypted_value``: The encrypted password blob as a base64 string
* ``flags``: The flags set as a bitwise int value, currently these are undocumented by Microsoft
* ``update_timestamp``: The FILETIME value of when the
* ``update_timestamp``: The FILETIME value of when the
* ``value``: The decrypted value containing the username and password as a JSON string
* ``debug``: Debug information that indicates why it failed to decrypt the value

Expand Down
2 changes: 1 addition & 1 deletion galaxy.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace: microsoft
name: ad
version: 1.4.1
version: 1.5.0
readme: README.md
authors:
- Jordan Borean @jborean93
Expand Down
46 changes: 46 additions & 0 deletions plugins/filter/dn_escape.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright (c) 2023 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

DOCUMENTATION:
name: dn_escape
author:
- Jordan Borean (@jborean93)
short_description: Escape an LDAP DistinguishedName value string.
version_added: 1.5.0
seealso:
- ref: microsoft.ad.parse_dn <ansible_collections.microsoft.ad.parse_dn_filter>
description: microsoft.ad.parse_dn filter
- ref: microsoft.ad.ldap <ansible_collections.microsoft.ad.ldap_inventory>
description: microsoft.ad.ldap inventory
description:
- Escapes a string value for use in an LDAP DistinguishedName.
- This can be used to escape special characters when building a
DistinguishedName value.
positional: _input
options:
_input:
description:
- The string value to escape.
- This should be just the RDN value not including the attribute type
that prefixes the value, for example C(MyValue) and not C(CN=MyValue).
type: str
required: true

EXAMPLES: |
# This is an example used in the microsoft.ad.ldap plugin
search_base: OU={{ my_ou_variable | microsoft.ad.dn_escape }},DC=domain,DC=com
# This is an example with the microsoft.ad.user module
- microsoft.ad.user:
name: MyUser
password: MyPassword123
state: present
path: OU={{ my_ou_variable | microsoft.ad.dn_escape }},DC=domain,DC=com
RETURN:
_value:
description:
- The escaped RDN attribute value.
type: str
247 changes: 245 additions & 2 deletions plugins/filter/ldap_converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import base64
import datetime
import re
import struct
import typing as t
import uuid
Expand All @@ -11,6 +12,154 @@
from ansible.module_utils.common.collections import is_sequence


# Matches the attribute type at the start of an RDN, up to and including the
# '=' separator. Group 1 captures the type itself (a descriptor such as ``CN``
# or a dotted numeric OID) without the optional surrounding spaces.
_RDN_TYPE_PATTERN = re.compile(
    rb"""
[\ ]* # Ignore leading spaces
(
(
# Lead char is a letter, subsequent chars can be numbers or -
[a-zA-Z][a-zA-Z0-9-]*
)
|
(
# First number must a decimal without a leading 0 unless 0.
# Must also contain at least another entry separated by '.'.
([0-9]|[1-9][0-9]+)
(
\.([0-9]|[1-9][0-9]+)
)+
)
)
[\ ]*= # Ignore trailing spaces before the =
""",
    flags=re.VERBOSE,
)

# Matches an attribute value written in the '#' prefixed hexstring form.
# Group 1 captures the hex digit pairs; the overall match also consumes the
# optional surrounding spaces and the terminating '+' or ',' when present.
_RDN_VALUE_HEXSTRING_PATTERN = re.compile(
    rb"""
[\ ]* # Ignore leading spaces
\# # Starts with '#'
(
([0-9a-fA-F]{2})+
)
[\ ]* # Ignore trailing spaces
(?:[+,]|$) # Terminated by '+', ',', or the end of the string
""",
    flags=re.VERBOSE,
)

# Matches what may follow a backslash inside an attribute value: either one
# of the special characters that is escaped literally (group ``literal``) or
# a pair of hex digits describing a single byte (group ``hex``).
_RDN_VALUE_ESCAPE_PATTERN = re.compile(
    rb"""
(
(?P<literal>
[+,;<>#=\\\"\ ]
)
|
(?P<hex>
([0-9a-fA-F]{2})
)
)
""",
    flags=re.VERBOSE,
)


def _parse_rdn_type(value: memoryview) -> t.Optional[t.Tuple[bytes, int]]:
    """Read the attribute type found at the start of ``value``.

    Args:
        value: The bytes to parse, starting at the attribute type.

    Returns:
        ``None`` when ``value`` does not start with an attribute type followed
        by ``=``. Otherwise a tuple of the attribute type (without surrounding
        spaces) and the number of bytes consumed, including the ``=``.
    """
    type_match = _RDN_TYPE_PATTERN.match(value)
    if type_match is None:
        return None

    # match() anchors at offset 0 so the end offset is the consumed length.
    return type_match.group(1), type_match.end()


def _parse_rdn_value(value: memoryview) -> t.Optional[t.Tuple[bytes, int, bool]]:
    """Read the attribute value found at the start of ``value``.

    The value is either in the ``#`` prefixed hexstring form or in the string
    form where special characters are escaped with a backslash. Unescaped
    leading and trailing spaces are not part of the returned value.

    Args:
        value: The bytes to parse, starting just after the ``=`` of the RDN.

    Returns:
        ``None`` when no value bytes were found. Otherwise a tuple of the
        decoded value bytes, the number of bytes consumed from ``value``
        (including a terminating ``,`` or ``+`` if one was read) and whether
        the value was terminated by ``+``, meaning another attribute type and
        value follows in the same RDN.

    Raises:
        AnsibleFilterError: The value starts with ``#`` but is not a valid
            hexstring, contains an unescaped special character, or contains
            an invalid escape sequence.
    """
    if hex_match := _RDN_VALUE_HEXSTRING_PATTERN.match(value):
        full_value = hex_match.group(0)
        more_rdns = full_value.endswith(b"+")

        # b16decode only accepts upper case hex digits by default.
        b_value = base64.b16decode(hex_match.group(1).upper())
        return b_value, len(full_value), more_rdns

    def remaining_from(offset: int) -> str:
        # Text from offset to the end, used to give context in error messages.
        return value[offset:].tobytes().decode("utf-8", errors="surrogateescape")

    # Parsing the string value variant as regex is too complicated due to the
    # myriad of rules and escaping so it is done manually.
    read = 0
    new_value = bytearray()
    found_spaces = 0
    more_rdns = False

    total_len = len(value)
    while read < total_len:
        current_value = value[read]
        current_char = chr(current_value)
        read += 1

        # We only count the spaces in the middle of the string so we need to
        # keep track of how many have been found until the next character.
        if current_char == " ":
            if new_value:
                found_spaces += 1

            continue

        if current_char in (",", "+"):
            more_rdns = current_char == "+"
            break

        # We can add any spaces we are still tentatively collecting as there's
        # a real value after it.
        if found_spaces:
            new_value += b" " * found_spaces
            found_spaces = 0

        if current_char == "#" and not new_value:
            remaining = remaining_from(read - 1)
            raise AnsibleFilterError(
                f"Found leading # for attribute value but does not match hexstring format at '{remaining}'"
            )

        elif current_char in ("\00", '"', ";", "<", ">"):
            remaining = remaining_from(read - 1)
            raise AnsibleFilterError(
                f"Found unescaped character '{current_char}' in attribute value at '{remaining}'"
            )

        elif current_char == "\\":
            if escape_match := _RDN_VALUE_ESCAPE_PATTERN.match(value, pos=read):
                if literal_value := escape_match.group("literal"):
                    new_value += literal_value
                    read += 1

                else:
                    new_value += base64.b16decode(escape_match.group("hex").upper())
                    read += 2

            else:
                # read still points just past the backslash here so the
                # message starts at the backslash itself.
                remaining = remaining_from(read - 1)
                raise AnsibleFilterError(
                    f"Found invalid escape sequence in attribute value at '{remaining}'"
                )

        else:
            new_value.append(current_value)

    if new_value:
        return bytes(new_value), read, more_rdns

    return None


def per_sequence(func: t.Callable[[t.Any], t.Any]) -> t.Any:
def wrapper(value: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:
if is_sequence(value):
Expand All @@ -22,7 +171,10 @@ def wrapper(value: t.Any, *args: t.Any, **kwargs: t.Any) -> t.Any:


@per_sequence
def as_datetime(value: t.Any, format: str = "%Y-%m-%dT%H:%M:%S.%f%z") -> str:
def as_datetime(
value: t.Any,
format: str = "%Y-%m-%dT%H:%M:%S.%f%z",
) -> str:
if isinstance(value, bytes):
value = value.decode("utf-8")

Expand All @@ -31,8 +183,14 @@ def as_datetime(value: t.Any, format: str = "%Y-%m-%dT%H:%M:%S.%f%z") -> str:

# FILETIME is 100s of nanoseconds since 1601-01-01. As Python does not
# support nanoseconds the delta is number of microseconds.
ft_epoch = datetime.datetime(
year=1601,
month=1,
day=1,
tzinfo=datetime.timezone.utc,
)
delta = datetime.timedelta(microseconds=value // 10)
dt = datetime.datetime(year=1601, month=1, day=1, tzinfo=datetime.timezone.utc) + delta
dt = ft_epoch + delta

return dt.strftime(format)

Expand Down Expand Up @@ -77,10 +235,95 @@ def as_sid(value: t.Any) -> str:
return f"S-{revision}-{authority}-{'-'.join(sub_authorities)}"


@per_sequence
def dn_escape(value: str) -> str:
    """Escape a string for use as a DistinguishedName attribute value.

    Characters that are special in a DN are prefixed with a backslash, as are
    a leading space or ``#`` and a trailing space. A further set of
    characters is emitted as a backslash followed by the two digit upper case
    hex value of the character.

    Args:
        value: The attribute value to escape, without the attribute type.

    Returns:
        The escaped attribute value.
    """
    # These chars always need to be escaped, they are documented in RFC 4514.
    backslash_anywhere = ('"', "+", ",", ";", "<", ">", "\\")
    # The starting char cannot be one of these.
    backslash_leading = (" ", "#")
    # These are extra chars MS says to escape, it must be done using the hex
    # syntax
    # https://learn.microsoft.com/en-us/previous-versions/windows/desktop/ldap/distinguished-names
    hex_encoded = ("\00", "\n", "\r", "=", "/")

    last_pos = len(value) - 1
    pieces = []
    for pos, char in enumerate(value):
        needs_backslash = (
            (pos == 0 and char in backslash_leading)
            # The ending char cannot be ' '
            or (pos == last_pos and char == " ")
            or char in backslash_anywhere
        )

        if needs_backslash:
            pieces.append(f"\\{char}")
        elif char in hex_encoded:
            pieces.append(f"\\{ord(char):02X}")
        else:
            pieces.append(char)

    return "".join(pieces)


@per_sequence
def parse_dn(value: str) -> t.List[t.List[str]]:
    """Split a DistinguishedName string into its RDN components.

    Args:
        value: The DistinguishedName string to parse.

    Returns:
        One list per RDN. Each RDN list holds the attribute type followed by
        the attribute value, repeated for every type and value pair that is
        joined with ``+`` inside that RDN.

    Raises:
        AnsibleFilterError: An attribute type or attribute value could not be
            parsed at the current position.
    """

    def as_text(data: memoryview) -> str:
        return data.tobytes().decode("utf-8", errors="surrogateescape")

    # The parsing rules come from RFC 4514. On top of what the RFC defines
    # any extra spaces around , = and + are dropped.
    #
    # Parsing is done on bytes because:
    # 1. A memoryview can be sliced without copying the data
    # 2. Hex escapes in a value describe single bytes so the value can only
    #    be decoded to a string once it is complete.
    # surrogateescape is used on every conversion so bytes that are not valid
    # UTF-8 survive the round trip.
    pending = memoryview(value.encode("utf-8", errors="surrogateescape"))
    parsed: t.List[t.List[str]] = []

    while pending:
        components: t.List[str] = []

        # A value terminated by + means another type/value pair belongs to
        # the same RDN.
        expect_pair = True
        while expect_pair:
            type_info = _parse_rdn_type(pending)
            if not type_info:
                raise AnsibleFilterError(
                    f"Expecting attribute type in RDN entry from '{as_text(pending)}'"
                )

            type_bytes, type_len = type_info
            components.append(type_bytes.decode("utf-8", errors="surrogateescape"))
            pending = pending[type_len:]

            value_info = _parse_rdn_value(pending)
            if not value_info:
                raise AnsibleFilterError(
                    f"Expecting attribute value in RDN entry from '{as_text(pending)}'"
                )

            value_bytes, value_len, expect_pair = value_info
            components.append(value_bytes.decode("utf-8", errors="surrogateescape"))
            pending = pending[value_len:]

        parsed.append(components)

    return parsed


class FilterModule:
    """Provides the filter functions defined in this file by name."""

    def filters(self) -> t.Dict[str, t.Callable]:
        """Return the mapping of filter name to the callable implementing it."""
        registry: t.Dict[str, t.Callable] = {}

        # Keys are spelled out rather than derived from the callables as the
        # functions are wrapped by a decorator.
        registry["as_datetime"] = as_datetime
        registry["as_guid"] = as_guid
        registry["as_sid"] = as_sid
        registry["dn_escape"] = dn_escape
        registry["parse_dn"] = parse_dn

        return registry
Loading

0 comments on commit dcaba1c

Please sign in to comment.