Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternative implementation of the policy document parser #3246

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
290 changes: 158 additions & 132 deletions azurelinuxagent/ga/policy/policy_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
# Requires Python 2.4+ and Openssl 1.0+
#

import copy
import json
import re
import os
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common import logger
Expand All @@ -26,25 +26,10 @@
from azurelinuxagent.common.protocol.extensions_goal_state_from_vm_settings import _CaseFoldedDict
from azurelinuxagent.common.utils.flexible_version import FlexibleVersion


# Schema for policy file.
_POLICY_SCHEMA = \
{
"policyVersion": ustr,
"extensionPolicies": {
"allowListedExtensionsOnly": bool,
"signatureRequired": bool,
"extensions": {
"<extensionName>": {
"signatureRequired": bool
}
}
}
}

# Default policy values to be used when customer does not specify these attributes in the policy file.
_DEFAULT_ALLOW_LISTED_EXTENSIONS_ONLY = False
_DEFAULT_SIGNATURE_REQUIRED = False
_DEFAULT_EXTENSIONS = {}

# Agent supports up to this version of the policy file ("policyVersion" in schema).
# Increment this number when any new attributes are added to the policy schema.
Expand Down Expand Up @@ -72,25 +57,11 @@ class _PolicyEngine(object):
"""
def __init__(self):
# Set defaults for policy
self._policy = \
{
"policyVersion": _MAX_SUPPORTED_POLICY_VERSION,
"extensionPolicies": {
"allowListedExtensionsOnly": _DEFAULT_ALLOW_LISTED_EXTENSIONS_ONLY,
"signatureRequired": _DEFAULT_SIGNATURE_REQUIRED,
"extensions": {}
}
}

self._policy_enforcement_enabled = self.__get_policy_enforcement_enabled()
if not self.policy_enforcement_enabled:
return

# Use a copy of the policy as a template. Update the template as we parse the custom policy.
template = copy.deepcopy(self._policy)
custom_policy = self.__read_policy()
self.__parse_policy(template, custom_policy)
self._policy = template
self._policy = self._parse_policy(self.__read_policy())

@staticmethod
def _log_policy_event(msg, is_success=True, op=WALAEventOperation.Policy, send_event=True):
Expand Down Expand Up @@ -142,139 +113,194 @@ def __read_policy():
return custom_policy

@staticmethod
def __parse_policy(template, custom_policy):
def _parse_policy(policy):
"""
Update template with attributes specified in custom_policy:
- attributes provided in custom_policy override the default values in template
- if an attribute is not provided, use the default value
- if an unrecognized attribute is present in custom_policy (not defined in _POLICY_SCHEMA), raise an error
- if an attribute does not match the type specified in the schema, raise an error
Parses the given policy document and an equivalent document that has been populated with default values and verified for correctness, i.e.
that conforms the following schema:

{
"policyVersion": "0.1.0",
"extensionPolicies": {
"allowListedExtensionsOnly": <true, false>,
"signatureRequired": <true, false>,
"extensions": {
"<extension_name>": {
"signatureRequired": <true, false>
"runtimePolicy": {
<extension-specific policy>
}
}
},
}
}

Raises InvalidPolicyError if the policy document is invalid.
"""
# Validate top level attributes and then parse each section of the custom policy.
# Individual parsing functions are responsible for validating schema of that section (any nested dicts).
# Note that validation must happen before parsing.
_PolicyEngine.__validate_schema(custom_policy, _POLICY_SCHEMA)
_PolicyEngine.__parse_version(template, custom_policy)
_PolicyEngine.__parse_extension_policies(template, custom_policy)
if not isinstance(policy, dict):
raise InvalidPolicyError("expected an object describing a Policy; got {0}.".format(type(policy).__name__))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: policy doesn't need to be capitalized

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks; in that message, I'm trying to refer to the concept/type of "Policy", rather than a specific "policy" instance.

if that does not make sense, i can lowercase it


_PolicyEngine._check_attributes(policy, object_name="policy", valid_attributes=["policyVersion", "extensionPolicies"])

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we get valid_attributes from a schema constant instead of hardcoding them here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i want them to be inline so that people can scan/read the code linearly, without having to jump to the definition of a constant to check what the valid attributes are


return {
"policyVersion": _PolicyEngine._parse_policy_version(policy),
"extensionPolicies": _PolicyEngine._parse_extension_policies(policy)
}

@staticmethod
def __parse_version(template, policy):
def _parse_policy_version(policy):
"""
Validate and return "policyVersion" attribute. If not a string in the format "x.y.z", raise InvalidPolicyError.
If policy_version is greater than maximum supported version, raise InvalidPolicyError.
"""
version = policy.get("policyVersion")
if version is None:
return
version = _PolicyEngine._get_string(policy, attribute="policyVersion")

try:
flexible_version = FlexibleVersion(version)
except ValueError:
raise InvalidPolicyError(
"invalid value for attribute 'policyVersion' attribute 'policyVersion' is expected to be in format 'major.minor.patch' "
"(e.g., '1.0.0'). Please change to a valid value.")
if not re.match(r"^\d+\.\d+\.\d+$", version):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also accept "1" or "1.0" as valid versions (I should have added a unit test for too).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the strict check since that is what the error message was specifically asking ('major.minor.patch'). Note, though, that this check should be done with a regex or similar, since FlexibleVersion accepts many, many other formats than 'major.minor.patch'.

"1", and "1.0" sound ok to me. If you want, you could add that after this change gets merged in

raise InvalidPolicyError("invalid value for attribute 'policyVersion'; it should be in format 'major.minor.patch' (e.g., '1.0.0')")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe include the provided version in the error message

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks; added


if FlexibleVersion(_MAX_SUPPORTED_POLICY_VERSION) < flexible_version:
raise InvalidPolicyError("policy version '{0}' is not supported. The agent supports policy versions up to '{1}'. Please provide a compatible policy version."
.format(version, _MAX_SUPPORTED_POLICY_VERSION))
if FlexibleVersion(_MAX_SUPPORTED_POLICY_VERSION) < FlexibleVersion(version):
raise InvalidPolicyError("policy version '{0}' is not supported. The agent supports policy versions up to '{1}'.".format(version, _MAX_SUPPORTED_POLICY_VERSION))

template["policyVersion"] = version
return version

@staticmethod
def __parse_extension_policies(template, policy):
extension_policies = policy.get("extensionPolicies")
if extension_policies is not None:
_PolicyEngine.__validate_schema(extension_policies, _POLICY_SCHEMA["extensionPolicies"], "extensionPolicies")
def _parse_extension_policies(policy):
"""
Parses the "extensionPolicies" attribute of the policy document. It should conform to the following schema:

# Parse allowlist policy
allowlist_policy = extension_policies.get("allowListedExtensionsOnly")
if allowlist_policy is not None:
template["extensionPolicies"]["allowListedExtensionsOnly"] = allowlist_policy
"extensionPolicies": {
"allowListedExtensionsOnly": <true, false>,
"signatureRequired": <true, false>,
"extensions": {
"<extension_name>": {
"signatureRequired": <true, false>
"runtimePolicy": {
<extension-specific policy>
}
}
},
}
"""
extension_policies = _PolicyEngine._get_dictionary(policy, attribute="extensionPolicies", optional=True, default={})

# Parse global signature policy
signature_policy = extension_policies.get("signatureRequired")
if signature_policy is not None:
template["extensionPolicies"]["signatureRequired"] = signature_policy
_PolicyEngine._check_attributes(extension_policies, object_name="extensionPolicies", valid_attributes=["allowListedExtensionsOnly", "signatureRequired", "extensions"])

# Parse individual extension policies
_PolicyEngine.__parse_extensions(template, extension_policies)
return {
"allowListedExtensionsOnly": _PolicyEngine._get_boolean(extension_policies, attribute="allowListedExtensionsOnly", name_prefix="extensionPolicies.", optional=True, default=_DEFAULT_ALLOW_LISTED_EXTENSIONS_ONLY),
"signatureRequired": _PolicyEngine._get_boolean(extension_policies, attribute="signatureRequired", name_prefix="extensionPolicies.", optional=True, default=_DEFAULT_SIGNATURE_REQUIRED),
"extensions": _PolicyEngine._parse_extensions(
_PolicyEngine._get_dictionary(extension_policies, attribute="extensions", name_prefix="extensionPolicies.", optional=True, default=_CaseFoldedDict.from_dict(_DEFAULT_EXTENSIONS))
)
}

@staticmethod
def __parse_extensions(template, extensions_policy):
def _parse_extensions(extensions):
"""
Parse "extensions" dict and update in template.
"extensions" is expected to be in the format:
{
"extensions": {
"<extensionName>": {
"signatureRequired": bool
}
Parses the "extensions" attribute. It should conform to the following schema:

"extensions": {
"<extensionName>": {
"signatureRequired": bool
"runtimePolicy": {
<extension-specific policy>
}
}
}

If "signatureRequired" isn't provided, the global "signatureRequired" value will be used instead.
The "extensions" attribute will be converted to a case-folded dict. CRP allows extensions to be any
case, so we use case-folded dict to allow for case-insensitive lookup of individual extension policies.
The return value is a case-folded dict. CRP allows extensions to be any case, so we allow for case-insensitive lookup of individual extension policies.
"""
extensions = extensions_policy.get("extensions")
if extensions is None:
return
parsed = _CaseFoldedDict.from_dict({})

parsed_extensions_dict = {}
for extension_name, individual_policy in extensions.items():
for extension, extension_policy in extensions.items():
if not isinstance(extension_policy, dict):
raise InvalidPolicyError("invalid type {0} for attribute '{1}', must be 'object'".format(type(extension_policy).__name__, extension))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: semicolon before "must" for consistency with the error message in get_value( )

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks; fixed

parsed[extension] = _PolicyEngine._parse_extension(extension_policy)

# We don't validate "extensions" against the schema, because the attributes (individual extension names)
# are dynamic and not defined in the schema. We do validate that individual_policy is a dict, and validate
# the schema of individual_policy.
individual_policy_schema = _POLICY_SCHEMA["extensionPolicies"]["extensions"]["<extensionName>"]
if not isinstance(individual_policy, dict):
raise InvalidPolicyError("invalid type {0} for attribute '{1}', please change to object."
.format(type(individual_policy).__name__, extension_name))
_PolicyEngine.__validate_schema(individual_policy, individual_policy_schema, extension_name)
return parsed

extension_signature_policy = individual_policy.get("signatureRequired")
if extension_signature_policy is None:
extension_signature_policy = template["extensionPolicies"]["signatureRequired"]
@staticmethod
def _parse_extension(extension):
"""
Parses an individual extension. It should conform to the following schema:

parsed_extensions_dict[extension_name] = \
{
"signatureRequired": extension_signature_policy
}
"<extensionName>": {
"signatureRequired": bool
"runtimePolicy": {
<extension-specific policy>
}
}
"""
extension_attribute_name = "extensionPolicies.extensions.{0}".format(extension)

_PolicyEngine._check_attributes(extension, object_name=extension_attribute_name, valid_attributes=["signatureRequired", "runtimePolicy"])

return_value = {}

signature_required = _PolicyEngine._get_boolean(extension, attribute="signatureRequired", name_prefix=extension_attribute_name, optional=True, default=None)
if signature_required is not None:
return_value["signatureRequired"] = signature_required

# The runtimePolicy is an arbitrary object.
runtime_policy = extension.get("runtimePolicy")
if runtime_policy is not None:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according to the schema defined spec, runtimePolicy is an object with arbitrary attributes, so we should probably validate that the type is dict.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or if we want to accept list, string, etc., update the schema and spec accordingly

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, the extension policy is completely arbitrary and defined by the extension. spec needs to be updated

return_value["runtimePolicy"] = runtime_policy

# Convert "extensions" to a case-folded dict for case-insensitive lookup
case_folded_extensions_dict = _CaseFoldedDict.from_dict(parsed_extensions_dict)
template["extensionPolicies"]["extensions"] = case_folded_extensions_dict
return return_value

@staticmethod
def __validate_schema(policy, schema, section_name=None):
def _check_attributes(object_, object_name, valid_attributes):
"""
Validate the provided policy against the schema - we only do a shallow check (no recursion into nested dicts).
If there is an unrecognized attribute, raise an error.
Check that the given object, which should be a dictionary, has only the specified attributes.
If any other attributes are present, raise InvalidPolicyError.
The object_name is used in the error message.
"""
for key, value in policy.items():
if key not in schema:
raise InvalidPolicyError("attribute '{0}' is not defined in the policy schema. Please refer to the policy documentation "
"and change or remove this attribute accordingly.".format(key))

expected_type = schema.get(key)
if isinstance(expected_type, dict):
expected_type = dict
type_in_err_msg = {
dict: "object",
ustr: "string",
bool: "boolean"
}
for k in object_.keys():
if k not in valid_attributes:
raise InvalidPolicyError("invalid attribute '{0}' in {1}".format(k, object_name))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe "unrecognized" or "undefined" instead of "invalid" to make it clear that the attribute is not defined in the schema, as opposed to the type or format being invalid

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

invalid: (of computer instructions, data, etc.) not conforming to the correct format or specifications

your comment made me doubt :)

but i can change it to "unknown" if it helps


@staticmethod
def _get_dictionary(object_, attribute, name_prefix="", optional=False, default=None):
"""
Returns object[attribute] if it exists, verifying that it is a dictionary, else returns default.
If optional is False and object[attribute] does not exist, raise InvalidPolicyError.
The name_prefix indicates the path of the attribute within the policy document and is used in the error message.
"""
return _PolicyEngine._get_value(object_, attribute, name_prefix, dict, "object", optional=optional, default=default)

if not isinstance(value, expected_type):
if section_name is None:
msg = ("invalid type {0} for attribute '{1}', please change to {2}."
.format(type(value).__name__, key, type_in_err_msg.get(expected_type)))
else:
msg = ("invalid type {0} for attribute '{1}' in section '{2}', please change to {3}."
.format(type(value).__name__, key, section_name,
type_in_err_msg.get(expected_type)))
@staticmethod
def _get_string(object_, attribute, name_prefix="", optional=False, default=None):
"""
Returns object[attribute] if it exists, verifying that it is a string, else returns default.
If optional is False and object[attribute] does not exist, raise InvalidPolicyError.
The name_prefix indicates the path of the attribute within the policy document and is used in the error message.
"""
return _PolicyEngine._get_value(object_, attribute, name_prefix, ustr, "string", optional=optional, default=default)

@staticmethod
def _get_boolean(object_, attribute, name_prefix="", optional=False, default=None):
"""
Returns object[attribute] if it exists, verifying that it is a boolean, else returns default.
If optional is False and object[attribute] does not exist, raise InvalidPolicyError.
The name_prefix indicates the path of the attribute within the policy document and is used in the error message.
"""
return _PolicyEngine._get_value(object_, attribute, name_prefix, bool, "boolean", optional=optional, default=default)

raise InvalidPolicyError(msg)
@staticmethod
def _get_value(object_, attribute, name_prefix, type_, type_name, optional, default):
"""
Returns object[attribute] if it exists, verifying that it is of the given type_, else returns default.
If optional is False and object[attribute] does not exist, raise InvalidPolicyError.
The name_prefix indicates the path of the attribute within the policy document, the type_name indicates a user-friendly name for type_; both are used in the error message.
"""
if default is not None and not optional:
raise ValueError("default value should only be provided for optional attributes")
value = object_.get(attribute)
if value is None:
if not optional:
raise InvalidPolicyError("missing required attribute '{0}{1}'".format(name_prefix, attribute))
return default
if not isinstance(value, type_):
raise InvalidPolicyError("invalid type {0} for attribute '{1}{2}'; must be '{3}'".format(type(value).__name__, name_prefix, attribute, type_name))
return value


class ExtensionPolicyEngine(_PolicyEngine):
Expand Down Expand Up @@ -311,7 +337,7 @@ def should_enforce_signature_validation(self, extension_to_check):

global_signature_required = self._policy.get("extensionPolicies").get("signatureRequired")
individual_policy = self._policy.get("extensionPolicies").get("extensions").get(extension_to_check.name)
if individual_policy is None:
if individual_policy is None or len(individual_policy) == 0:
return global_signature_required
else:
return individual_policy.get("signatureRequired")
Loading
Loading