Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support prom discovery #93

Closed
15 changes: 15 additions & 0 deletions robusta_krr/core/integrations/base_workload_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import abc
from typing import Optional

from robusta_krr.core.models.objects import K8sObjectData
from robusta_krr.utils.configurable import Configurable

class WorkloadLoader(Configurable, abc.ABC):
    """Abstract interface for discovering workloads to scan.

    Implemented by the API-server based ``KubernetesLoader`` and the
    Prometheus based ``PrometheusLoader``, so the runner can switch
    discovery methods behind a single interface.
    """

    @abc.abstractmethod
    async def list_scannable_objects(self, clusters: Optional[list[str]]) -> list[K8sObjectData]:
        """Return the workloads to scan, optionally restricted to *clusters*."""
        ...

    @abc.abstractmethod
    async def list_clusters(self) -> Optional[list[str]]:
        """Return the names of available clusters, or None when cluster
        enumeration is not applicable for this discovery method."""
        ...
3 changes: 2 additions & 1 deletion robusta_krr/core/integrations/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from robusta_krr.core.models.objects import HPAData, K8sObjectData, PodData
from robusta_krr.core.models.result import ResourceAllocations
from robusta_krr.utils.configurable import Configurable
from .base_workload_loader import WorkloadLoader


from .rollout import RolloutAppsV1Api
Expand Down Expand Up @@ -341,7 +342,7 @@ async def __list_hpa(self) -> dict[HPAKey, HPAData]:
return await self.__list_hpa_v1()


class KubernetesLoader(Configurable):
class KubernetesLoader(WorkloadLoader):
async def list_clusters(self) -> Optional[list[str]]:
"""List all clusters.

Expand Down
133 changes: 133 additions & 0 deletions robusta_krr/core/integrations/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import asyncio
import itertools
from typing import Optional, List, Dict
from collections import defaultdict

from robusta_krr.core.models.config import Config
from robusta_krr.core.models.objects import K8sObjectData, PodData
from robusta_krr.core.models.result import ResourceAllocations, ResourceType, RecommendationValue
from .prometheus.loader import MetricsLoader
from .base_workload_loader import WorkloadLoader

class PrometheusLoader(WorkloadLoader):
    """Discover workloads by querying kube-state-metrics series in Prometheus.

    Used when the Kubernetes API server is not reachable: Deployments, their
    pods and containers, and their current resource requests/limits are all
    reconstructed from ``kube_*`` metrics.
    """

    def __init__(self, config: Config):
        super().__init__(config)
        # Reuse the regular metrics loader to issue PromQL queries.
        self.metrics_loader = MetricsLoader(config)

    async def list_clusters(self) -> Optional[list[str]]:
        """Prometheus-based discovery supports only a single cluster."""
        self.debug("Working in Prometheus-based workload discovery mode. Only support a single cluster")
        return None

    async def list_scannable_objects(self, clusters: Optional[list[str]]) -> list[K8sObjectData]:
        """List all scannable objects from Prometheus.

        In this workload discovery mode clusters are not supported, so the
        *clusters* argument is ignored.

        Returns:
            A list of scannable objects (empty on query failure).
        """
        self.info("Listing scannable objects from Prometheus")
        self.debug(f"Namespaces: {self.config.namespaces}")
        try:
            objects_tuple = await asyncio.gather(
                self._list_deployments(),
            )
        except Exception as e:
            self.error(f"Error trying to list pods from Prometheus: {e}")
            self.debug_exception()
            return []

        objects = itertools.chain(*objects_tuple)
        if self.config.namespaces == "*":
            # NOTE: We are not scanning kube-system namespace by default
            result = [obj for obj in objects if obj.namespace != "kube-system"]
        else:
            result = [obj for obj in objects if obj.namespace in self.config.namespaces]

        namespaces = {obj.namespace for obj in result}
        self.info(
            f"Found {len(result)} objects across {len(namespaces)} namespaces "
            f"from Prometheus({self.config.prometheus_url})"
        )

        return result

    async def __parse_allocation(self, namespace: str, pod_selector: str, container_name: str) -> ResourceAllocations:
        """Read current requests/limits for *container_name* from kube-state-metrics.

        Values are averaged across all pods matched by *pod_selector*, giving a
        single request/limit value per resource type.
        """
        selector = (
            f'namespace="{namespace}", '
            f'pod=~"{pod_selector}", '
            f'container="{container_name}"'
        )
        # The two queries are independent — run them concurrently.
        limits, requests = await asyncio.gather(
            self.metrics_loader.loader.query(
                "avg by(resource) (kube_pod_container_resource_limits{" + selector + "})"
            ),
            self.metrics_loader.loader.query(
                "avg by(resource) (kube_pod_container_resource_requests{" + selector + "})"
            ),
        )

        requests_values: Dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None}
        limits_values: Dict[ResourceType, RecommendationValue] = {ResourceType.CPU: None, ResourceType.Memory: None}
        # `or []` guards against the query returning None (mirrors the guard
        # in _list_containers_in_pods).
        for limit in limits or []:
            if limit['metric']['resource'] == ResourceType.CPU:
                limits_values[ResourceType.CPU] = float(limit['value'][1])
            elif limit['metric']['resource'] == ResourceType.Memory:
                limits_values[ResourceType.Memory] = float(limit['value'][1])

        for request in requests or []:
            if request['metric']['resource'] == ResourceType.CPU:
                requests_values[ResourceType.CPU] = float(request['value'][1])
            elif request['metric']['resource'] == ResourceType.Memory:
                requests_values[ResourceType.Memory] = float(request['value'][1])
        return ResourceAllocations(requests=requests_values, limits=limits_values)

    async def __build_from_owner(self, namespace: str, app_name: str, containers: List[str], pod_names: List[str]) -> List[K8sObjectData]:
        """Build one K8sObjectData per container of the given Deployment."""
        return [
            K8sObjectData(
                cluster=None,  # single-cluster mode — see list_clusters()
                namespace=namespace,
                name=app_name,
                kind="Deployment",
                container=container_name,
                allocations=await self.__parse_allocation(namespace, "|".join(pod_names), container_name),
                pods=[PodData(name=pod_name, deleted=False) for pod_name in pod_names],
            )
            for container_name in containers
        ]

    async def _list_containers(self, namespace: str, pod_selector: str) -> List[str]:
        """Return the container names present in the pods matched by *pod_selector*."""
        containers = await self.metrics_loader.loader.query(
            "count by (container) (kube_pod_container_info{"
            f'namespace="{namespace}", '
            f'pod=~"{pod_selector}"'
            "})"
        )
        return [container['metric']['container'] for container in containers or []]

    async def _list_containers_in_pods(self, app_name: str, pod_owner_kind: str, namespace: str, owner_name: str) -> list[K8sObjectData]:
        """Resolve the pods owned by *owner_name* and build their workload objects.

        Only the ReplicaSet owner kind (i.e. Deployments) is supported; any
        other kind yields an empty list.
        """
        if pod_owner_kind != "ReplicaSet":
            return []

        # owner_name is a regex alternation of ReplicaSet names
        pods = await self.metrics_loader.loader.query(
            "count by (owner_name, replicaset, pod) (kube_pod_owner{"
            f'namespace="{namespace}", '
            f'owner_name=~"{owner_name}", '
            'owner_kind="ReplicaSet"})'
        )
        if pods is None or len(pods) == 0:
            return []  # no container
        # Example result:
        # [{'metric': {'owner_name': 'app-565b645489', 'pod': 'app-565b645489-jqt4x'}, 'value': [1685529217, '1']},
        #  {'metric': {'owner_name': 'app-565b645489', 'pod': 'app-565b645489-lj9qg'}, 'value': [1685529217, '1']}]
        pod_names = [pod['metric']['pod'] for pod in pods]
        container_names = await self._list_containers(namespace, "|".join(pod_names))
        return await self.__build_from_owner(namespace, app_name, container_names, pod_names)

    async def _list_deployments(self) -> list[K8sObjectData]:
        """Discover Deployments through their ReplicaSets via kube_replicaset_owner."""
        self.debug(f"Listing deployments in namespace({self.config.namespaces}) from Prometheus({self.config.prometheus_url})")
        # Use ".*" for the match-all case: joining the literal "*" would
        # produce the invalid Prometheus regex `*` and fail every query.
        ns = ".*" if self.config.namespaces == "*" else "|".join(self.config.namespaces)
        replicasets = await self.metrics_loader.loader.query(
            "count by (namespace, owner_name, replicaset) (kube_replicaset_owner{"
            f'namespace=~"{ns}", '
            'owner_kind="Deployment"})'
        )
        # Group ReplicaSets by their owning Deployment: 'ns/owner_name' => [metric, ...]
        pod_owner_kind = "ReplicaSet"
        replicaset_dict = defaultdict(list)
        for replicaset in replicasets or []:
            key = replicaset['metric']['namespace'] + "/" + replicaset['metric']['owner_name']
            replicaset_dict[key].append(replicaset['metric'])
        objects = await asyncio.gather(
            *[
                self._list_containers_in_pods(
                    replicas[0]['owner_name'],
                    pod_owner_kind,
                    replicas[0]['namespace'],
                    "|".join(metric['replicaset'] for metric in replicas),
                )
                for replicas in replicaset_dict.values()
            ]
        )
        return list(itertools.chain(*objects))
4 changes: 3 additions & 1 deletion robusta_krr/core/integrations/prometheus/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@


class MetricsLoader(Configurable):
loader: MetricsService

def __init__(
self,
config: Config,
Expand All @@ -48,11 +50,11 @@ def __init__(
else None
)
loader = self.get_metrics_service(config, api_client=self.api_client, cluster=cluster)

if loader is None:
raise PrometheusNotFound("No Prometheus or metrics service found")

self.loader = loader

self.info(f"{self.loader.name()} connected successfully for {cluster or 'default'} cluster")

def get_metrics_service(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ def __init__(
@abc.abstractmethod
def check_connection(self):
...

@abc.abstractmethod
async def query(self, query: str) -> dict:
...

def name(self) -> str:
classname = self.__class__.__name__
Expand Down
1 change: 1 addition & 0 deletions robusta_krr/core/models/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class Config(pd.BaseSettings):
clusters: Union[list[str], Literal["*"], None] = None
kubeconfig: Optional[str] = None
namespaces: Union[list[str], Literal["*"]] = pd.Field("*")
discovery_method: Literal["api-server", "prometheus"] = pd.Field("api-server")
selector: Optional[str] = None

# Value settings
Expand Down
1 change: 1 addition & 0 deletions robusta_krr/core/models/objects.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Optional

import pydantic as pd
from typing import Optional

from robusta_krr.core.models.allocations import ResourceAllocations

Expand Down
21 changes: 13 additions & 8 deletions robusta_krr/core/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from robusta_krr.core.abstract.strategies import ResourceRecommendation, RunResult
from robusta_krr.core.integrations.kubernetes import KubernetesLoader
from robusta_krr.core.integrations.prometheus import ClusterNotSpecifiedException, MetricsLoader, PrometheusNotFound
from robusta_krr.core.integrations.metrics import PrometheusLoader
from robusta_krr.core.models.config import Config
from robusta_krr.core.models.objects import K8sObjectData
from robusta_krr.core.models.result import (
Expand All @@ -21,13 +22,15 @@
from robusta_krr.utils.progress_bar import ProgressBar
from robusta_krr.utils.version import get_version


class Runner(Configurable):
EXPECTED_EXCEPTIONS = (KeyboardInterrupt, PrometheusNotFound)

def __init__(self, config: Config) -> None:
super().__init__(config)
self._k8s_loader = KubernetesLoader(self.config)
if config.discovery_method == "api-server":
self._workload_loader = KubernetesLoader(self.config)
else:
self._workload_loader = PrometheusLoader(self.config)
self._metrics_service_loaders: dict[Optional[str], Union[MetricsLoader, Exception]] = {}
self._metrics_service_loaders_error_logged: set[Exception] = set()
self._strategy = self.config.create_strategy()
Expand Down Expand Up @@ -152,16 +155,16 @@ async def _gather_objects_recommendations(
]

async def _collect_result(self) -> Result:
clusters = await self._k8s_loader.list_clusters()
if clusters and len(clusters) > 1 and self.config.prometheus_url:
clusters = await self._workload_loader.list_clusters()
if clusters is not None and len(clusters) > 1 and self.config.prometheus_url:
# this can only happen for multi-cluster querying a single centeralized prometheus
# In this scenario we dont yet support determining which metrics belong to which cluster so the reccomendation can be incorrect
raise ClusterNotSpecifiedException(
f"Cannot scan multiple clusters for this prometheus, Rerun with the flag `-c <cluster>` where <cluster> is one of {clusters}"
)

self.info(f'Using clusters: {clusters if clusters is not None else "inner cluster"}')
objects = await self._k8s_loader.list_scannable_objects(clusters)
objects = await self._workload_loader.list_scannable_objects(clusters)

if len(objects) == 0:
self.warning("Current filters resulted in no objects available to scan.")
Expand Down Expand Up @@ -194,9 +197,11 @@ async def run(self) -> None:
try:
self.config.load_kubeconfig()
except Exception as e:
self.error(f"Could not load kubernetes configuration: {e}")
self.error("Try to explicitly set --context and/or --kubeconfig flags.")
return
if self.config.prometheus_url is None:
self.error(f"Could not load kubernetes configuration: {e}")
self.error("Try to explicitly set --context and/or --kubeconfig flags.")
return
self.warning("Could not load kubernetes configuration, use Prometheus-based worload instead.")

try:
result = await self._collect_result()
Expand Down
5 changes: 5 additions & 0 deletions robusta_krr/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ def {func_name}(
help="List of namespaces to run on. By default, will run on all namespaces.",
rich_help_panel="Kubernetes Settings"
),
discovery_method: Optional[str] = typer.Option(
    "api-server",
    "--discovery-method",
    help="Method to discover workload in the cluster.",
),  # FIX: the Option(...) call was left unclosed, which is a SyntaxError
selector: Optional[str] = typer.Option(
None,
"--selector",
Expand Down Expand Up @@ -137,6 +141,7 @@ def {func_name}(
kubeconfig=kubeconfig,
clusters="*" if all_clusters else clusters,
namespaces="*" if "*" in namespaces else namespaces,
discovery_method=discovery_method,
selector=selector,
prometheus_url=prometheus_url,
prometheus_auth_header=prometheus_auth_header,
Expand Down