Skip to content

Commit

Permalink
SOPS 3.9.0: use encrypt/decrypt subcommands, use --filename-override …
Browse files Browse the repository at this point in the history
…option for encryption.
  • Loading branch information
felixfontein committed Jul 4, 2024
1 parent 03d53d0 commit d7ea8fb
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 46 deletions.
4 changes: 4 additions & 0 deletions changelogs/fragments/190-sops-3.9.0.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
minor_changes:
- "Detect SOPS 3.9.0 and use new ``decrypt`` and ``encrypt`` subcommands (https://github.com/ansible-collections/community.sops/pull/190)."
bugfixes:
- "sops_encrypt - properly support ``path_regex`` in ``.sops.yaml`` when SOPS 3.9.0 or later is used (https://github.com/ansible-collections/community.sops/issues/153, https://github.com/ansible-collections/community.sops/pull/190)."
174 changes: 129 additions & 45 deletions plugins/module_utils/sops.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@


import os
import re

from ansible.module_utils.common.text.converters import to_text, to_native

Expand Down Expand Up @@ -44,39 +45,41 @@
203: "FileAlreadyEncrypted"
}

_SOPS_VERSION = re.compile(r'^sops ([0-9]+)\.([0-9]+)\.([0-9]+)')


def _create_single_arg(argument_name):
def f(value, arguments, env):
def f(value, arguments, env, version):
arguments.extend([argument_name, to_native(value)])

return f


def _create_comma_separated(argument_name):
def f(value, arguments, env):
def f(value, arguments, env, version):
arguments.extend([argument_name, ','.join([to_native(v) for v in value])])

return f


def _create_repeated(argument_name):
def f(value, arguments, env):
def f(value, arguments, env, version):
for v in value:
arguments.extend([argument_name, to_native(v)])

return f


def _create_boolean(argument_name):
def f(value, arguments, env):
def f(value, arguments, env, version):
if value:
arguments.append(argument_name)

return f


def _create_env_variable(argument_name):
def f(value, arguments, env):
def f(value, arguments, env, version):
env[argument_name] = value

return f
Expand Down Expand Up @@ -125,46 +128,72 @@ def __init__(self, filename, exit_code, message, decryption=True):
super(SopsError, self).__init__(message)


class Sops():
''' Utility class to perform sops CLI actions '''

@staticmethod
def _add_options(command, env, get_option_value, options):
class SopsRunner(object):
def _add_options(self, command, env, get_option_value, options):
if get_option_value is None:
return
for option, f in options.items():
v = get_option_value(option)
if v is not None:
f(v, command, env)
f(v, command, env, self.version)

def _debug(self, message):
if self.display:
self.display.vvvv(message)
elif self.module:
self.module.debug(message)

def _warn(self, message):
if self.display:
self.display.warning(message)
elif self.module:
self.module.warn(message)

def __init__(self, binary, module=None, display=None):
self.binary = binary
self.module = module
self.display = display

self.version = (3, 7, 3) # if --disable-version-check is not supported, this is version 3.7.3 or older

exit_code, output, err = self._run_command([self.binary, '--version', '--disable-version-check'])
if exit_code == 0:
m = _SOPS_VERSION.match(output.decode('utf-8'))
if m:
self.version = int(m.group(1)), int(m.group(2)), int(m.group(3))
self._debug('SOPS version detected as %s' % (self.version, ))
else:
self._warn('Cannot extract SOPS version from: %s' % repr(output))
else:
self._debug('Cannot detect SOPS version efficiently, likely a version before 3.8.0')

@staticmethod
def get_sops_binary(get_option_value):
cmd = get_option_value('sops_binary') if get_option_value else None
if cmd is None:
cmd = 'sops'
return cmd
def _run_command(self, command, env=None, data=None, cwd=None):
if self.module:
return self.module.run_command(command, environ_update=env, cwd=cwd, encoding=None, data=data, binary_data=True)

@staticmethod
def decrypt(encrypted_file, content=None,
display=None, decode_output=True, rstrip=True, input_type=None, output_type=None, get_option_value=None, module=None):
process = Popen(command, stdin=None if data is None else PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd, env=env)
output, err = process.communicate(input=data)
return process.returncode, output, err

def decrypt(self, encrypted_file, content=None,
decode_output=True, rstrip=True, input_type=None, output_type=None, get_option_value=None):
# Run sops directly, python module is deprecated
command = [Sops.get_sops_binary(get_option_value)]
command = [self.binary]
if self.version >= (3, 9, 0):
command.append("decrypt")
env = os.environ.copy()
Sops._add_options(command, env, get_option_value, GENERAL_OPTIONS)
self._add_options(command, env, get_option_value, GENERAL_OPTIONS)
if input_type is not None:
command.extend(["--input-type", input_type])
if output_type is not None:
command.extend(["--output-type", output_type])
if content is not None:
encrypted_file = '/dev/stdin'
command.extend(["--decrypt", encrypted_file])
if self.version < (3, 9, 0):
command.append("--decrypt")
command.append(encrypted_file)

if module:
exit_code, output, err = module.run_command(command, environ_update=env, encoding=None, data=content, binary_data=True)
else:
process = Popen(command, stdin=None if content is None else PIPE, stdout=PIPE, stderr=PIPE, env=env)
(output, err) = process.communicate(input=content)
exit_code = process.returncode
exit_code, output, err = self._run_command(command, env=env, data=content)

if decode_output:
# output is binary, we want UTF-8 string
Expand All @@ -173,8 +202,8 @@ def decrypt(encrypted_file, content=None,

# sops logs always to stderr, as stdout is used for
# file content
if err and display:
display.vvvv(to_text(err, errors='surrogate_or_strict'))
if err:
self._debug(u'Unexpected stderr:\n' + to_text(err, errors='surrogate_or_strict'))

if exit_code != 0:
raise SopsError(encrypted_file, exit_code, err, decryption=True)
Expand All @@ -184,37 +213,92 @@ def decrypt(encrypted_file, content=None,

return output

@staticmethod
def encrypt(data, display=None, cwd=None, input_type=None, output_type=None, get_option_value=None, module=None):
def encrypt(self, data, cwd=None, input_type=None, output_type=None, filename=None, get_option_value=None):
# Run sops directly, python module is deprecated
command = [Sops.get_sops_binary(get_option_value)]
command = [self.binary]
if self.version >= (3, 9, 0):
command.append("encrypt")
env = os.environ.copy()
Sops._add_options(command, env, get_option_value, GENERAL_OPTIONS)
Sops._add_options(command, env, get_option_value, ENCRYPT_OPTIONS)
self._add_options(command, env, get_option_value, GENERAL_OPTIONS)
self._add_options(command, env, get_option_value, ENCRYPT_OPTIONS)
if input_type is not None:
command.extend(["--input-type", input_type])
if output_type is not None:
command.extend(["--output-type", output_type])
command.extend(["--encrypt", "/dev/stdin"])
if self.version < (3, 9, 0):
command.append("--encrypt")
elif filename:
command.extend(["--filename-override", filename])
command.append("/dev/stdin")

if module:
exit_code, output, err = module.run_command(command, data=data, binary_data=True, cwd=cwd, environ_update=env, encoding=None)
else:
process = Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd, env=env)
(output, err) = process.communicate(input=data)
exit_code = process.returncode
exit_code, output, err = self._run_command(command, env=env, data=data, cwd=cwd)

# sops logs always to stderr, as stdout is used for
# file content
if err and display:
display.vvvv(to_text(err, errors='surrogate_or_strict'))
if err and self.display:
self._debug(u'Unexpected stderr:\n' + to_text(err, errors='surrogate_or_strict'))

if exit_code != 0:
raise SopsError('to stdout', exit_code, err, decryption=False)

return output


_SOPS_RUNNER_CACHE = dict()


class Sops():
''' Utility class to perform sops CLI actions '''

@staticmethod
def get_sops_binary(get_option_value):
cmd = get_option_value('sops_binary') if get_option_value else None
if cmd is None:
cmd = 'sops'
return cmd

@staticmethod
def get_sops_runner_from_binary(sops_binary, module=None, display=None):
candidates = _SOPS_RUNNER_CACHE.get(sops_binary, [])
for cand_module, cand_runner in candidates:
if cand_runner is module:
return cand_runner
runner = SopsRunner(sops_binary, module=module, display=display)
candidates.append((module, runner))
_SOPS_RUNNER_CACHE[sops_binary] = candidates
return runner

@staticmethod
def get_sops_runner_from_options(get_option_value, module=None, display=None):
return Sops.get_sops_runner_from_binary(Sops.get_sops_binary(get_option_value), module=module, display=display)

@staticmethod
def decrypt(encrypted_file, content=None,
display=None, decode_output=True, rstrip=True, input_type=None, output_type=None, get_option_value=None, module=None):
runner = Sops.get_sops_runner_from_options(get_option_value, module=module, display=display)
return runner.decrypt(
encrypted_file,
content=content,
decode_output=decode_output,
rstrip=rstrip,
input_type=input_type,
output_type=output_type,
get_option_value=get_option_value,
)

@staticmethod
def encrypt(data, display=None, cwd=None, input_type=None, output_type=None, get_option_value=None, module=None, filename=None):
runner = Sops.get_sops_runner_from_options(get_option_value, module=module, display=display)
return runner.encrypt(
data,
cwd=cwd,
input_type=input_type,
output_type=output_type,
get_option_value=get_option_value,
filename=filename,
)


def get_sops_argument_spec(add_encrypt_specific=False):
argument_spec = {
'sops_binary': {
Expand Down
3 changes: 2 additions & 1 deletion plugins/modules/sops_encrypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,11 @@ def get_option_value(argument_name):
output_type = None
if path.endswith('.json'):
output_type = 'json'
elif path.endswith('.yaml') or path.endswith('.yml'):
elif path.endswith(('.yml', '.yaml')):
output_type = 'yaml'
data = Sops.encrypt(
data=input_data, cwd=directory, input_type=input_type, output_type=output_type,
filename=os.path.relpath(path, directory) if directory is not None else path,
get_option_value=get_option_value, module=module,
)
write_file(module, data)
Expand Down

0 comments on commit d7ea8fb

Please sign in to comment.