Skip to content

Commit

Permalink
add cluster summary of cores and memory (#309)
Browse files Browse the repository at this point in the history
Avi-Robusta authored Jul 24, 2024
1 parent 2e8278c commit 50cb775
Showing 5 changed files with 61 additions and 3 deletions.
9 changes: 8 additions & 1 deletion robusta_krr/core/integrations/prometheus/loader.py
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@
import datetime
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Optional, Dict, Any

from kubernetes import config as k8s_config
from kubernetes.client.api_client import ApiClient
@@ -89,6 +89,13 @@ async def load_pods(self, object: K8sObjectData, period: datetime.timedelta) ->
logger.exception(f"Failed to load pods for {object}: {e}")
return []

async def get_cluster_summary(self) -> Dict[str, Any]:
    """
    Fetch the cluster summary from the underlying metrics loader.

    Delegates to ``self.loader.get_cluster_summary()``. Any exception raised
    by the loader is logged (with traceback) and swallowed, so callers always
    receive a dictionary.

    Returns:
        Whatever the loader reports, or an empty dict if the loader failed.
    """
    try:
        summary = await self.loader.get_cluster_summary()
    except Exception as e:
        logger.exception(f"Failed to get cluster summary: {e}")
        return {}
    return summary

async def gather_data(
self,
object: K8sObjectData,
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import abc
import datetime
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional
from typing import List, Optional, Dict, Any

from kubernetes.client.api_client import ApiClient

@@ -36,6 +36,10 @@ def name(cls) -> str:
def get_cluster_names(self) -> Optional[List[str]]:
...

@abc.abstractmethod
async def get_cluster_summary(self) -> Dict[str, Any]:
    """
    Return a summary of cluster-wide resources as a string-keyed mapping.

    Concrete metrics services must implement this coroutine. The abstract
    declaration itself has no behaviour and yields ``None`` if awaited.
    """

@abc.abstractmethod
async def gather_data(
self,
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@
import logging
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from typing import Iterable, List, Optional
from typing import Iterable, List, Optional, Dict, Any

from kubernetes.client import ApiClient
from prometheus_api_client import PrometheusApiClientException
@@ -206,6 +206,45 @@ async def gather_data(

return data

async def get_cluster_summary(self) -> Dict[str, Any]:
    """
    Query Prometheus for the cluster's total memory and total CPU cores.

    Each total is the sum, across instances, of the per-instance maximum of
    ``machine_memory_bytes`` / ``machine_cpu_cores``, restricted by the
    cluster label returned by ``get_prometheus_cluster_label()``.

    Returns:
        ``{"cluster_memory": float, "cluster_cpu": float}`` on success.
        An empty dict when a query raises, or when either response is not a
        single sample carrying a two-element ``value`` (timestamp, value).
    """
    # The label is the ONLY matcher inside the braces below, and a PromQL
    # selector may not begin with a comma. Drop a leading separator and
    # surrounding whitespace so the selector stays valid.
    # NOTE(review): presumably the helper returns a fragment written to be
    # appended after other matchers (e.g. ', cluster="x"') - confirm against
    # get_prometheus_cluster_label().
    raw_label = self.get_prometheus_cluster_label() or ""
    cluster_label = raw_label.strip().lstrip(",").strip()

    memory_query = f"""
        sum(max by (instance) (machine_memory_bytes{{ {cluster_label} }}))
    """
    cpu_query = f"""
        sum(max by (instance) (machine_cpu_cores{{ {cluster_label} }}))
    """

    try:
        # Queries run one after the other; a failure in the first skips the second.
        cluster_memory_result = await self.query(memory_query)
        cluster_cpu_result = await self.query(cpu_query)
        results = (cluster_memory_result, cluster_cpu_result)

        # Verify that there is exactly one sample in each result
        if any(len(result) != 1 for result in results):
            logger.warning("Error: Expected exactly one result from Prometheus query.")
            return {}

        cluster_memory_value, cluster_cpu_value = (result[0].get("value") for result in results)

        # Verify that each sample actually carries a "value" entry
        if not cluster_memory_value or not cluster_cpu_value:
            logger.warning("Error: Missing value in Prometheus result.")
            return {}

        # Verify that the "value" list has exactly two elements (timestamp and value)
        if len(cluster_memory_value) != 2 or len(cluster_cpu_value) != 2:
            logger.warning("Error: Prometheus result values are not of expected size.")
            return {}

        return {
            "cluster_memory": float(cluster_memory_value[1]),
            "cluster_cpu": float(cluster_cpu_value[1]),
        }

    except Exception as e:
        # Best-effort: the summary is optional, so report the failure and return nothing.
        logger.error(f"Exception occurred while getting cluster summary: {e}")
        return {}

async def load_pods(self, object: K8sObjectData, period: timedelta) -> list[PodData]:
"""
List pods related to the object and add them to the object's pods list.
1 change: 1 addition & 0 deletions robusta_krr/core/models/result.py
Original file line number Diff line number Diff line change
@@ -65,6 +65,7 @@ class Result(pd.BaseModel):
description: Optional[str] = None
strategy: StrategyData
errors: list[dict[str, Any]] = pd.Field(default_factory=list)
clusterSummary: dict[str, Any] = {}
config: Optional[Config] = pd.Field(default_factory=Config.get_config)

def __init__(self, *args, **kwargs) -> None:
7 changes: 7 additions & 0 deletions robusta_krr/core/runner.py
Original file line number Diff line number Diff line change
@@ -286,6 +286,12 @@ async def _collect_result(self) -> Result:

with ProgressBar(title="Calculating Recommendation") as self.__progressbar:
workloads = await self._k8s_loader.list_scannable_objects(clusters)
if not clusters or len(clusters) == 1:
cluster_name = clusters[0] if clusters else None # its none if krr is running inside cluster
prometheus_loader = self._get_prometheus_loader(cluster_name)
cluster_summary = await prometheus_loader.get_cluster_summary()
else:
cluster_summary = {}
scans = await asyncio.gather(*[self._gather_object_allocations(k8s_object) for k8s_object in workloads])

successful_scans = [scan for scan in scans if scan is not None]
@@ -308,6 +314,7 @@ async def _collect_result(self) -> Result:
name=str(self._strategy).lower(),
settings=self._strategy.settings.dict(),
),
clusterSummary=cluster_summary
)

async def run(self) -> int:

0 comments on commit 50cb775

Please sign in to comment.