Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): grafana connector #10891

Merged
merged 15 commits into from
Jul 15, 2024
Merged
2 changes: 2 additions & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@
"flask-openid>=1.3.0",
"dask[dataframe]<2024.7.0",
},
"grafana": {"requests"},
"glue": aws_common,
# hdbcli is supported officially by SAP, sqlalchemy-hana is built on top but not officially supported
"hana": sql_common
Expand Down Expand Up @@ -634,6 +635,7 @@
"dynamodb = datahub.ingestion.source.dynamodb.dynamodb:DynamoDBSource",
"elasticsearch = datahub.ingestion.source.elastic_search:ElasticsearchSource",
"feast = datahub.ingestion.source.feast:FeastRepositorySource",
"grafana = datahub.ingestion.source.grafana.grafana_source:GrafanaSource",
"glue = datahub.ingestion.source.aws.glue:GlueSource",
"sagemaker = datahub.ingestion.source.aws.sagemaker:SagemakerSource",
"hana = datahub.ingestion.source.sql.hana:HanaSource",
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
from typing import Iterable, List, Optional

import requests
from pydantic import Field, SecretStr

import datahub.emitter.mce_builder as builder
from datahub.configuration.source_common import PlatformInstanceConfigMixin
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
StaleEntityRemovalSourceReport,
StatefulIngestionConfigBase,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionReport,
StatefulIngestionSourceBase,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import ChangeAuditStamps
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import (
DashboardSnapshot,
)
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import DashboardInfoClass


class GrafanaSourceConfig(StatefulIngestionConfigBase, PlatformInstanceConfigMixin):
    """Connection/config settings for the Grafana ingestion source."""

    # Base URL of the Grafana instance; prefixed onto every API path and
    # dashboard URL built by the source, so a trailing slash would break URLs.
    url: str = Field(
        default="",
        description="Grafana URL in the format http://your-grafana-instance with no trailing slash",
    )
    # Bearer token used to authenticate against the Grafana HTTP API.
    # SecretStr keeps the token out of logs/reprs until explicitly unwrapped.
    service_account_token: SecretStr = Field(
        description="Service account token for Grafana"
    )


class GrafanaReport(StaleEntityRemovalSourceReport):
    """Ingestion report for the Grafana source.

    Adds nothing beyond the inherited stale-entity-removal statistics; exists
    so the source has its own report type to extend later.
    """

    pass


@platform_name("Grafana")
@config_class(GrafanaSourceConfig)
@support_status(SupportStatus.TESTING)
class GrafanaSource(StatefulIngestionSourceBase):
    """
    This is an experimental source for Grafana. Not a lot of testing has been
    done yet. It currently only ingests dashboards and nothing else (not even
    charts).
    """

    def __init__(self, config: GrafanaSourceConfig, ctx: PipelineContext):
        super().__init__(config, ctx)
        self.source_config = config
        self.report = GrafanaReport()
        self.platform = "grafana"

    @classmethod
    def create(cls, config_dict, ctx):
        """Ingestion-framework factory: build the source from a raw config dict."""
        config = GrafanaSourceConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
        # Attach the stale-entity-removal processor so dashboards deleted in
        # Grafana get soft-deleted in DataHub on subsequent stateful runs.
        return [
            *super().get_workunit_processors(),
            StaleEntityRemovalHandler.create(
                self, self.source_config, self.ctx
            ).workunit_processor,
        ]

    def get_report(self) -> StatefulIngestionReport:
        return self.report

    def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
        """List all dashboards via Grafana's /api/search endpoint and emit one
        DashboardSnapshot work unit per dashboard that lives inside a folder.

        Entries without a folderId (folders themselves, or items at the root)
        are skipped. A failed/errored HTTP call is reported and ends the run
        without raising.
        """
        headers = {
            "Authorization": f"Bearer {self.source_config.service_account_token.get_secret_value()}",
            "Content-Type": "application/json",
        }
        try:
            response = requests.get(
                f"{self.source_config.url}/api/search",
                headers=headers,
                # Never hang indefinitely on an unresponsive Grafana server.
                timeout=30,
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            self.report.report_failure(f"Failed to fetch dashboards: {str(e)}")
            return
        res_json = response.json()
        for item in res_json:
            folder_id = item.get("folderId")
            if folder_id is None:
                # Not a dashboard inside a folder — skip (guard clause).
                continue
            uid = item["uid"]
            title = item["title"]
            full_url = f"{self.source_config.url}{item['url']}"
            dashboard_urn = builder.make_dashboard_urn(
                platform=self.platform,
                name=uid,
                platform_instance=self.source_config.platform_instance,
            )
            dash_snapshot = DashboardSnapshot(
                urn=dashboard_urn,
                aspects=[
                    DashboardInfoClass(
                        description="",
                        title=title,
                        charts=[],
                        lastModified=ChangeAuditStamps(),
                        dashboardUrl=full_url,
                        # customProperties is a string->string map, so every
                        # possibly-missing value is stringified (a bare None
                        # here would produce an invalid aspect).
                        customProperties={
                            "displayName": title,
                            "id": str(item["id"]),
                            "uid": uid,
                            "title": title,
                            "uri": item["uri"],
                            "type": item["type"],
                            "folderId": str(folder_id),
                            "folderUid": str(item.get("folderUid")),
                            "folderTitle": str(item.get("folderTitle")),
                        },
                    )
                ],
            )
            yield MetadataWorkUnit(
                id=dashboard_urn,
                mce=MetadataChangeEvent(proposedSnapshot=dash_snapshot),
            )
Loading