Refactor: Pipeline Management State Machine
Separates pipeline management from CodePipeline and removes the major bottleneck of building each deployment map sequentially. Starts to isolate the CodeCommit repository and moves the source of deployment maps into an S3 bucket. Sets the groundwork for future refactoring of pipeline management and moves towards enabling decentralised deployment maps.
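With deployment maps sourced from an S3 bucket rather than only the bootstrap CodeCommit repository, publishing a map can be as simple as an object upload. A minimal sketch; the bucket name and key below are illustrative placeholders, not values defined by this commit:

```python
# Hypothetical upload of a deployment map to the ADF definition bucket.
# The bucket name is a placeholder; ADF resolves the real bucket from
# its own configuration.
import boto3

s3 = boto3.client("s3")
s3.upload_file(
    Filename="deployment_map.yml",         # a map edited locally
    Bucket="example-adf-pipeline-bucket",  # placeholder bucket name
    Key="deployment_map.yml",
)
```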
Showing 31 changed files with 1,604 additions and 318 deletions.
...ory/adf-bootstrap/deployment/lambda_codebase/pipeline_management/create_or_update_rule.py (60 additions & 0 deletions)

```python
"""
Pipeline Management Lambda Function
Creates or Updates an Event Rule for forwarding events
If the source account != the Deployment account
"""

import os
import boto3

from cache import Cache
from rule import Rule
from logger import configure_logger
from cloudwatch import ADFMetrics


LOGGER = configure_logger(__name__)
DEPLOYMENT_ACCOUNT_REGION = os.environ["AWS_REGION"]
DEPLOYMENT_ACCOUNT_ID = os.environ["ACCOUNT_ID"]
PIPELINE_MANAGEMENT_STATEMACHINE = os.getenv("PIPELINE_MANAGEMENT_STATEMACHINE_ARN")
CLOUDWATCH = boto3.client("cloudwatch")
METRICS = ADFMetrics(CLOUDWATCH, "PIPELINE_MANAGEMENT/RULE")

_cache = None


def lambda_handler(pipeline, _):
    """Main Lambda Entry point"""
    # pylint: disable=W0603
    # Global variable here to cache across lambda execution runtimes.
    global _cache
    if not _cache:
        _cache = Cache()
        METRICS.put_metric_data(
            {"MetricName": "CacheInitalised", "Value": 1, "Unit": "Count"}
        )

    LOGGER.info(pipeline)

    _source_account_id = (
        pipeline.get("default_providers", {})
        .get("source", {})
        .get("properties", {})
        .get("account_id", {})
    )
    if (
        _source_account_id
        and int(_source_account_id) != int(DEPLOYMENT_ACCOUNT_ID)
        and not _cache.check(_source_account_id)
    ):
        rule = Rule(
            pipeline["default_providers"]["source"]["properties"]["account_id"]
        )
        rule.create_update()
        _cache.add(
            pipeline["default_providers"]["source"]["properties"]["account_id"], True
        )
        METRICS.put_metric_data(
            {"MetricName": "CreateOrUpdate", "Value": 1, "Unit": "Count"}
        )

    return pipeline
```
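For reference, a minimal sketch of the kind of pipeline definition this handler walks; the account ID and pipeline name are placeholders, not values from the commit:

```python
# Hypothetical event for create_or_update_rule.lambda_handler;
# the account ID below is a placeholder.
event = {
    "name": "sample-pipeline",
    "default_providers": {
        "source": {
            "provider": "codecommit",
            "properties": {"account_id": "111111111111"},
        },
    },
}
# lambda_handler(event, None) creates or updates the forwarding rule only
# when the source account differs from the deployment account and is not
# already present in the cache.
```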
...ository/adf-bootstrap/deployment/lambda_codebase/pipeline_management/create_repository.py (53 additions & 0 deletions)

```python
"""
Pipeline Management Lambda Function
Creates or Updates a CodeCommit Repository
"""

import os
import boto3
from repo import Repo

from logger import configure_logger
from cloudwatch import ADFMetrics


CLOUDWATCH = boto3.client("cloudwatch")
METRICS = ADFMetrics(CLOUDWATCH, "PIPELINE_MANAGEMENT/REPO")


LOGGER = configure_logger(__name__)
DEPLOYMENT_ACCOUNT_REGION = os.environ["AWS_REGION"]
DEPLOYMENT_ACCOUNT_ID = os.environ["ACCOUNT_ID"]


def lambda_handler(pipeline, _):
    """Main Lambda Entry point"""
    # Hardcoded for now, so repository auto-creation is always attempted.
    auto_create_repositories = "enabled"
    if auto_create_repositories == "enabled":
        code_account_id = (
            pipeline.get("default_providers", {})
            .get("source", {})
            .get("properties", {})
            .get("account_id", {})
        )
        has_custom_repo = (
            pipeline.get("default_providers", {})
            .get("source", {})
            .get("properties", {})
            .get("repository", {})
        )
        if (
            auto_create_repositories
            and code_account_id
            and str(code_account_id).isdigit()
            and not has_custom_repo
        ):
            repo = Repo(
                code_account_id, pipeline.get("name"), pipeline.get("description")
            )
            repo.create_update()
            METRICS.put_metric_data(
                {"MetricName": "CreateOrUpdate", "Value": 1, "Unit": "Count"}
            )

    return pipeline
```
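To illustrate the guard conditions, two hedged example inputs (names and account IDs are placeholders): the first would trigger repository creation, the second opts out by declaring its own repository:

```python
# Placeholders throughout; these are not values from the commit.
creates_repo = {
    "name": "sample-app",
    "description": "Sample application pipeline",
    "default_providers": {
        "source": {"properties": {"account_id": "222222222222"}},
    },
}

skips_repo = {
    "name": "sample-app",
    "default_providers": {
        "source": {
            "properties": {
                "account_id": "222222222222",
                # An explicit repository property disables auto-creation.
                "repository": "existing-repo",
            },
        },
    },
}
```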
.../adf-bootstrap/deployment/lambda_codebase/pipeline_management/generate_pipeline_inputs.py (109 additions & 0 deletions)

```python
"""
Pipeline Management Lambda Function
Generates Pipeline Inputs
"""

import os
import boto3

from pipeline import Pipeline
from target import Target, TargetStructure
from organizations import Organizations
from parameter_store import ParameterStore
from sts import STS
from logger import configure_logger
from partition import get_partition


LOGGER = configure_logger(__name__)
DEPLOYMENT_ACCOUNT_REGION = os.environ["AWS_REGION"]
DEPLOYMENT_ACCOUNT_ID = os.environ["ACCOUNT_ID"]
ROOT_ACCOUNT_ID = os.environ["ROOT_ACCOUNT_ID"]


def store_regional_parameter_config(pipeline, parameter_store):
    """
    Responsible for storing the region information for specific
    pipelines. These regions are defined in the deployment_map
    either as top level regions for a pipeline or stage specific regions
    """
    if pipeline.top_level_regions:
        parameter_store.put_parameter(
            f"/deployment/{pipeline.name}/regions",
            str(list(set(pipeline.top_level_regions))),
        )
        return

    parameter_store.put_parameter(
        f"/deployment/{pipeline.name}/regions",
        str(list(set(Pipeline.flatten_list(pipeline.stage_regions)))),
    )


def fetch_required_ssm_params(regions):
    output = {}
    for region in regions:
        parameter_store = ParameterStore(region, boto3)
        output[region] = {
            "s3": parameter_store.fetch_parameter(
                f"/cross_region/s3_regional_bucket/{region}"
            ),
            "kms": parameter_store.fetch_parameter(f"/cross_region/kms_arn/{region}"),
        }
        if region == DEPLOYMENT_ACCOUNT_REGION:
            output[region]["modules"] = parameter_store.fetch_parameter(
                "deployment_account_bucket"
            )
    return output


def generate_pipeline_inputs(pipeline, organizations, parameter_store):
    data = {}
    pipeline_object = Pipeline(pipeline)
    regions = []
    for target in pipeline.get("targets", []):
        target_structure = TargetStructure(target)
        for step in target_structure.target:
            regions = step.get(
                "regions", pipeline.get("regions", DEPLOYMENT_ACCOUNT_REGION)
            )
            paths_tags = []
            for path in step.get("path", []):
                paths_tags.append(path)
            if step.get("tags") is not None:
                paths_tags.append(step.get("tags", {}))
            for path_or_tag in paths_tags:
                pipeline_object.stage_regions.append(regions)
                pipeline_target = Target(
                    path_or_tag, target_structure, organizations, step, regions
                )
                pipeline_target.fetch_accounts_for_target()
            pipeline_object.template_dictionary["targets"].append(
                target_structure.account_list
            )

    if DEPLOYMENT_ACCOUNT_REGION not in regions:
        pipeline_object.stage_regions.append(DEPLOYMENT_ACCOUNT_REGION)
    pipeline_object.generate_input()
    data["ssm_params"] = fetch_required_ssm_params(
        pipeline_object.input["regions"] or [DEPLOYMENT_ACCOUNT_REGION]
    )
    data["input"] = pipeline_object.input
    store_regional_parameter_config(pipeline_object, parameter_store)
    return data


def lambda_handler(pipeline, _):
    """Main Lambda Entry point"""
    parameter_store = ParameterStore(DEPLOYMENT_ACCOUNT_REGION, boto3)
    sts = STS()
    cross_account_role = parameter_store.fetch_parameter("cross_account_access_role")
    role = sts.assume_cross_account_role(
        f"arn:{get_partition(DEPLOYMENT_ACCOUNT_REGION)}:iam::"
        f"{ROOT_ACCOUNT_ID}:role/{cross_account_role}-readonly",
        "pipeline",
    )
    organizations = Organizations(role)

    output = generate_pipeline_inputs(pipeline, organizations, parameter_store)

    return output
```
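The handler returns the generated pipeline input together with the regional SSM parameters it resolved. A rough sketch of the output shape, with every value a placeholder (the exact keys of `input` come from `Pipeline.generate_input()` and are not shown in this diff):

```python
# Illustrative shape only; all values are placeholders.
example_output = {
    "input": {
        "name": "sample-pipeline",
        "regions": ["eu-west-1", "eu-central-1"],
    },
    "ssm_params": {
        "eu-west-1": {
            "s3": "example-regional-bucket",
            "kms": "arn:aws:kms:eu-west-1:111111111111:key/example",
        },
    },
}
```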
...ootstrap/deployment/lambda_codebase/pipeline_management/identify_out_of_date_pipelines.py (96 additions & 0 deletions)

```python
"""
Pipeline Management Lambda Function
Compares pipeline definitions in S3 to the definitions stored in SSM Param Store.
Any that exist in param store but not S3 are marked for removal.
"""

import os
import json
import hashlib

import boto3

from logger import configure_logger
from deployment_map import DeploymentMap
from parameter_store import ParameterStore


LOGGER = configure_logger(__name__)
S3_BUCKET_NAME = os.environ["S3_BUCKET_NAME"]
DEPLOYMENT_ACCOUNT_ID = os.environ["ACCOUNT_ID"]
ADF_PIPELINE_PREFIX = os.environ["ADF_PIPELINE_PREFIX"]
DEPLOYMENT_ACCOUNT_REGION = os.environ["AWS_REGION"]


def download_deployment_maps(resource, prefix, local):
    paginator = resource.meta.client.get_paginator("list_objects")
    for result in paginator.paginate(
        Bucket=S3_BUCKET_NAME, Delimiter="/", Prefix=prefix
    ):
        LOGGER.info(result)
        for subdir in result.get("CommonPrefixes", []):
            download_deployment_maps(resource, subdir.get("Prefix"), local)
        for file in result.get("Contents", []):
            LOGGER.info(file)
            dest_path_name = os.path.join(local, file.get("Key"))
            if not os.path.exists(os.path.dirname(dest_path_name)):
                os.makedirs(os.path.dirname(dest_path_name))
            resource.meta.client.download_file(
                S3_BUCKET_NAME, file.get("Key"), dest_path_name
            )


def get_current_pipelines(parameter_store):
    return parameter_store.fetch_parameters_by_path("/deployment/")


def identify_out_of_date_pipelines(pipeline_names, current_pipelines):
    return [
        {"pipeline": f"{ADF_PIPELINE_PREFIX}{d}"}
        for d in current_pipelines.difference(pipeline_names)
    ]


def delete_ssm_params(out_of_date_pipelines, parameter_store):
    for pipeline in out_of_date_pipelines:
        name = pipeline.get("pipeline").removeprefix(ADF_PIPELINE_PREFIX)
        LOGGER.info("Deleting regions parameter for %s", name)
        parameter_store.delete_parameter(f"/deployment/{name}/regions")


def lambda_handler(event, _):
    output = event.copy()
    s3 = boto3.resource("s3")
    download_deployment_maps(s3, "", "/tmp")
    deployment_map = DeploymentMap(
        None,
        None,
        None,
        map_path="/tmp/deployment_map.yml",
        map_dir_path="/tmp/deployment_maps",
    )
    parameter_store = ParameterStore(DEPLOYMENT_ACCOUNT_REGION, boto3)
    current_pipelines = {
        parameter.get("Name").split("/")[-2]
        for parameter in get_current_pipelines(parameter_store)
    }

    pipeline_names = {
        p.get("name") for p in deployment_map.map_contents["pipelines"]
    }
    out_of_date_pipelines = identify_out_of_date_pipelines(
        pipeline_names, current_pipelines
    )
    delete_ssm_params(out_of_date_pipelines, parameter_store)

    output = {"pipelines_to_be_deleted": out_of_date_pipelines}
    data_md5 = hashlib.md5(
        json.dumps(output, sort_keys=True).encode("utf-8")
    ).hexdigest()
    root_trace_id = os.getenv("_X_AMZN_TRACE_ID", "na=na;na=na").split(";")[0]
    output["traceroot"] = root_trace_id
    output["hash"] = data_md5
    return output
```
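The core comparison is a plain set difference between the pipeline names found in SSM and those found in the S3 maps. A quick worked example, assuming ADF_PIPELINE_PREFIX is `adf-pipeline-` (the pipeline names are placeholders):

```python
# Worked example; names and prefix are assumptions, not commit values.
current_pipelines = {"app-a", "app-b", "legacy-app"}  # parsed from SSM params
pipeline_names = {"app-a", "app-b"}                   # parsed from S3 maps

# identify_out_of_date_pipelines(pipeline_names, current_pipelines)
# -> [{"pipeline": "adf-pipeline-legacy-app"}]
```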