Skip to content

Commit

Permalink
Merge pull request #41 from robusta-dev/pass-promql-to-result
Browse files Browse the repository at this point in the history
Add PromQL to json for Robusta UI
  • Loading branch information
LeaveMyYard authored May 22, 2023
2 parents 1f417d6 + 98d138d commit bdf27e1
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 44 deletions.
13 changes: 11 additions & 2 deletions robusta_krr/core/abstract/strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class StrategySettings(pd.BaseModel):
history_duration: float = pd.Field(
24 * 7 * 2, ge=1, description="The duration of the history data to use (in hours)."
)
timeframe_duration: float = pd.Field(15, ge=1, description="The step for the history data (in minutes).")
timeframe_duration: float = pd.Field(2, ge=1, description="The step for the history data (in minutes).")

@property
def history_timedelta(self) -> datetime.timedelta:
Expand All @@ -43,7 +43,16 @@ def timeframe_timedelta(self) -> datetime.timedelta:
_StrategySettings = TypeVar("_StrategySettings", bound=StrategySettings)

ArrayNx2 = Annotated[NDArray[np.float64], Literal["N", 2]]
ResourceHistoryData = dict[str, ArrayNx2]


class ResourceHistoryData(pd.BaseModel):
query: str # The query used to get the data
data: dict[str, ArrayNx2] # Mapping: pod -> (time, value)

class Config:
arbitrary_types_allowed = True


HistoryData = dict[ResourceType, ResourceHistoryData]
RunResult = dict[ResourceType, ResourceRecommendation]

Expand Down
3 changes: 0 additions & 3 deletions robusta_krr/core/integrations/prometheus/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@

from .metrics import BaseMetricLoader

import numpy as np
from numpy.typing import NDArray


class PrometheusDiscovery(ServiceDiscovery):
def find_prometheus_url(self, *, api_client: Optional[ApiClient] = None) -> Optional[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def get_target_name(series: PrometheusSeries) -> Optional[str]:
return series["metric"][label]
return None

# TODO: Rework this, as now our query can return multiple metrics for different pods
@staticmethod
def filter_prom_jobs_results(
series_list_result: list[PrometheusSeries],
Expand Down
35 changes: 16 additions & 19 deletions robusta_krr/core/integrations/prometheus/metrics/base_metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def __init__(self, config: Config, prometheus: CustomPrometheusConnect) -> None:
self.prometheus = prometheus

@abc.abstractmethod
def get_query(self, namespace: str, pod: str, container: str) -> str:
def get_query(self, object: K8sObjectData) -> str:
...

async def query_prometheus(
Expand All @@ -41,28 +41,25 @@ async def query_prometheus(
async def load_data(
self, object: K8sObjectData, period: datetime.timedelta, step: datetime.timedelta
) -> ResourceHistoryData:
result = await asyncio.gather(
*[
self.query_prometheus(
query=self.get_query(object.namespace, pod.name, object.container),
start_time=datetime.datetime.now() - period,
end_time=datetime.datetime.now(),
step=step,
)
for pod in object.pods
]
query = self.get_query(object)
result = await self.query_prometheus(
query=query,
start_time=datetime.datetime.now() - period,
end_time=datetime.datetime.now(),
step=step,
)

if result == []:
self.warning(f"Prometheus returned no {self.__class__.__name__} metrics for {object}")
return {pod.name: np.array([]) for pod in object.pods}

pod_results = {pod: result[i] for i, pod in enumerate(object.pods)}
return {
pod.name: np.array([(timestamp, value) for timestamp, value in pod_result[0]["values"]], dtype=np.float64)
for pod, pod_result in pod_results.items()
if pod_result != []
}
return ResourceHistoryData(query=query, data={})

return ResourceHistoryData(
query=query,
data={
pod_result['metric']['pod']: np.array(pod_result["values"], dtype=np.float64)
for pod_result in result
},
)

@staticmethod
def get_by_resource(resource: str) -> type[BaseMetricLoader]:
Expand Down
12 changes: 10 additions & 2 deletions robusta_krr/core/integrations/prometheus/metrics/cpu_metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,17 @@

from .base_metric import bind_metric
from .base_filtered_metric import BaseFilteredMetricLoader
from robusta_krr.core.models.objects import K8sObjectData


@bind_metric(ResourceType.CPU)
class CPUMetricLoader(BaseFilteredMetricLoader):
def get_query(self, namespace: str, pod: str, container: str) -> str:
return f'sum(irate(container_cpu_usage_seconds_total{{namespace="{namespace}", pod="{pod}", container="{container}"}}[5m])) by (container, pod, job)'
def get_query(self, object: K8sObjectData) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
return (
'sum(irate(container_cpu_usage_seconds_total{'
f'namespace="{object.namespace}", '
f'pod=~"{pods_selector}", '
f'container="{object.container}"'
'}[5m])) by (container, pod, job)'
)
12 changes: 10 additions & 2 deletions robusta_krr/core/integrations/prometheus/metrics/memory_metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,17 @@

from .base_metric import bind_metric
from .base_filtered_metric import BaseFilteredMetricLoader
from robusta_krr.core.models.objects import K8sObjectData


@bind_metric(ResourceType.Memory)
class MemoryMetricLoader(BaseFilteredMetricLoader):
def get_query(self, namespace: str, pod: str, container: str) -> str:
return f'sum(container_memory_working_set_bytes{{image!="", namespace="{namespace}", pod="{pod}", container="{container}"}}) by (container, pod, job)'
def get_query(self, object: K8sObjectData) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
return (
'sum(container_memory_working_set_bytes{'
f'namespace="{object.namespace}", '
f'pod=~"{pods_selector}", '
f'container="{object.container}"'
'}) by (container, pod, job)'
)
10 changes: 7 additions & 3 deletions robusta_krr/core/models/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,17 @@ class ResourceRecommendation(pd.BaseModel):
limits: dict[ResourceType, RecommendationValue]


MetricsData = dict[ResourceType, str]


class ResourceScan(pd.BaseModel):
object: K8sObjectData
recommended: ResourceRecommendation
severity: Severity
metrics: MetricsData

@classmethod
def calculate(cls, object: K8sObjectData, recommendation: ResourceAllocations) -> ResourceScan:
def calculate(cls, object: K8sObjectData, recommendation: ResourceAllocations, metrics: MetricsData) -> ResourceScan:
recommendation_processed = ResourceRecommendation(requests={}, limits={})

for resource_type in ResourceType:
Expand All @@ -84,9 +88,9 @@ def calculate(cls, object: K8sObjectData, recommendation: ResourceAllocations) -
for selector in ["requests", "limits"]:
for recommendation_request in getattr(recommendation_processed, selector).values():
if recommendation_request.severity == severity:
return cls(object=object, recommended=recommendation_processed, severity=severity)
return cls(object=object, recommended=recommendation_processed, severity=severity, metrics=metrics)

return cls(object=object, recommended=recommendation_processed, severity=Severity.UNKNOWN)
return cls(object=object, recommended=recommendation_processed, severity=Severity.UNKNOWN, metrics=metrics)


class Result(pd.BaseModel):
Expand Down
25 changes: 14 additions & 11 deletions robusta_krr/core/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from robusta_krr.core.integrations.prometheus import PrometheusLoader, PrometheusNotFound
from robusta_krr.core.models.config import Config
from robusta_krr.core.models.objects import K8sObjectData
from robusta_krr.core.models.result import ResourceAllocations, ResourceScan, ResourceType, Result
from robusta_krr.core.models.result import ResourceAllocations, ResourceScan, ResourceType, Result, MetricsData
from robusta_krr.utils.configurable import Configurable
from robusta_krr.utils.logo import ASCII_LOGO
from robusta_krr.utils.version import get_version
Expand Down Expand Up @@ -91,11 +91,11 @@ def _format_result(self, result: RunResult) -> RunResult:
for resource, recommendation in result.items()
}

async def _calculate_object_recommendations(self, object: K8sObjectData) -> RunResult:
async def _calculate_object_recommendations(self, object: K8sObjectData) -> tuple[RunResult, MetricsData]:
prometheus_loader = self._get_prometheus_loader(object.cluster)

if prometheus_loader is None:
return {resource: ResourceRecommendation.undefined() for resource in ResourceType}
return {resource: ResourceRecommendation.undefined() for resource in ResourceType}, {}

data_tuple = await asyncio.gather(
*[
Expand All @@ -109,23 +109,26 @@ async def _calculate_object_recommendations(self, object: K8sObjectData) -> RunR
]
)
data = dict(zip(ResourceType, data_tuple))
queries = {resource: data[resource].query for resource in ResourceType}

# NOTE: We run this in a threadpool as the strategy calculation might be CPU intensive
# But keep in mind that numpy calcluations will not block the GIL
result = await asyncio.to_thread(self._strategy.run, data, object)
return self._format_result(result)
return self._format_result(result), queries

async def _gather_objects_recommendations(self, objects: list[K8sObjectData]) -> list[ResourceAllocations]:
recommendations: list[RunResult] = await asyncio.gather(
async def _gather_objects_recommendations(self, objects: list[K8sObjectData]) -> list[tuple[ResourceAllocations, MetricsData]]:
recommendations: list[tuple[RunResult, MetricsData]] = await asyncio.gather(
*[self._calculate_object_recommendations(object) for object in objects]
)

return [
ResourceAllocations(
requests={resource: recommendation[resource].request for resource in ResourceType},
limits={resource: recommendation[resource].limit for resource in ResourceType},
(
ResourceAllocations(
requests={resource: recommendation[resource].request for resource in ResourceType},
limits={resource: recommendation[resource].limit for resource in ResourceType},
), metric
)
for recommendation in recommendations
for recommendation, metric in recommendations
]

async def _collect_result(self) -> Result:
Expand All @@ -144,7 +147,7 @@ async def _collect_result(self) -> Result:

return Result(
scans=[
ResourceScan.calculate(obj, recommended) for obj, recommended in zip(objects, resource_recommendations)
ResourceScan.calculate(obj, recommended, metrics) for obj, (recommended, metrics) in zip(objects, resource_recommendations)
],
description=self._strategy.description,
)
Expand Down
4 changes: 2 additions & 2 deletions robusta_krr/strategies/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ class SimpleStrategy(BaseStrategy[SimpleStrategySettings]):
__rich_console__ = True

def run(self, history_data: HistoryData, object_data: K8sObjectData) -> RunResult:
cpu_usage = self.settings.calculate_cpu_proposal(history_data[ResourceType.CPU])
memory_usage = self.settings.calculate_memory_proposal(history_data[ResourceType.Memory])
cpu_usage = self.settings.calculate_cpu_proposal(history_data[ResourceType.CPU].data)
memory_usage = self.settings.calculate_memory_proposal(history_data[ResourceType.Memory].data)

return {
ResourceType.CPU: ResourceRecommendation(request=cpu_usage, limit=None),
Expand Down

0 comments on commit bdf27e1

Please sign in to comment.