Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: kibana 8.16.0 #3222

Merged
merged 3 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 129 additions & 43 deletions keep/providers/kibana_provider/kibana_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
import datetime
import json
import uuid
from typing import Literal
from typing import Literal, Union
from urllib.parse import urlparse

import pydantic
import requests
from fastapi import HTTPException
from packaging.version import Version
from starlette.datastructures import FormData

from keep.api.models.alert import AlertDto, AlertSeverity, AlertStatus
from keep.contextmanager.contextmanager import ContextManager
Expand All @@ -37,14 +39,14 @@ class KibanaProviderAuthConfig:
"required": True,
"description": "Kibana Host",
"hint": "https://keep.kb.us-central1.gcp.cloud.es.io",
"validation": "any_http_url"
"validation": "any_http_url",
}
)
kibana_port: UrlPort = dataclasses.field(
metadata={
"required": False,
"description": "Kibana Port (defaults to 9243)",
"validation": "port"
"validation": "port",
},
default=9243,
)
Expand All @@ -64,6 +66,11 @@ class KibanaProvider(BaseProvider):
"id": "{{alert.id}}",
"fingerprint": "{{alert.id}}",
"url": "{{context.alertDetailsUrl}}",
"context.message": "{{context.message}}",
"context.hits": "{{context.hits}}",
"context.link": "{{context.link}}",
"context.query": "{{context.query}}",
"context.title": "{{context.title}}",
"context.cloud": "{{context.cloud}}",
"context.container": "{{context.container}}",
"context.group": "{{context.group}}",
Expand Down Expand Up @@ -153,12 +160,58 @@ def __init__(
super().__init__(context_manager, provider_id, config)

@staticmethod
def parse_event_raw_body(raw_body: bytes | dict) -> dict:
# tb: this is a f**king stupid hack because Kibana doesn't escape {{#toJson}} :(
if b'"payload": "{' in raw_body:
raw_body = raw_body.replace(b'"payload": "{', b'"payload": {')
raw_body = raw_body.replace(b'}",', b"},")
return json.loads(raw_body)
def parse_event_raw_body(raw_body: Union[bytes, dict, FormData]) -> dict:
"""
Parse the raw body from various input types into a dictionary.

Args:
raw_body: Can be bytes, dict, or FormData

Returns:
dict: Parsed event data

Raises:
ValueError: If the input type is not supported or parsing fails
"""
# Handle FormData
if hasattr(raw_body, "_list") and hasattr(
raw_body, "getlist"
): # Check if it's FormData
# Convert FormData to dict
form_dict = {}
for key, value in raw_body.items():
# Handle multiple values for the same key
existing_value = form_dict.get(key)
if existing_value is not None:
if isinstance(existing_value, list):
existing_value.append(value)
else:
form_dict[key] = [existing_value, value]
else:
form_dict[key] = value

# If there's a 'payload' field that's a string, try to parse it as JSON
if "payload" in form_dict and isinstance(form_dict["payload"], str):
try:
form_dict["payload"] = json.loads(form_dict["payload"])
except json.JSONDecodeError:
pass # Keep the original string if it's not valid JSON

return form_dict

# Handle bytes
if isinstance(raw_body, bytes):
# Handle the Kibana escape issue
if b'"payload": "{' in raw_body:
raw_body = raw_body.replace(b'"payload": "{', b'"payload": {')
raw_body = raw_body.replace(b'}",', b"},")
return json.loads(raw_body)

# Handle dict
if isinstance(raw_body, dict):
return raw_body

raise ValueError(f"Unsupported raw_body type: {type(raw_body)}")

def validate_scopes(self) -> dict[str, bool | str]:
"""
Expand Down Expand Up @@ -244,6 +297,17 @@ def __setup_webhook_alerts(self, tenant_id: str, keep_api_url: str, api_key: str
keep_api_url (str): The URL of the Keep API
api_key (str): The API key of the Keep API
"""
# Check kibana version
kibana_version = (
self.request("GET", "api/status").get("version", {}).get("number")
)
rule_types = self.request("GET", "api/alerting/rule_types")

rule_types = {rule_type["id"]: rule_type for rule_type in rule_types}
# if not version, assume < 8 for backwards compatibility
if not kibana_version:
kibana_version = "7.0.0"

# First get all existing connectors and check if we're already installed:
connectors = self.request("GET", "api/actions/connectors")
connector_name = f"keep-{tenant_id}"
Expand All @@ -265,7 +329,10 @@ def __setup_webhook_alerts(self, tenant_id: str, keep_api_url: str, api_key: str
# this means we already have a connector installed, so we just need to update it
config: dict = connector["config"]
config["url"] = keep_api_url
config["headers"] = {"X-API-KEY": api_key}
config["headers"] = {
"X-API-KEY": api_key,
"Content-Type": "application/json",
}
self.request(
"PUT",
f"api/actions/connector/{connector['id']}",
Expand All @@ -284,7 +351,10 @@ def __setup_webhook_alerts(self, tenant_id: str, keep_api_url: str, api_key: str
"method": "post",
"url": keep_api_url,
"authType": None,
"headers": {"X-API-KEY": api_key},
"headers": {
"X-API-KEY": api_key,
"Content-Type": "application/json",
},
},
"secrets": {},
"connector_type_id": ".webhook",
Expand All @@ -305,6 +375,13 @@ def __setup_webhook_alerts(self, tenant_id: str, keep_api_url: str, api_key: str
for alert_rule in alert_rules.get("data", []):
self.logger.info(f"Updating alert {alert_rule['id']}")
alert_actions = alert_rule.get("actions") or []

# kibana 8:
# pop any connector_type_id
if Version(kibana_version) > Version("8.0.0"):
for action in alert_actions:
action.pop("connector_type_id", None)

keep_action_exists = any(
iter(
[
Expand All @@ -318,18 +395,14 @@ def __setup_webhook_alerts(self, tenant_id: str, keep_api_url: str, api_key: str
# This alert was already modified by us / manually added
self.logger.info(f"Alert {alert_rule['id']} already updated, skipping")
continue
for status in ["Alert", "Recovered", "No Data"]:

action_groups = rule_types.get(alert_rule["rule_type_id"], {}).get(
"action_groups", []
)
for action_group in action_groups:
alert_actions.append(
{
"group": (
"custom_threshold.fired"
if status == "Alert"
else (
"recovered"
if status == "Recovered"
else "custom_threshold.nodata"
)
),
"group": action_group.get("id"),
"id": connector_id,
"params": {"body": KibanaProvider.WEBHOOK_PAYLOAD},
"frequency": {
Expand All @@ -340,6 +413,7 @@ def __setup_webhook_alerts(self, tenant_id: str, keep_api_url: str, api_key: str
"uuid": str(uuid.uuid4()),
}
)

try:
self.request(
"PUT",
Expand Down Expand Up @@ -443,10 +517,14 @@ def setup_webhook(

def validate_config(self):
if self.is_installed or self.is_provisioned:
host = self.config.authentication['kibana_host']
host = self.config.authentication["kibana_host"]
if not (host.startswith("http://") or host.startswith("https://")):
scheme = "http://" if ("localhost" in host or "127.0.0.1" in host) else "https://"
self.config.authentication['kibana_host'] = scheme + host
scheme = (
"http://"
if ("localhost" in host or "127.0.0.1" in host)
else "https://"
)
self.config.authentication["kibana_host"] = scheme + host

self.authentication_config = KibanaProviderAuthConfig(
**self.config.authentication
Expand Down Expand Up @@ -501,25 +579,22 @@ def _format_alert(
# If this is coming from Kibana Watcher
if "payload" in event:
return KibanaProvider.format_alert_from_watcher(event)
try:
labels = {
v.split("=", 1)[0]: v.split("=", 1)[1]
for v in event.get("ruleTags", "").split(",")
}
except Exception:
# Failed to extract labels from ruleTags
labels = {}

try:
labels.update(
{
v.split("=", 1)[0]: v.split("=", 1)[1]
for v in event.get("contextTags", "").split(",")
}
)
except Exception:
# Failed to enrich labels with contextTags
pass
labels = {}
tags = event.get("ruleTags", [])
for tag in tags:
# if the tag is a key=value pair
if "=" in tag:
key, value = tag.split("=", 1)
labels[key] = value

# same with contextTags
context_tags = event.get("contextTags", [])
for tag in context_tags:
# if the tag is a key=value pair
if "=" in tag:
key, value = tag.split("=", 1)
labels[key] = value

environment = labels.get("environment", "undefined")

Expand All @@ -530,8 +605,19 @@ def _format_alert(
event["severity"] = KibanaProvider.SEVERITIES_MAP.get(
event.get("severity"), AlertSeverity.INFO
)

if not event.get("url"):
event["url"] = event.get("ruleUrl")
# if still no url, popt it
if not event.get("url"):
event.pop("url", None)

return AlertDto(
environment=environment, labels=labels, source=["kibana"], **event
environment=environment,
labels=labels,
tags=tags,
source=["kibana"],
**event,
)


Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "keep"
version = "0.35.5"
version = "0.35.6"
description = "Alerting. for developers, by developers."
authors = ["Keep Alerting LTD"]
packages = [{include = "keep"}]
Expand Down
Loading