Rework of deploytime codebase
Move functions to the common openshift library.

Optimize querying for the parent objects such as ReplicaSet
and ReplicationController. With this implementation we rely on known
API versions and API objects taken directly from the pod.
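
For illustration, a minimal sketch of how the owner lookup could resolve the parent object straight from the pod's ownerReferences (the real helper, get_owner_object_from_child, lives in provider_common/openshift.py and is not shown in this diff; the names and exact return shape here are assumptions):

```python
# Hypothetical sketch -- the actual implementation is in provider_common/openshift.py.
# The pod's ownerReferences already carry the owner's kind, name and apiVersion,
# so the parent can be fetched directly instead of listing every ReplicaSet /
# ReplicationController in the cluster.
from openshift.dynamic import DynamicClient

SUPPORTED_PARENT_KINDS = {"ReplicaSet", "ReplicationController"}


def get_owner_object_from_child(client: DynamicClient, uid: str, pod) -> dict:
    """Return {owner_uid: owner_object} for the pod's supported controller."""
    owners = {}
    for ref in pod.metadata.ownerReferences or []:
        if ref.uid != uid or ref.kind not in SUPPORTED_PARENT_KINDS:
            continue
        api = client.resources.get(api_version=ref.apiVersion, kind=ref.kind)
        owners[uid] = api.get(name=ref.name, namespace=pod.metadata.namespace)
    return owners
```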

Added a cache to optimize queries to the OpenShift API for the
parent objects. For the running pods we always want the latest
state, which means we can't cache them.
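
A sketch of the caching idea (the cache key and scope are assumptions; only the parent objects are memoized, never the pod listing):

```python
# Hypothetical sketch of the parent-object cache. Owner objects such as a
# ReplicaSet do not change for the purposes of the deploy timestamp, so they
# can be memoized by UID; running pods are always re-queried so the exporter
# sees the latest state.
_owner_cache: dict[str, object] = {}


def get_owner_object_cached(client, uid: str, pod):
    """Memoized wrapper around the uncached owner lookup."""
    if uid not in _owner_cache:
        _owner_cache[uid] = get_owner_object_from_child(client, uid, pod)
    return _owner_cache[uid]
```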

Reworked extracting image SHAs from the running pod instances,
which is the basis for the skopeo integration, as we also get the
full URI of the image.
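
A sketch of what the SHA extraction could return, assuming get_images_from_pod maps each SHA to the pullable image URI (the mapping shape is an assumption; only the SHA keys are consumed in this diff):

```python
# Hypothetical sketch -- the actual helper lives in provider_common/openshift.py.
import re
from typing import Optional

_SHA_REGEX = re.compile(r"sha256:.*")


def image_sha(image_url_or_id: str) -> Optional[str]:
    """Everything from the first 'sha256:' onward, or None if not present."""
    match = _SHA_REGEX.search(image_url_or_id)
    return match.group() if match else None


def get_images_from_pod(pod) -> dict[str, str]:
    """Map image SHA -> full image URI for every container in the running pod."""
    images: dict[str, str] = {}
    for status in pod.status.containerStatuses or []:
        sha = image_sha(status.imageID)
        if sha:
            # status.image is the full image reference, usable later with skopeo.
            images[sha] = status.image
    return images
```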

Signed-off-by: Michal Pryc <mpryc@redhat.com>
mpryc committed May 15, 2023
1 parent 130d038 commit 20dbfe2
Showing 6 changed files with 451 additions and 188 deletions.
185 changes: 32 additions & 153 deletions exporters/deploytime/app.py
@@ -1,11 +1,9 @@
import logging
import re
import time
from typing import Iterable, Optional
from typing import Iterable

from attrs import field, frozen
from openshift.dynamic import DynamicClient, ResourceInstance
from openshift.dynamic.exceptions import ResourceNotFoundError
from openshift.dynamic import DynamicClient
from prometheus_client import start_http_server
from prometheus_client.core import REGISTRY, GaugeMetricFamily

@@ -14,8 +12,13 @@
from pelorus.config import load_and_log, no_env_vars
from pelorus.config.converters import comma_separated
from pelorus.timeutil import METRIC_TIMESTAMP_THRESHOLD_MINUTES, is_out_of_date

supported_replica_objects = {"ReplicaSet", "ReplicationController"}
from provider_common.openshift import (
filter_pods_by_replica_uid,
get_and_log_namespaces,
get_images_from_pod,
get_owner_object_from_child,
get_running_pods,
)


@frozen
@@ -30,7 +33,6 @@ def __attrs_post_init__(self):

def collect(self) -> Iterable[GaugeMetricFamily]:
logging.info("collect: start")

metrics = self.generate_metrics()

deploy_timestamp_metric = GaugeMetricFamily(
@@ -43,7 +45,7 @@ def collect(self) -> Iterable[GaugeMetricFamily]:

for m in metrics:
if not is_out_of_date(str(m.deploy_time_timestamp)):
logging.info(
logging.debug(
"Collected deploy_timestamp{namespace=%s, app=%s, image=%s} %s (%s)",
m.namespace,
m.name,
@@ -67,168 +69,45 @@ def collect(self) -> Iterable[GaugeMetricFamily]:
m.deploy_time_timestamp,
)
if number_of_dropped:
logging.debug(
logging.info(
"Number of deployments that are older then %smin and won't be collected: %s",
METRIC_TIMESTAMP_THRESHOLD_MINUTES,
number_of_dropped,
)
yield deploy_timestamp_metric

def get_and_log_namespaces(self) -> set[str]:
"""
Get the set of namespaces to watch, and log what they are.
They will be either:
1. The namespaces explicitly specified
2. The namespaces matched by PROD_LABEL
3. If neither namespaces nor the PROD_LABEL is given, then implicitly matches all namespaces.
"""
if self.namespaces:
logging.info("Watching namespaces %s", self.namespaces)
return self.namespaces

if self.prod_label:
logging.info(
"No namespaces specified, watching all namespaces with given PROD_LABEL (%s)",
self.prod_label,
)
query_args = dict(label_selector=self.prod_label)
else:
logging.info(
"No namespaces specified and no PROD_LABEL given, watching all namespaces."
)
query_args = dict()

all_namespaces = self.client.resources.get(api_version="v1", kind="Namespace")
namespaces = {
namespace.metadata.name
for namespace in all_namespaces.get(**query_args).items
}
logging.info("Watching namespaces %s", namespaces)
if not namespaces:
logging.warning(
"No NAMESPACES given and PROD_LABEL did not return any matching namespaces."
)
return namespaces

def generate_metrics(self) -> Iterable[DeployTimeMetric]:
namespaces = self.get_and_log_namespaces()
namespaces = get_and_log_namespaces(
self.client, self.namespaces, self.prod_label
)

if not namespaces:
return []

app_label = self.app_label
visited_replicas = set()

def already_seen(full_path: str) -> bool:
return full_path in visited_replicas

def mark_as_seen(full_path: str):
visited_replicas.add(full_path)

logging.info("generate_metrics: start")

# get all running Pods with the app label
v1_pods = self.client.resources.get(api_version="v1", kind="Pod")
pods = v1_pods.get(
label_selector=app_label, field_selector="status.phase=Running"
).items
pods = get_running_pods(self.client, namespaces, self.app_label)

replicas_dict = (
self.get_replicas("v1", "ReplicationController")
| self.get_replicas("apps/v1", "ReplicaSet")
| self.get_replicas("extensions/v1beta1", "ReplicaSet")
)
# Build dictionary with controllers and retrieved pods
replica_pods_dict = filter_pods_by_replica_uid(pods)

for pod in pods:
namespace = pod.metadata.namespace
owner_refs = pod.metadata.ownerReferences
if namespace not in namespaces or not owner_refs:
continue
for uid, pod in replica_pods_dict.items():
replicas = get_owner_object_from_child(self.client, uid, pod)

logging.debug(
"Getting Replicas for pod: %s in namespace: %s",
pod.metadata.name,
pod.metadata.namespace,
)
# Since a commit will be built into a particular image and there could be multiple
# containers (images) per pod, we will push one metric per image/container in the
# pod template
images = get_images_from_pod(pod)

# Get deploytime from the owning controller of the pod.
# We track all already-visited controllers to not duplicate metrics per-pod.
for ref in owner_refs:
full_path = f"{namespace}/{ref.name}"

if ref.kind not in supported_replica_objects or already_seen(full_path):
continue

logging.debug(
"Getting replica: %s, kind: %s, namespace: %s",
ref.name,
ref.kind,
namespace,
for sha in images.keys():
metric = DeployTimeMetric(
name=pod.metadata.labels[self.app_label],
namespace=pod.metadata.namespace,
labels=pod.metadata.labels,
deploy_time=replicas.get(uid).metadata.creationTimestamp,
image_sha=sha,
)

if not (rc := replicas_dict.get(full_path)):
continue

mark_as_seen(full_path)
container_shas = (
image_sha(container.image) for container in pod.spec.containers
)
container_status_shas = (
image_sha(status.imageID) for status in pod.status.containerStatuses
)
images = {sha for sha in container_shas if sha} | {
sha for sha in container_status_shas if sha
}

# Since a commit will be built into a particular image and there could be multiple
# containers (images) per pod, we will push one metric per image/container in the
# pod template
for sha in images:
metric = DeployTimeMetric(
name=rc.metadata.labels[app_label],
namespace=namespace,
labels=rc.metadata.labels,
deploy_time=rc.metadata.creationTimestamp,
image_sha=sha,
)
yield metric

def get_replicas(
self, apiVersion: str, objectName: str
) -> dict[str, ResourceInstance]:
"""Process Replicas for given Api Version and Object type (ReplicaSet or ReplicationController)"""
try:
apiResource = self.client.resources.get(
api_version=apiVersion, kind=objectName
)
replicationobjects = apiResource.get(label_selector=self.app_label)
return {
f"{replica.metadata.namespace}/{replica.metadata.name}": replica
for replica in replicationobjects.items
}
except ResourceNotFoundError:
logging.debug(
"API Object not found for version: %s object: %s",
apiVersion,
objectName,
)
return {}


def image_sha(image_url_or_id: str) -> Optional[str]:
"""
Gets the hash of the image, extracted from the image URL or image ID.
Specifically, everything after the first `sha256:` seen.
"""
sha_regex = re.compile(r"sha256:.*")
if match := sha_regex.search(image_url_or_id):
return match.group()
else:
# This may be noisy if there are a lot of pods where the container
# spec doesn't have a SHA but the status does.
# But since it's only in debug logs, it doesn't matter.
logging.debug("Skipping unresolved image reference: %s", image_url_or_id)
return None
yield metric


if __name__ == "__main__":
