Skip to content

Commit

Permalink
vdk-core: add vdk sdk secrets api - part III (#2325)
Browse files Browse the repository at this point in the history
Incremental change for adding the secrets capability.

Add secrets implementation and tests in vdk-core.

---------

Signed-off-by: Dako Dakov <ddakov@vmware.com>
  • Loading branch information
dakodakov authored and yonitoo committed Jun 27, 2023
1 parent ce7a82e commit 401386c
Show file tree
Hide file tree
Showing 25 changed files with 779 additions and 7 deletions.
2 changes: 1 addition & 1 deletion projects/vdk-core/src/vdk/api/job_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class ISecrets:
"""

@abstractmethod
def get_secret(self, name: str, default_value: Any = None) -> str:
def get_secret(self, key: str, default_value: Any = None) -> str:
pass

@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
from vdk.internal.builtin_plugins.job_properties.properties_api_plugin import (
PropertiesApiPlugin,
)
from vdk.internal.builtin_plugins.job_secrets.secrets_api_plugin import (
SecretsApiPlugin,
)
from vdk.internal.builtin_plugins.notification.notification import NotificationPlugin
from vdk.internal.builtin_plugins.termination_message.writer import (
TerminationMessageWriterPlugin,
Expand Down Expand Up @@ -119,6 +122,7 @@ def vdk_start(plugin_registry: PluginRegistry, command_line_args: List) -> None:
plugin_registry.load_plugin_with_hooks_impl(NotificationPlugin())
plugin_registry.load_plugin_with_hooks_impl(IngesterConfigurationPlugin())
plugin_registry.load_plugin_with_hooks_impl(PropertiesApiPlugin())
plugin_registry.load_plugin_with_hooks_impl(SecretsApiPlugin())
# TODO: should be in run package only
plugin_registry.add_hook_specs(JobRunHookSpecs)
plugin_registry.load_plugin_with_hooks_impl(JobConfigIniPlugin())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from typing import List
from typing import Type


def check_valid_property(k: str, v: str, supported_types: List[Type] = []) -> None:
def check_valid_property(k: str, v: str, supported_types=None) -> None:
"""
Check if property key and value are valid
"""
if supported_types is None:
supported_types = []

if str != type(k) or k.strip() != k or "".join(k.split()) != k:
msg = (
f"Property {k} is of unsupported type or has unsupported name. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ class PropertiesNotAvailable(IProperties):

def __init__(self, error_handler: Callable[[str], None]):
self._error_handler = error_handler
pass

def get_property(self, name, default_value=None): # @UnusedVariable
self.tell_user("get_property")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0


def check_valid_secret(k: str, v: str, supported_types=None) -> None:
"""
Check if secret key and value are valid
"""

if supported_types is None:
supported_types = []

if str != type(k) or k.strip() != k or "".join(k.split()) != k:
msg = (
f"Secret {k} is of unsupported type or has unsupported name. "
f"Only string secrets with no whitespaces in the name are supported."
)
raise ValueError(msg)

if not supported_types:
supported_types = [int, float, str, list, type(None)]

if type(v) not in supported_types:
msg = (
f"Value for secret {k} is of unsupported type {type(v)}. "
f"Only int, float, str, list, and NoneType types are supported. "
)
raise ValueError(msg)
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from vdk.api.job_input import ISecrets


class CachedSecretsWrapper(ISecrets):
"""
Wraps any ISecrets so that get_* calls are cached until set_* is called
Since secrets are rarely updated, it's better to have them cached.
"""

def __init__(self, secrets_impl: ISecrets):
self.provider = secrets_impl
self.cached_dict = (
None # when None then cache needs to be refreshed on next get_*()
)

def get_secret(self, name, default_value=None):
res = self._get_cached_dict().get(name, default_value)
return res

def get_all_secrets(self):
return self._get_cached_dict()

def set_all_secrets(self, secrets):
self.provider.set_all_secrets(secrets)
self.cached_dict = None

def _get_cached_dict(self):
if self.cached_dict is None:
self.cached_dict = self.provider.get_all_secrets()
return self.cached_dict
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
from copy import deepcopy
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

from vdk.api.job_input import ISecrets
from vdk.api.plugin.plugin_input import ISecretsServiceClient

from ...core.errors import log_and_throw
from ...core.errors import ResolvableBy
from .base_secrets_impl import check_valid_secret

log = logging.getLogger(__name__)

SecretValue = Union[int, float, str, list, dict, None]


class DataJobsServiceSecrets(ISecrets):
"""
Data Jobs Secrets implementation.
"""

__VALID_TYPES = [int, float, str, list, dict, type(None)]

def __init__(
self,
job_name: str,
team_name: str,
secrets_service_client: ISecretsServiceClient,
write_preprocessors: Optional[List[ISecretsServiceClient]] = None,
):
log.debug(
f"Data Job Secrets for job {job_name} with service client: {secrets_service_client}"
)
self._job_name = job_name
self._team_name = team_name
self._secrets_service_client = secrets_service_client
self._write_preprocessors = write_preprocessors

def get_secret(self, name: str, default_value: SecretValue = None) -> SecretValue:
"""
:param name: The name of the secret
:param default_value: default value ot return if missing
"""
secrets = self.get_all_secrets()
if name in secrets:
return secrets[name]
else:
log.warning(
"Secret {} is not among Job secrets, returning default value: {}".format(
name, default_value
)
)
return default_value

def get_all_secrets(self) -> Dict[str, SecretValue]:
"""
:return: all stored secrets
"""
return self._secrets_service_client.read_secrets(
self._job_name, self._team_name
)

def set_all_secrets(self, secrets: Dict[str, SecretValue]) -> None:
"""
Invokes the write pre-processors if any are configured.
Persists the passed secrets overwriting all previous properties.
"""
if self._write_preprocessors:
secrets = deepcopy(
secrets
) # keeps the outer scope originally-referenced dictionary intact
for client in self._write_preprocessors:
try:
secrets = client.write_secrets(
self._job_name, self._team_name, secrets
)
except Exception as e:
log_and_throw(
to_be_fixed_by=ResolvableBy.USER_ERROR,
log=log,
what_happened=f"A write pre-processor of secrets client {client} had failed.",
why_it_happened=f"User Error occurred. Exception was: {e}",
consequences="SECRETS_WRITE_PREPROCESS_SEQUENCE was interrupted, and "
"secrets won't be written by the SECRETS_DEFAULT_TYPE client.",
countermeasures="Handle the exception raised.",
)

for k, v in list(secrets.items()):
check_valid_secret(k, v, DataJobsServiceSecrets.__VALID_TYPES) # throws

self._secrets_service_client.write_secrets(
self._job_name, self._team_name, secrets
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
from copy import deepcopy
from typing import Dict

from vdk.api.plugin.plugin_input import ISecretsServiceClient

log = logging.getLogger(__name__)


class InMemSecretsServiceClient(ISecretsServiceClient):
"""
Implementation of IProperties that are kept only in memory.
"""

def __init__(self):
self._secrets = {}

def read_secrets(self, job_name: str, team_name: str) -> Dict:
res = deepcopy(self._secrets)
return res

def write_secrets(self, job_name: str, team_name: str, secrets: Dict) -> Dict:
log.warning(
"You are using In Memory Secrets client. "
"That means the secrets will not be persisted past the Data Job run."
)
self._secrets = deepcopy(secrets)
return self._secrets
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from vdk.api.plugin.hook_markers import hookimpl
from vdk.internal.builtin_plugins.job_secrets import secrets_config
from vdk.internal.builtin_plugins.job_secrets.inmemsecrets import (
InMemSecretsServiceClient,
)
from vdk.internal.builtin_plugins.run.job_context import JobContext
from vdk.internal.core.config import ConfigurationBuilder


class SecretsApiPlugin:
"""
Define the basic configuration needed for Secrets API.
"""

@hookimpl(tryfirst=True)
def vdk_configure(self, config_builder: ConfigurationBuilder) -> None:
secrets_config.add_definitions(config_builder)

@hookimpl
def initialize_job(self, context: JobContext) -> None:
context.secrets.set_secrets_factory_method("memory", InMemSecretsServiceClient)
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
from typing import List

from vdk.internal.core.config import Configuration
from vdk.internal.core.config import ConfigurationBuilder
from vdk.internal.util.utils import parse_config_sequence

SECRETS_DEFAULT_TYPE = "SECRETS_DEFAULT_TYPE"
SECRETS_WRITE_PREPROCESS_SEQUENCE = "SECRETS_WRITE_PREPROCESS_SEQUENCE"


class SecretsConfiguration:
def __init__(self, config: Configuration):
self.__config = config

def get_secrets_default_type(self) -> str:
return self.__config.get_value(SECRETS_DEFAULT_TYPE)

def get_secrets_write_preprocess_sequence(self) -> List[str]:
return parse_config_sequence(
self.__config, key=SECRETS_WRITE_PREPROCESS_SEQUENCE, sep=","
)


def add_definitions(config_builder: ConfigurationBuilder):
config_builder.add(
key=SECRETS_DEFAULT_TYPE,
default_value=None,
description="Set the default secrets type to use. "
"Plugins can register different secret types. "
"This option controls which is in use"
"It can be left empty, in which case "
"if there is only one type registered it will use it."
"Or it will use one register with type 'default' ",
)
config_builder.add(
key=SECRETS_WRITE_PREPROCESS_SEQUENCE,
default_value=None,
description="""A string of comma-separated secret types.
Those types are priorly registered in the ISecretsRegistry, by
mapping a factory for instantiating each ISecretsServiceClient
secrets type handler.
This comma-separated string value indicates the sequence in which those
ISecretsServiceClient implementations `write_secrets` method
will be invoked. For example:
SECRETS_WRITE_PREPROCESS_SEQUENCE="a-prefixed-secret,
replicated-secret"
would mean that the secrets data stored would be first
processed by the `a-prefixed-secret` client, then by the
`replicated-secret` client, and finally would be stored by
the default secret client.
In case of pre-processing failure, the default client won't be invoked.
""",
)
Loading

0 comments on commit 401386c

Please sign in to comment.