diff --git a/datahub-web-react/src/images/powerbilogo.png b/datahub-web-react/src/images/powerbilogo.png
new file mode 100644
index 00000000000000..03519c2e3b0e3c
Binary files /dev/null and b/datahub-web-react/src/images/powerbilogo.png differ
diff --git a/docs/cli.md b/docs/cli.md
index cc9fbaaedfde01..6bdc8fd0919c04 100644
--- a/docs/cli.md
+++ b/docs/cli.md
@@ -96,6 +96,7 @@ We use a plugin architecture so that you can install only the dependencies you a
 | [trino](../metadata-ingestion/source_docs/trino.md) | `pip install 'acryl-datahub[trino]` | Trino source |
 | [starburst-trino-usage](../metadata-ingestion/source_docs/trino.md) | `pip install 'acryl-datahub[starburst-trino-usage]'` | Starburst Trino usage statistics source |
 | [nifi](../metadata-ingestion/source_docs/nifi.md) | `pip install 'acryl-datahub[nifi]` | Nifi source |
+| [powerbi](../metadata-ingestion/source_docs/powerbi.md) | `pip install 'acryl-datahub[powerbi]'` | Microsoft Power BI source |

 ### Sinks
diff --git a/metadata-ingestion/examples/mce_files/data_platforms.json b/metadata-ingestion/examples/mce_files/data_platforms.json
index e5f5e4c29a886d..a6124d5cb5e27e 100644
--- a/metadata-ingestion/examples/mce_files/data_platforms.json
+++ b/metadata-ingestion/examples/mce_files/data_platforms.json
@@ -595,5 +595,25 @@
       }
     },
     "proposedDelta": null
+  },
+  {
+    "auditHeader": null,
+    "proposedSnapshot": {
+      "com.linkedin.pegasus2avro.metadata.snapshot.DataPlatformSnapshot": {
+        "urn": "urn:li:dataPlatform:powerbi",
+        "aspects": [
+          {
+            "com.linkedin.pegasus2avro.dataplatform.DataPlatformInfo": {
+              "datasetNameDelimiter": ".",
+              "name": "powerbi",
+              "displayName": "Power BI",
+              "type": "OTHERS",
+              "logoUrl": "/assets/platforms/powerbilogo.png"
+            }
+          }
+        ]
+      }
+    },
+    "proposedDelta": null
   }
 ]
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index e089d610f93280..af7669d59fe860 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -99,6 +99,10 @@ def get_long_description():
     "cryptography",
 }

+microsoft_common = {
+    "msal==1.16.0",
+}
+
 data_lake_base = {
     *aws_common,
     "parse>=1.19.0",
@@ -181,6 +185,7 @@ def get_long_description():
     "trino": sql_common | {"trino"},
     "starburst-trino-usage": sql_common | {"trino"},
     "nifi": {"requests", "packaging"},
+    "powerbi": {"orderedset"} | microsoft_common,
 }

 all_exclude_plugins: Set[str] = {
@@ -258,6 +263,7 @@ def get_long_description():
             "trino",
             "hive",
             "starburst-trino-usage",
+            "powerbi",
             # airflow is added below
         ]
         for dependency in plugins[plugin]
@@ -355,6 +361,7 @@ def get_long_description():
         "trino = datahub.ingestion.source.sql.trino:TrinoSource",
         "starburst-trino-usage = datahub.ingestion.source.usage.starburst_trino_usage:TrinoUsageSource",
         "nifi = datahub.ingestion.source.nifi:NifiSource",
+        "powerbi = datahub.ingestion.source.powerbi:PowerBiDashboardSource",
     ],
     "datahub.ingestion.sink.plugins": [
         "file = datahub.ingestion.sink.file:FileSink",
diff --git a/metadata-ingestion/source_docs/powerbi.md b/metadata-ingestion/source_docs/powerbi.md
new file mode 100644
index 00000000000000..eb4cecce8ee22a
--- /dev/null
+++ b/metadata-ingestion/source_docs/powerbi.md
@@ -0,0 +1,88 @@
+# Power BI dashboards
+
+For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md).
+
+## Setup
+
+To install this plugin, run `pip install 'acryl-datahub[powerbi]'`.
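+
+As a quick sanity check, you can confirm the plugin is importable (a minimal sketch; the class path comes from this plugin's `setup.py` entry point):
+
+```python
+# If this import fails, the [powerbi] extra is not installed correctly.
+from datahub.ingestion.source.powerbi import PowerBiDashboardSource
+
+print(PowerBiDashboardSource)
+```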
+
+## Capabilities
+
+This plugin extracts the following:
+
+- Power BI dashboards, tiles, and datasets
+- Names, descriptions, and URLs of dashboards and tiles
+- Owners of dashboards
+
+## Configuration Notes
+
+1. Follow the [Microsoft AD App Creation doc](https://docs.microsoft.com/en-us/power-bi/developer/embedded/embed-service-principal) for the steps to create an app client ID and secret.
+2. Log in to Power BI as an admin and enable the following permissions under `Tenant settings`:
+   - Allow service principals to use Power BI APIs
+   - Allow service principals to use read-only Power BI admin APIs
+   - Enhance admin APIs responses with detailed metadata
+
+## Quickstart recipe
+
+Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options.
+
+For general pointers on writing and running a recipe, see our [main recipe guide](../README.md#recipes).
+
+```yml
+source:
+  type: "powerbi"
+  config:
+    # Your Power BI tenant identifier
+    tenant_id: a949d688-67c0-4bf1-a344-e939411c6c0a
+    # Ingest the elements of the Power BI workspace below into DataHub
+    workspace_id: 4bd10256-e999-45dd-8e56-571c77153a5f
+    # Environment of the workspace's datasets (PROD, DEV, QA, STAGE)
+    env: DEV
+    # Azure AD App client identifier
+    client_id: foo
+    # Azure AD App client secret
+    client_secret: bar
+    # dataset_type_mapping is a fixed mapping from Power BI datasource types to the
+    # equivalent DataHub "data platform" for the resulting datasets
+    dataset_type_mapping:
+      PostgreSql: postgres
+      Oracle: oracle
+
+sink:
+  # sink configs
+```
+
+## Config details
+
+| Field                  | Required | Default | Description                                                     |
+| ---------------------- | -------- | ------- | --------------------------------------------------------------- |
+| `tenant_id`            | ✅       |         | Power BI tenant identifier.                                      |
+| `workspace_id`         | ✅       |         | Power BI workspace identifier.                                   |
+| `env`                  | ✅       |         | Environment to use in namespace when constructing URNs.          |
+| `client_id`            | ✅       |         | Azure AD App client identifier.                                  |
+| `client_secret`        | ✅       |         | Azure AD App client secret.                                      |
+| `dataset_type_mapping` | ✅       |         | Mapping of Power BI datasource types to DataHub data platforms.  |
+| `scan_timeout`         |          | 60      | Time in seconds to wait for the Power BI metadata scan result.   |
+
+## Concept mapping
+
+| Power BI                 | DataHub             |
+| ------------------------ | ------------------- |
+| `Dashboard`              | `Dashboard`         |
+| `Dataset, Datasource`    | `Dataset`           |
+| `Tile`                   | `Chart`             |
+| `Report.webUrl`          | `Chart.externalUrl` |
+| `Workspace`              | `N/A`               |
+| `Report`                 | `N/A`               |
+
+If a tile is created from a report, then `Chart.externalUrl` is set to `Report.webUrl`. A short sketch of how this mapping translates into URNs appears at the end of this page.
+
+## Compatibility
+
+Coming soon!
+
+## Questions
+
+If you've got any questions on configuring this source, feel free to ping us on [our Slack](https://slack.datahubproject.io/)!
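+
+## Concept mapping example
+
+For illustration, here is a minimal sketch of how the concept mapping above translates into DataHub URNs, using the same builder helpers the source itself uses (the IDs and names are made-up placeholders):
+
+```python
+import datahub.emitter.mce_builder as builder
+
+# A Power BI tile maps to a DataHub chart.
+chart_urn = builder.make_chart_urn("powerbi", "charts.<tile-id>")
+
+# A Power BI dashboard maps to a DataHub dashboard.
+dashboard_urn = builder.make_dashboard_urn("powerbi", "dashboards.<dashboard-id>")
+
+# Each table of a relational Power BI dataset maps to a dataset on the
+# underlying platform, as configured in dataset_type_mapping.
+dataset_urn = builder.make_dataset_urn(
+    platform="postgres", name="library_db.public.member", env="DEV"
+)
+```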
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi.py new file mode 100644 index 00000000000000..41e00ad14c85ba --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi.py @@ -0,0 +1,1378 @@ +######################################################### +# +# Meta Data Ingestion From the Power BI Source +# +######################################################### + +import logging +from dataclasses import dataclass +from dataclasses import field as dataclass_field +from enum import Enum +from time import sleep +from typing import Any, Dict, Iterable, List, Optional, Set, Tuple +from xmlrpc.client import Boolean + +import msal +import requests +from orderedset import OrderedSet + +import datahub.emitter.mce_builder as builder +from datahub.configuration.common import AllowDenyPattern, ConfigurationError +from datahub.configuration.source_common import EnvBasedSourceConfigBase +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.source import Source, SourceReport +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.metadata.com.linkedin.pegasus2avro.common import ( + ChangeAuditStamps, + DataPlatformInstance, +) +from datahub.metadata.schema_classes import ( + BrowsePathsClass, + ChangeTypeClass, + ChartInfoClass, + ChartKeyClass, + CorpUserInfoClass, + CorpUserKeyClass, + DashboardInfoClass, + DashboardKeyClass, + DataPlatformInfoClass, + DatasetKeyClass, + DatasetPropertiesClass, + OwnerClass, + OwnershipClass, + OwnershipTypeClass, + StatusClass, +) + +# Logger instance +LOGGER = logging.getLogger(__name__) + + +class Constant: + """ + keys used in powerbi plugin + """ + + PBIAccessToken = "PBIAccessToken" + DASHBOARD_LIST = "DASHBOARD_LIST" + TILE_LIST = "TILE_LIST" + DATASET_GET = "DATASET_GET" + REPORT_GET = "REPORT_GET" + DATASOURCE_GET = "DATASOURCE_GET" + TILE_GET = "TILE_GET" + ENTITY_USER_LIST = "ENTITY_USER_LIST" + SCAN_CREATE = "SCAN_CREATE" + SCAN_GET = "SCAN_GET" + SCAN_RESULT_GET = "SCAN_RESULT_GET" + Authorization = "Authorization" + WorkspaceId = "WorkspaceId" + DashboardId = "DashboardId" + DatasetId = "DatasetId" + ReportId = "ReportId" + SCAN_ID = "ScanId" + Dataset_URN = "DatasetURN" + CHART_URN = "ChartURN" + CHART = "chart" + CORP_USER = "corpuser" + CORP_USER_INFO = "corpUserInfo" + CORP_USER_KEY = "corpUserKey" + CHART_INFO = "chartInfo" + STATUS = "status" + CHART_ID = "powerbi.linkedin.com/charts/{}" + CHART_KEY = "chartKey" + DASHBOARD_ID = "powerbi.linkedin.com/dashboards/{}" + DASHBOARD = "dashboard" + DASHBOARD_KEY = "dashboardKey" + OWNERSHIP = "ownership" + BROWSERPATH = "browsePaths" + DASHBOARD_INFO = "dashboardInfo" + DATAPLATFORM_INSTANCE = "dataPlatformInstance" + DATASET = "dataset" + DATASET_ID = "powerbi.linkedin.com/datasets/{}" + DATASET_KEY = "datasetKey" + DATASET_PROPERTIES = "datasetProperties" + VALUE = "value" + ENTITY = "ENTITY" + ID = "ID" + + +class PowerBiAPIConfig(EnvBasedSourceConfigBase): + # Organsation Identifier + tenant_id: str + # PowerBi workspace identifier + workspace_id: str + # Dataset type mapping + dataset_type_mapping: Dict[str, str] + # Azure app client identifier + client_id: str + # Azure app client secret + client_secret: str + # timeout for meta-data scanning + scan_timeout: int = 60 + + scope: str = "https://analysis.windows.net/powerbi/api/.default" + base_url: str = 
"https://api.powerbi.com/v1.0/myorg/groups" + admin_base_url = "https://api.powerbi.com/v1.0/myorg/admin" + authority = "https://login.microsoftonline.com/" + + def get_authority_url(self): + return "{}{}".format(self.authority, self.tenant_id) + + +class PowerBiDashboardSourceConfig(PowerBiAPIConfig): + platform_name: str = "powerbi" + platform_urn: str = builder.make_data_platform_urn(platform=platform_name) + dashboard_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() + chart_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() + + +class PowerBiAPI: + # API endpoints of PowerBi to fetch dashboards, tiles, datasets + API_ENDPOINTS = { + Constant.DASHBOARD_LIST: "{POWERBI_BASE_URL}/{WORKSPACE_ID}/dashboards", + Constant.ENTITY_USER_LIST: "{POWERBI_ADMIN_BASE_URL}/{ENTITY}/{ENTITY_ID}/users", + Constant.TILE_LIST: "{POWERBI_BASE_URL}/{WORKSPACE_ID}/dashboards/{DASHBOARD_ID}/tiles", + Constant.DATASET_GET: "{POWERBI_BASE_URL}/{WORKSPACE_ID}/datasets/{DATASET_ID}", + Constant.DATASOURCE_GET: "{POWERBI_BASE_URL}/{WORKSPACE_ID}/datasets/{DATASET_ID}/datasources", + Constant.REPORT_GET: "{POWERBI_BASE_URL}/{WORKSPACE_ID}/reports/{REPORT_ID}", + Constant.SCAN_GET: "{POWERBI_ADMIN_BASE_URL}/workspaces/scanStatus/{SCAN_ID}", + Constant.SCAN_RESULT_GET: "{POWERBI_ADMIN_BASE_URL}/workspaces/scanResult/{SCAN_ID}", + Constant.SCAN_CREATE: "{POWERBI_ADMIN_BASE_URL}/workspaces/getInfo", + } + + @dataclass + class Workspace: + """ + PowerBi Workspace + """ + + id: str + name: str + state: str + dashboards: List[Any] + datasets: Dict + + @dataclass + class DataSource: + """ + PowerBi + """ + + @dataclass + class MetaData: + """ + MetaData about DataSource + """ + + is_relational: Boolean + + id: str + type: str + database: Optional[str] + server: Optional[str] + metadata: Any + + def __members(self): + return (self.id,) + + def __eq__(self, instance): + return ( + isinstance(instance, PowerBiAPI.DataSource) + and self.__members() == instance.__members() + ) + + def __hash__(self): + return hash(self.__members()) + + # dataclasses for PowerBi Dashboard + @dataclass + class Dataset: + @dataclass + class Table: + name: str + schema_name: str + + id: str + name: str + webUrl: Optional[str] + workspace_id: str + datasource: Any + # Table in datasets + tables: List[Any] + + def get_urn_part(self): + return "datasets.{}".format(self.id) + + def __members(self): + return (self.id,) + + def __eq__(self, instance): + return ( + isinstance(instance, PowerBiAPI.Dataset) + and self.__members() == instance.__members() + ) + + def __hash__(self): + return hash(self.__members()) + + @dataclass + class Report: + id: str + name: str + webUrl: str + embedUrl: str + dataset: Any + + def get_urn_part(self): + return "reports.{}".format(self.id) + + @dataclass + class Tile: + class CreatedFrom(Enum): + REPORT = "Report" + DATASET = "Dataset" + VISUALIZATION = "Visualization" + UNKNOWN = "UNKNOWN" + + id: str + title: str + embedUrl: str + dataset: Optional[Any] + report: Optional[Any] + createdFrom: CreatedFrom + + def get_urn_part(self): + return "charts.{}".format(self.id) + + @dataclass + class User: + id: str + displayName: str + emailAddress: str + dashboardUserAccessRight: str + graphId: str + principalType: str + + def get_urn_part(self): + return "users.{}".format(self.id) + + def __members(self): + return (self.id,) + + def __eq__(self, instance): + return ( + isinstance(instance, PowerBiAPI.User) + and self.__members() == instance.__members() + ) + + def __hash__(self): + return hash(self.__members()) + + 
@dataclass
+    class Dashboard:
+        id: str
+        displayName: str
+        embedUrl: str
+        webUrl: str
+        isReadOnly: Any
+        workspace_id: str
+        workspace_name: str
+        tiles: List[Any]
+        users: List[Any]
+
+        def get_urn_part(self):
+            return "dashboards.{}".format(self.id)
+
+        def __members(self):
+            return (self.id,)
+
+        def __eq__(self, instance):
+            return (
+                isinstance(instance, PowerBiAPI.Dashboard)
+                and self.__members() == instance.__members()
+            )
+
+        def __hash__(self):
+            return hash(self.__members())
+
+    def __init__(self, config: PowerBiAPIConfig) -> None:
+        self.__config: PowerBiAPIConfig = config
+        self.__access_token: str = ""
+
+        # Power-Bi Auth (Service Principal Auth)
+        self.__msal_client = msal.ConfidentialClientApplication(
+            self.__config.client_id,
+            client_credential=self.__config.client_secret,
+            authority=self.__config.authority + self.__config.tenant_id,
+        )
+
+        # Test the connection by generating an access token
+        LOGGER.info("Trying to connect to {}".format(self.__config.get_authority_url()))
+        self.get_access_token()
+        LOGGER.info("Able to connect to {}".format(self.__config.get_authority_url()))
+
+    def __get_users(self, workspace_id: str, entity: str, id: str) -> List[User]:
+        """
+        Get the users of the given PowerBi entity
+        """
+        user_list_endpoint: str = PowerBiAPI.API_ENDPOINTS[Constant.ENTITY_USER_LIST]
+        # Replace place holders
+        user_list_endpoint = user_list_endpoint.format(
+            POWERBI_ADMIN_BASE_URL=self.__config.admin_base_url,
+            ENTITY=entity,
+            ENTITY_ID=id,
+        )
+        # Hit PowerBi
+        LOGGER.info("Request to URL={}".format(user_list_endpoint))
+        response = requests.get(
+            url=user_list_endpoint,
+            headers={Constant.Authorization: self.get_access_token()},
+        )
+
+        # Check if we got a response from PowerBi
+        if response.status_code != 200:
+            LOGGER.warning(
+                "Failed to fetch user list from power-bi, http_status={}, message={}".format(
+                    response.status_code, response.text
+                )
+            )
+            LOGGER.info("{}={}".format(Constant.WorkspaceId, workspace_id))
+            LOGGER.info("{}={}".format(Constant.ENTITY, entity))
+            LOGGER.info("{}={}".format(Constant.ID, id))
+            raise ConnectionError("Failed to fetch the user list from power-bi")
+
+        users_dict: List[Any] = response.json()[Constant.VALUE]
+
+        # Iterate through the response and create a list of PowerBiAPI.User
+        users: List[PowerBiAPI.User] = [
+            PowerBiAPI.User(
+                id=instance.get("identifier"),
+                displayName=instance.get("displayName"),
+                emailAddress=instance.get("emailAddress"),
+                dashboardUserAccessRight=instance.get("datasetUserAccessRight"),
+                graphId=instance.get("graphId"),
+                principalType=instance.get("principalType"),
+            )
+            for instance in users_dict
+        ]
+
+        return users
+
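+    # For reference, each element of the users endpoint response is expected to
+    # look roughly like this (a hypothetical sample; field availability varies
+    # by entity type and tenant settings):
+    #
+    #   {
+    #       "identifier": "User1@foo.com",
+    #       "displayName": "User1",
+    #       "emailAddress": "User1@foo.com",
+    #       "datasetUserAccessRight": "ReadWrite",
+    #       "graphId": "C9EE53F2-88EA-4711-A173-AF0515A3CD46",
+    #       "principalType": "User"
+    #   }
+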
+    def __get_report(self, workspace_id: str, report_id: str) -> Any:
+        """
+        Fetch the report from PowerBi for the given report identifier
+        """
+        if workspace_id is None or report_id is None:
+            LOGGER.info("Input values are None")
+            LOGGER.info("{}={}".format(Constant.WorkspaceId, workspace_id))
+            LOGGER.info("{}={}".format(Constant.ReportId, report_id))
+            return None
+
+        report_get_endpoint: str = PowerBiAPI.API_ENDPOINTS[Constant.REPORT_GET]
+        # Replace place holders
+        report_get_endpoint = report_get_endpoint.format(
+            POWERBI_BASE_URL=self.__config.base_url,
+            WORKSPACE_ID=workspace_id,
+            REPORT_ID=report_id,
+        )
+        # Hit PowerBi
+        LOGGER.info("Request to report URL={}".format(report_get_endpoint))
+        response = requests.get(
+            url=report_get_endpoint,
+            headers={Constant.Authorization: self.get_access_token()},
+        )
+
+        # Check if we got a response from PowerBi
+        if response.status_code != 200:
+            message: str = "Failed to fetch report from power-bi"
+            LOGGER.warning(message)
+            LOGGER.warning("{}={}".format(Constant.WorkspaceId, workspace_id))
+            LOGGER.warning("{}={}".format(Constant.ReportId, report_id))
+            raise ConnectionError(message)
+
+        response_dict = response.json()
+
+        return PowerBiAPI.Report(
+            id=response_dict.get("id"),
+            name=response_dict.get("name"),
+            webUrl=response_dict.get("webUrl"),
+            embedUrl=response_dict.get("embedUrl"),
+            dataset=self.get_dataset(
+                workspace_id=workspace_id, dataset_id=response_dict.get("datasetId")
+            ),
+        )
+
+    def get_access_token(self):
+        if self.__access_token != "":
+            LOGGER.info("Returning the cached access token")
+            return self.__access_token
+
+        LOGGER.info("Generating PowerBi access token")
+
+        auth_response = self.__msal_client.acquire_token_for_client(
+            scopes=[self.__config.scope]
+        )
+
+        if not auth_response.get("access_token"):
+            LOGGER.warning(
+                "Failed to generate the PowerBi access token. Please check input configuration"
+            )
+            raise ConfigurationError(
+                "PowerBi authorization failed. Please check your input configuration."
+            )
+
+        LOGGER.info("Generated PowerBi access token")
+
+        self.__access_token = "Bearer {}".format(auth_response.get("access_token"))
+
+        LOGGER.debug("{}={}".format(Constant.PBIAccessToken, self.__access_token))
+
+        return self.__access_token
+
+    def get_dashboard_users(self, dashboard: Dashboard) -> List[User]:
+        """
+        Return the list of dashboard users
+        """
+        return self.__get_users(
+            workspace_id=dashboard.workspace_id, entity="dashboards", id=dashboard.id
+        )
+
+    def get_dashboards(self, workspace: Workspace) -> List[Dashboard]:
+        """
+        Get the list of dashboards from PowerBi for the given workspace identifier
+
+        TODO: Pagination. 
As per REST API doc (https://docs.microsoft.com/en-us/rest/api/power-bi/dashboards/get-dashboards), there is no information available on pagination + """ + dashboard_list_endpoint: str = PowerBiAPI.API_ENDPOINTS[Constant.DASHBOARD_LIST] + # Replace place holders + dashboard_list_endpoint = dashboard_list_endpoint.format( + POWERBI_BASE_URL=self.__config.base_url, WORKSPACE_ID=workspace.id + ) + # Hit PowerBi + LOGGER.info("Request to URL={}".format(dashboard_list_endpoint)) + response = requests.get( + url=dashboard_list_endpoint, + headers={Constant.Authorization: self.get_access_token()}, + ) + + # Check if we got response from PowerBi + if response.status_code != 200: + LOGGER.warning("Failed to fetch dashboard list from power-bi for") + LOGGER.warning("{}={}".format(Constant.WorkspaceId, workspace.id)) + raise ConnectionError( + "Failed to fetch the dashboard list from the power-bi" + ) + + dashboards_dict: List[Any] = response.json()[Constant.VALUE] + + # Iterate through response and create a list of PowerBiAPI.Dashboard + dashboards: List[PowerBiAPI.Dashboard] = [ + PowerBiAPI.Dashboard( + id=instance.get("id"), + isReadOnly=instance.get("isReadOnly"), + displayName=instance.get("displayName"), + embedUrl=instance.get("embedUrl"), + webUrl=instance.get("webUrl"), + workspace_id=workspace.id, + workspace_name=workspace.name, + tiles=[], + users=[], + ) + for instance in dashboards_dict + if instance is not None + ] + + return dashboards + + def get_dataset(self, workspace_id: str, dataset_id: str) -> Any: + """ + Fetch the dataset from PowerBi for the given dataset identifier + """ + if workspace_id is None or dataset_id is None: + LOGGER.info("Input values are None") + LOGGER.info("{}={}".format(Constant.WorkspaceId, workspace_id)) + LOGGER.info("{}={}".format(Constant.DatasetId, dataset_id)) + return None + + dataset_get_endpoint: str = PowerBiAPI.API_ENDPOINTS[Constant.DATASET_GET] + # Replace place holders + dataset_get_endpoint = dataset_get_endpoint.format( + POWERBI_BASE_URL=self.__config.base_url, + WORKSPACE_ID=workspace_id, + DATASET_ID=dataset_id, + ) + # Hit PowerBi + LOGGER.info("Request to dataset URL={}".format(dataset_get_endpoint)) + response = requests.get( + url=dataset_get_endpoint, + headers={Constant.Authorization: self.get_access_token()}, + ) + + # Check if we got response from PowerBi + if response.status_code != 200: + message: str = "Failed to fetch dataset from power-bi for" + LOGGER.warning(message) + LOGGER.warning("{}={}".format(Constant.WorkspaceId, workspace_id)) + LOGGER.warning("{}={}".format(Constant.DatasetId, dataset_id)) + raise ConnectionError(message) + + response_dict = response.json() + + # PowerBi Always return the webURL, in-case if it is None then setting complete webURL to None instead of None/details + return PowerBiAPI.Dataset( + id=response_dict.get("id"), + name=response_dict.get("name"), + webUrl="{}/details".format(response_dict.get("webUrl")) + if response_dict.get("webUrl") is not None + else None, + workspace_id=workspace_id, + tables=[], + datasource=None, + ) + + def get_data_source(self, dataset: Dataset) -> Any: + """ + Fetch the data source from PowerBi for the given dataset + """ + + datasource_get_endpoint: str = PowerBiAPI.API_ENDPOINTS[Constant.DATASOURCE_GET] + # Replace place holders + datasource_get_endpoint = datasource_get_endpoint.format( + POWERBI_BASE_URL=self.__config.base_url, + WORKSPACE_ID=dataset.workspace_id, + DATASET_ID=dataset.id, + ) + # Hit PowerBi + LOGGER.info("Request to datasource 
URL={}".format(datasource_get_endpoint)) + response = requests.get( + url=datasource_get_endpoint, + headers={Constant.Authorization: self.get_access_token()}, + ) + + # Check if we got response from PowerBi + if response.status_code != 200: + message: str = "Failed to fetch datasource from power-bi for" + LOGGER.warning(message) + LOGGER.warning("{}={}".format(Constant.WorkspaceId, dataset.workspace_id)) + LOGGER.warning("{}={}".format(Constant.DatasetId, dataset.id)) + raise ConnectionError(message) + + res = response.json() + value = res["value"] + if len(value) == 0: + LOGGER.info( + "datasource is not found for dataset {}({})".format( + dataset.name, dataset.id + ) + ) + return None + # Consider only zero index datasource + datasource_dict = value[0] + + # Create datasource instance with basic detail available + datasource = PowerBiAPI.DataSource( + id=datasource_dict["datasourceId"], + type=datasource_dict["datasourceType"], + server=None, + database=None, + metadata=None, + ) + + # Check if datasource is relational as per our relation mapping + if self.__config.dataset_type_mapping.get(datasource.type) is not None: + # Now set the database detail as it is relational data source + datasource.metadata = PowerBiAPI.DataSource.MetaData(is_relational=True) + datasource.database = datasource_dict["connectionDetails"]["database"] + datasource.server = datasource_dict["connectionDetails"]["server"] + else: + datasource.metadata = PowerBiAPI.DataSource.MetaData(is_relational=False) + + return datasource + + def get_tiles(self, workspace: Workspace, dashboard: Dashboard) -> List[Tile]: + + """ + Get the list of tiles from PowerBi for the given workspace identifier + + TODO: Pagination. As per REST API doc (https://docs.microsoft.com/en-us/rest/api/power-bi/dashboards/get-tiles), there is no information available on pagination + """ + + def new_dataset_or_report(tile_instance: Any) -> dict: + """ + Find out which is the data source for tile. 
It is either a REPORT or a DATASET
+            """
+            report_fields = {
+                "dataset": None,
+                "report": None,
+                "createdFrom": PowerBiAPI.Tile.CreatedFrom.UNKNOWN,
+            }
+
+            report_fields["dataset"] = (
+                workspace.datasets[tile_instance.get("datasetId")]
+                if tile_instance.get("datasetId") is not None
+                else None
+            )
+            report_fields["report"] = (
+                self.__get_report(
+                    workspace_id=workspace.id,
+                    report_id=tile_instance.get("reportId"),
+                )
+                if tile_instance.get("reportId") is not None
+                else None
+            )
+
+            # A tile is created from either a report, a dataset, or a custom visualization
+            if report_fields["report"] is not None:
+                report_fields["createdFrom"] = PowerBiAPI.Tile.CreatedFrom.REPORT
+            elif report_fields["dataset"] is not None:
+                report_fields["createdFrom"] = PowerBiAPI.Tile.CreatedFrom.DATASET
+            else:
+                report_fields["createdFrom"] = PowerBiAPI.Tile.CreatedFrom.VISUALIZATION
+
+            LOGGER.info(
+                "Tile {}({}) is created from {}".format(
+                    tile_instance.get("title"),
+                    tile_instance.get("id"),
+                    report_fields["createdFrom"],
+                )
+            )
+
+            return report_fields
+
+        tile_list_endpoint: str = PowerBiAPI.API_ENDPOINTS[Constant.TILE_LIST]
+        # Replace place holders
+        tile_list_endpoint = tile_list_endpoint.format(
+            POWERBI_BASE_URL=self.__config.base_url,
+            WORKSPACE_ID=dashboard.workspace_id,
+            DASHBOARD_ID=dashboard.id,
+        )
+        # Hit PowerBi
+        LOGGER.info("Request to URL={}".format(tile_list_endpoint))
+        response = requests.get(
+            url=tile_list_endpoint,
+            headers={Constant.Authorization: self.get_access_token()},
+        )
+
+        # Check if we got a response from PowerBi
+        if response.status_code != 200:
+            LOGGER.warning("Failed to fetch tile list from power-bi")
+            LOGGER.warning("{}={}".format(Constant.WorkspaceId, workspace.id))
+            LOGGER.warning("{}={}".format(Constant.DashboardId, dashboard.id))
+            raise ConnectionError("Failed to fetch the tile list from power-bi")
+
+        # Iterate through the response and create a list of PowerBiAPI.Tile
+        tile_dict: List[Any] = response.json()[Constant.VALUE]
+        tiles: List[PowerBiAPI.Tile] = [
+            PowerBiAPI.Tile(
+                id=instance.get("id"),
+                title=instance.get("title"),
+                embedUrl=instance.get("embedUrl"),
+                **new_dataset_or_report(instance),
+            )
+            for instance in tile_dict
+            if instance is not None
+        ]
+
+        return tiles
+
+    # flake8: noqa: C901
+    def get_workspace(self, workspace_id: str) -> Workspace:
+        """
+        Return the Workspace for the given workspace identifier, i.e. workspace_id
+        """
+        scan_create_endpoint = PowerBiAPI.API_ENDPOINTS[Constant.SCAN_CREATE]
+        scan_create_endpoint = scan_create_endpoint.format(
+            POWERBI_ADMIN_BASE_URL=self.__config.admin_base_url
+        )
+
+        def create_scan_job():
+            """
+            Create a scan job on PowerBi for the workspace
+            """
+            request_body = {"workspaces": [workspace_id]}
+
+            res = requests.post(
+                scan_create_endpoint,
+                data=request_body,
+                params={
+                    "datasetExpressions": True,
+                    "datasetSchema": True,
+                    "datasourceDetails": True,
+                    "getArtifactUsers": True,
+                    "lineage": True,
+                },
+                headers={Constant.Authorization: self.get_access_token()},
+            )
+
+            if res.status_code not in (200, 202):
+                message = "API({}) returned error code {} for workspace id({})".format(
+                    scan_create_endpoint, res.status_code, workspace_id
+                )
+
+                LOGGER.warning(message)
+
+                raise ConnectionError(message)
+            # Return the id of the scan created for the given workspace
+            id = res.json()["id"]
+            LOGGER.info("Scan id({})".format(id))
+            return id
+
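+        # The admin scan workflow used at the end of this method, end to end
+        # (a summary of the three nested helpers; the last two are defined below):
+        #
+        #   scan_id = create_scan_job()                   # POST .../admin/workspaces/getInfo
+        #   wait_for_scan_to_complete(scan_id, timeout)   # GET  .../admin/workspaces/scanStatus/{scan_id}
+        #   result = get_scan_result(scan_id)             # GET  .../admin/workspaces/scanResult/{scan_id}
+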
+        def wait_for_scan_to_complete(scan_id: str, timeout: int) -> Boolean:
+            """
+            Poll the PowerBi service until the workspace scan is complete
+            """
+            minimum_sleep = 3
+            if timeout < minimum_sleep:
+                LOGGER.info(
+                    "Setting timeout to minimum_sleep time {} seconds".format(
+                        minimum_sleep
+                    )
+                )
+                timeout = minimum_sleep
+
+            max_trial = int(timeout / minimum_sleep)
+            LOGGER.info("Max trial {}".format(max_trial))
+            scan_get_endpoint = PowerBiAPI.API_ENDPOINTS[Constant.SCAN_GET]
+            scan_get_endpoint = scan_get_endpoint.format(
+                POWERBI_ADMIN_BASE_URL=self.__config.admin_base_url, SCAN_ID=scan_id
+            )
+
+            LOGGER.info("Hitting URL={}".format(scan_get_endpoint))
+
+            trial = 1
+            while True:
+                LOGGER.info("Trial = {}".format(trial))
+                res = requests.get(
+                    scan_get_endpoint,
+                    headers={Constant.Authorization: self.get_access_token()},
+                )
+                if res.status_code != 200:
+                    message = "API({}) returned error code {} for scan id({})".format(
+                        scan_get_endpoint, res.status_code, scan_id
+                    )
+
+                    LOGGER.warning(message)
+
+                    raise ConnectionError(message)
+
+                if res.json()["status"].upper() == "SUCCEEDED":
+                    LOGGER.info(
+                        "Scan result is available for scan id({})".format(scan_id)
+                    )
+                    return True
+
+                if trial == max_trial:
+                    break
+                LOGGER.info("Sleeping for {} seconds".format(minimum_sleep))
+                sleep(minimum_sleep)
+                trial += 1
+
+            # Result is not available
+            return False
+
+        def get_scan_result(scan_id: str) -> dict:
+            LOGGER.info("Fetching scan result")
+            LOGGER.info("{}={}".format(Constant.SCAN_ID, scan_id))
+            scan_result_get_endpoint = PowerBiAPI.API_ENDPOINTS[
+                Constant.SCAN_RESULT_GET
+            ]
+            scan_result_get_endpoint = scan_result_get_endpoint.format(
+                POWERBI_ADMIN_BASE_URL=self.__config.admin_base_url, SCAN_ID=scan_id
+            )
+
+            LOGGER.info("Hitting URL={}".format(scan_result_get_endpoint))
+            res = requests.get(
+                scan_result_get_endpoint,
+                headers={Constant.Authorization: self.get_access_token()},
+            )
+            if res.status_code != 200:
+                message = "API({}) returned error code {} for scan id({})".format(
+                    scan_result_get_endpoint, res.status_code, scan_id
+                )
+
+                LOGGER.warning(message)
+
+                raise ConnectionError(message)
+
+            return res.json()["workspaces"][0]
+
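+        # For reference, the scan result consumed below is expected to look
+        # roughly like this (a hypothetical, heavily trimmed sample):
+        #
+        #   {
+        #       "id": "64ED5CAD-...", "name": "demo-workspace", "state": "Active",
+        #       "datasets": [
+        #           {
+        #               "id": "05169CD2-...",
+        #               "tables": [
+        #                   {"name": "public member", "source": [{"expression": "..."}]}
+        #               ]
+        #           }
+        #       ]
+        #   }
+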
+        def json_to_dataset_map(scan_result: dict) -> dict:
+            """
+            Filter out "dataset" from scan_result and return a map of PowerBiAPI.Dataset instances
+            """
+            datasets: Optional[Any] = scan_result.get("datasets")
+            dataset_map: dict = {}
+
+            if datasets is None or len(datasets) == 0:
+                LOGGER.warning(
+                    "Workspace {}({}) does not have datasets".format(
+                        scan_result["name"], scan_result["id"]
+                    )
+                )
+                LOGGER.info("Returning empty datasets")
+                return dataset_map
+
+            for dataset_dict in datasets:
+                dataset_instance: PowerBiAPI.Dataset = self.get_dataset(
+                    workspace_id=scan_result["id"],
+                    dataset_id=dataset_dict["id"],
+                )
+
+                dataset_map[dataset_instance.id] = dataset_instance
+                # Set the dataset's DataSource
+                dataset_instance.datasource = self.get_data_source(dataset_instance)
+                # Set tables only if the datasource is relational and the dataset is
+                # not created from custom SQL, i.e. Value.NativeQuery(.
+                # There are datasets which don't have a DataSource.
+                if (
+                    dataset_instance.datasource
+                    and dataset_instance.datasource.metadata.is_relational is True
+                ):
+                    LOGGER.info(
+                        "Processing tables attribute for dataset {}({})".format(
+                            dataset_instance.name, dataset_instance.id
+                        )
+                    )
+
+                    for table in dataset_dict["tables"]:
+                        if "Value.NativeQuery(" in table["source"][0]["expression"]:
+                            LOGGER.warning(
+                                "Table {} is created from custom SQL. Skipping it".format(
+                                    table["name"]
+                                )
+                            )
+                            continue
+
+                        # A PowerBi table name contains the schema name and the table
+                        # name, separated by a space (e.g. "public member")
+                        schema_and_name = table["name"].split(" ")
+                        dataset_instance.tables.append(
+                            PowerBiAPI.Dataset.Table(
+                                schema_name=schema_and_name[0],
+                                name=schema_and_name[1],
+                            )
+                        )
+
+            return dataset_map
+
+        def init_dashboard_tiles(workspace: PowerBiAPI.Workspace) -> None:
+            for dashboard in workspace.dashboards:
+                dashboard.tiles = self.get_tiles(workspace, dashboard=dashboard)
+
+            return None
+
+        LOGGER.info("Creating scan job for workspace")
+        LOGGER.info("{}={}".format(Constant.WorkspaceId, workspace_id))
+        LOGGER.info("Hitting URL={}".format(scan_create_endpoint))
+        scan_id = create_scan_job()
+        LOGGER.info("Waiting for scan to complete")
+        if (
+            wait_for_scan_to_complete(
+                scan_id=scan_id, timeout=self.__config.scan_timeout
+            )
+            is False
+        ):
+            raise ValueError(
+                "Workspace detail is not available. Please increase scan_timeout to wait longer."
+            )
+
+        # The scan is complete; fetch the result
+        scan_result = get_scan_result(scan_id=scan_id)
+        workspace = PowerBiAPI.Workspace(
+            id=scan_result["id"],
+            name=scan_result["name"],
+            state=scan_result["state"],
+            datasets={},
+            dashboards=[],
+        )
+        # Get workspace dashboards
+        workspace.dashboards = self.get_dashboards(workspace)
+        workspace.datasets = json_to_dataset_map(scan_result)
+        init_dashboard_tiles(workspace)
+
+        return workspace
+
+
+class Mapper:
+    """
+    Transform PowerBi concepts Dashboard, Dataset and Tile into DataHub concepts Dashboard, Dataset and Chart
+    """
+
+    class EquableMetadataWorkUnit(MetadataWorkUnit):
+        """
+        A MetadataWorkUnit that can be added to a set, so that the same
+        MetadataWorkUnit is not passed to the DataHub ingestion framework twice.
+        """
+
+        def __eq__(self, instance):
+            return self.id == instance.id
+
+        def __hash__(self):
+            return hash(self.id)
+
+    def __init__(self, config: PowerBiDashboardSourceConfig):
+        self.__config = config
+
+    def new_mcp(
+        self,
+        entity_type,
+        entity_urn,
+        aspect_name,
+        aspect,
+        change_type=ChangeTypeClass.UPSERT,
+    ):
+        """
+        Create an MCP
+        """
+        return MetadataChangeProposalWrapper(
+            entityType=entity_type,
+            changeType=change_type,
+            entityUrn=entity_urn,
+            aspectName=aspect_name,
+            aspect=aspect,
+        )
+
+    def __to_work_unit(
+        self, mcp: MetadataChangeProposalWrapper
+    ) -> EquableMetadataWorkUnit:
+        return Mapper.EquableMetadataWorkUnit(
+            id="{PLATFORM}-{ENTITY_URN}-{ASPECT_NAME}".format(
+                PLATFORM=self.__config.platform_name,
+                ENTITY_URN=mcp.entityUrn,
+                ASPECT_NAME=mcp.aspectName,
+            ),
+            mcp=mcp,
+        )
+
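+    # For illustration, the work-unit ids produced above look like this
+    # (hypothetical URN):
+    #
+    #   powerbi-urn:li:chart:(powerbi,charts.<tile-id>)-chartInfo
+    #
+    # Two work units with the same id compare equal, so OrderedSet deduplicates
+    # repeated (entity, aspect) pairs across dashboards.
+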
+    def __to_datahub_dataset(
+        self, dataset: Optional[PowerBiAPI.Dataset]
+    ) -> List[MetadataChangeProposalWrapper]:
+        """
+        Map a PowerBi dataset to DataHub datasets. Here we map each table of the
+        PowerBi dataset to a DataHub dataset. In PowerBi a tile has a single
+        dataset, but the corresponding DataHub chart may have many input sources.
+        """
+
+        dataset_mcps: List[MetadataChangeProposalWrapper] = []
+        if dataset is None:
+            return dataset_mcps
+
+        # We only support relational PowerBi DataSources
+        if (
+            dataset.datasource is None
+            or dataset.datasource.metadata.is_relational is False
+        ):
+            LOGGER.warning(
+                "Dataset {}({}) is not created from a relational datasource".format(
+                    dataset.name, dataset.id
+                )
+            )
+            return dataset_mcps
+
+        LOGGER.info(
+            "Converting dataset={}(id={}) to datahub dataset".format(
+                dataset.name, dataset.id
+            )
+        )
+
+        for table in dataset.tables:
+            # Create a URN for the dataset
+            ds_urn = builder.make_dataset_urn(
+                platform=self.__config.dataset_type_mapping[dataset.datasource.type],
+                name="{}.{}.{}".format(
+                    dataset.datasource.database, table.schema_name, table.name
+                ),
+                env=self.__config.env,
+            )
+            LOGGER.info("{}={}".format(Constant.Dataset_URN, ds_urn))
+            # Create datasetProperties mcp
+            ds_properties = DatasetPropertiesClass(description=table.name)
+
+            info_mcp = self.new_mcp(
+                entity_type=Constant.DATASET,
+                entity_urn=ds_urn,
+                aspect_name=Constant.DATASET_PROPERTIES,
+                aspect=ds_properties,
+            )
+
+            # Status mcp (removed=False)
+            status_mcp = self.new_mcp(
+                entity_type=Constant.DATASET,
+                entity_urn=ds_urn,
+                aspect_name=Constant.STATUS,
+                aspect=StatusClass(removed=False),
+            )
+
+            dataset_mcps.extend([info_mcp, status_mcp])
+
+        return dataset_mcps
+
+    def __to_datahub_chart(
+        self, tile: PowerBiAPI.Tile, ds_mcps: List[MetadataChangeProposalWrapper]
+    ) -> List[MetadataChangeProposalWrapper]:
+        """
+        Map a PowerBi tile to a DataHub chart
+        """
+        LOGGER.info("Converting tile {}(id={}) to chart".format(tile.title, tile.id))
+        # Create a URN for the chart
+        chart_urn = builder.make_chart_urn(
+            self.__config.platform_name, tile.get_urn_part()
+        )
+
+        LOGGER.info("{}={}".format(Constant.CHART_URN, chart_urn))
+
+        ds_input: List[str] = self.to_urn_set(ds_mcps)
+
+        def tile_custom_properties(tile: PowerBiAPI.Tile) -> dict:
+            custom_properties = {
+                "datasetId": tile.dataset.id if tile.dataset else "",
+                "reportId": tile.report.id if tile.report else "",
+                "datasetWebUrl": tile.dataset.webUrl
+                if tile.dataset is not None
+                else "",
+                "createdFrom": tile.createdFrom.value,
+            }
+
+            return custom_properties
+
+        # Create chartInfo mcp
+        # Set externalUrl only if the tile is created from a report
+        chart_info_instance = ChartInfoClass(
+            title=tile.title or "",
+            description=tile.title or "",
+            lastModified=ChangeAuditStamps(),
+            inputs=ds_input,
+            externalUrl=tile.report.webUrl if tile.report else None,
+            customProperties={**tile_custom_properties(tile)},
+        )
+
+        info_mcp = self.new_mcp(
+            entity_type=Constant.CHART,
+            entity_urn=chart_urn,
+            aspect_name=Constant.CHART_INFO,
+            aspect=chart_info_instance,
+        )
+
+        # Status mcp (removed=False)
+        status_mcp = self.new_mcp(
+            entity_type=Constant.CHART,
+            entity_urn=chart_urn,
+            aspect_name=Constant.STATUS,
+            aspect=StatusClass(removed=False),
+        )
+
+        # ChartKey mcp
+        chart_key_instance = ChartKeyClass(
+            dashboardTool=self.__config.platform_name,
+            chartId=Constant.CHART_ID.format(tile.id),
+        )
+
+        chartkey_mcp = self.new_mcp(
+            entity_type=Constant.CHART,
+            entity_urn=chart_urn,
+            aspect_name=Constant.CHART_KEY,
+            aspect=chart_key_instance,
+        )
+
+        return [info_mcp, status_mcp, chartkey_mcp]
+
+    # written in this style to fix a linter error
+    def to_urn_set(self, mcps: List[MetadataChangeProposalWrapper]) -> List[str]:
+        return list(
+            OrderedSet(
+                [
+                    mcp.entityUrn
+                    for mcp in mcps
+                    if mcp is not None and mcp.entityUrn is not None
+                ]
+            )
+        )
+
+    def 
__to_datahub_dashboard( + self, + dashboard: PowerBiAPI.Dashboard, + chart_mcps: List[MetadataChangeProposalWrapper], + user_mcps: List[MetadataChangeProposalWrapper], + ) -> List[MetadataChangeProposalWrapper]: + """ + Map PowerBi dashboard to Datahub dashboard + """ + + dashboard_urn = builder.make_dashboard_urn( + self.__config.platform_name, dashboard.get_urn_part() + ) + + chart_urn_list: List[str] = self.to_urn_set(chart_mcps) + user_urn_list: List[str] = self.to_urn_set(user_mcps) + + def chart_custom_properties(dashboard: PowerBiAPI.Dashboard) -> dict: + return { + "chartCount": str(len(dashboard.tiles)), + "workspaceName": dashboard.workspace_name, + "workspaceId": dashboard.id, + } + + # DashboardInfo mcp + dashboard_info_cls = DashboardInfoClass( + description=dashboard.displayName or "", + title=dashboard.displayName or "", + charts=chart_urn_list, + lastModified=ChangeAuditStamps(), + dashboardUrl=dashboard.webUrl, + customProperties={**chart_custom_properties(dashboard)}, + ) + + info_mcp = self.new_mcp( + entity_type=Constant.DASHBOARD, + entity_urn=dashboard_urn, + aspect_name=Constant.DASHBOARD_INFO, + aspect=dashboard_info_cls, + ) + + # removed status mcp + removed_status_mcp = self.new_mcp( + entity_type=Constant.DASHBOARD, + entity_urn=dashboard_urn, + aspect_name=Constant.STATUS, + aspect=StatusClass(removed=False), + ) + + # dashboardKey mcp + dashboard_key_cls = DashboardKeyClass( + dashboardTool=self.__config.platform_name, + dashboardId=Constant.DASHBOARD_ID.format(dashboard.id), + ) + + # Dashboard key + dashboard_key_mcp = self.new_mcp( + entity_type=Constant.DASHBOARD, + entity_urn=dashboard_urn, + aspect_name=Constant.DASHBOARD_KEY, + aspect=dashboard_key_cls, + ) + + # Dashboard Ownership + owners = [ + OwnerClass(owner=user_urn, type=OwnershipTypeClass.CONSUMER) + for user_urn in user_urn_list + if user_urn is not None + ] + ownership = OwnershipClass(owners=owners) + # Dashboard owner MCP + owner_mcp = self.new_mcp( + entity_type=Constant.DASHBOARD, + entity_urn=dashboard_urn, + aspect_name=Constant.OWNERSHIP, + aspect=ownership, + ) + + # Dashboard browsePaths + browse_path = BrowsePathsClass( + paths=["/powerbi/{}".format(self.__config.workspace_id)] + ) + browse_path_mcp = self.new_mcp( + entity_type=Constant.DASHBOARD, + entity_urn=dashboard_urn, + aspect_name=Constant.BROWSERPATH, + aspect=browse_path, + ) + + return [ + browse_path_mcp, + info_mcp, + removed_status_mcp, + dashboard_key_mcp, + owner_mcp, + ] + + def to_datahub_user( + self, user: PowerBiAPI.User + ) -> List[MetadataChangeProposalWrapper]: + """ + Map PowerBi user to datahub user + """ + + LOGGER.info( + "Converting user {}(id={}) to datahub's user".format( + user.displayName, user.id + ) + ) + + # Create an URN for user + user_urn = builder.make_user_urn(user.get_urn_part()) + + user_info_instance = CorpUserInfoClass( + displayName=user.displayName, + email=user.emailAddress, + title=user.displayName, + active=True, + ) + + info_mcp = self.new_mcp( + entity_type=Constant.CORP_USER, + entity_urn=user_urn, + aspect_name=Constant.CORP_USER_INFO, + aspect=user_info_instance, + ) + + # removed status mcp + status_mcp = self.new_mcp( + entity_type=Constant.CORP_USER, + entity_urn=user_urn, + aspect_name=Constant.STATUS, + aspect=StatusClass(removed=False), + ) + + user_key = CorpUserKeyClass(username=user.id) + + user_key_mcp = self.new_mcp( + entity_type=Constant.CORP_USER, + entity_urn=user_urn, + aspect_name=Constant.CORP_USER_KEY, + aspect=user_key, + ) + + return [info_mcp, status_mcp, 
user_key_mcp] + + def to_datahub_users( + self, users: List[PowerBiAPI.User] + ) -> List[MetadataChangeProposalWrapper]: + user_mcps = [] + + for user in users: + user_mcps.extend(self.to_datahub_user(user)) + + return user_mcps + + def to_datahub_chart( + self, tiles: List[PowerBiAPI.Tile] + ) -> Tuple[ + List[MetadataChangeProposalWrapper], List[MetadataChangeProposalWrapper] + ]: + ds_mcps = [] + chart_mcps = [] + + # Return empty list if input list is empty + if len(tiles) == 0: + return [], [] + + LOGGER.info("Converting tiles(count={}) to charts".format(len(tiles))) + + for tile in tiles: + if tile is None: + continue + # First convert the dataset to MCP, because dataset mcp is used in input attribute of chart mcp + dataset_mcps = self.__to_datahub_dataset(tile.dataset) + # Now convert tile to chart MCP + chart_mcp = self.__to_datahub_chart(tile, dataset_mcps) + + ds_mcps.extend(dataset_mcps) + chart_mcps.extend(chart_mcp) + + # Return dataset and chart MCPs + + return ds_mcps, chart_mcps + + def to_datahub_work_units( + self, dashboard: PowerBiAPI.Dashboard + ) -> Set[EquableMetadataWorkUnit]: + mcps = [] + + LOGGER.info( + "Converting dashboard={} to datahub dashboard".format(dashboard.displayName) + ) + + # Convert user to CorpUser + user_mcps = self.to_datahub_users(dashboard.users) + # Convert tiles to charts + ds_mcps, chart_mcps = self.to_datahub_chart(dashboard.tiles) + # Lets convert dashboard to datahub dashboard + dashboard_mcps = self.__to_datahub_dashboard(dashboard, chart_mcps, user_mcps) + + # Now add MCPs in sequence + mcps.extend(ds_mcps) + mcps.extend(user_mcps) + mcps.extend(chart_mcps) + mcps.extend(dashboard_mcps) + + # Convert MCP to work_units + work_units = map(self.__to_work_unit, mcps) + # Return set of work_unit + return OrderedSet([wu for wu in work_units if wu is not None]) + + +@dataclass +class PowerBiDashboardSourceReport(SourceReport): + dashboards_scanned: int = 0 + charts_scanned: int = 0 + filtered_dashboards: List[str] = dataclass_field(default_factory=list) + filtered_charts: List[str] = dataclass_field(default_factory=list) + + def report_dashboards_scanned(self, count: int = 1) -> None: + self.dashboards_scanned += count + + def report_charts_scanned(self, count: int = 1) -> None: + self.charts_scanned += count + + def report_dashboards_dropped(self, model: str) -> None: + self.filtered_dashboards.append(model) + + def report_charts_dropped(self, view: str) -> None: + self.filtered_charts.append(view) + + +class PowerBiDashboardSource(Source): + """ + Datahub PowerBi plugin main class. 
This class extends Source to become a PowerBi data-ingestion source for DataHub
+    """
+
+    source_config: PowerBiDashboardSourceConfig
+    reporter: PowerBiDashboardSourceReport
+    accessed_dashboards: int = 0
+
+    def __init__(self, config: PowerBiDashboardSourceConfig, ctx: PipelineContext):
+        super().__init__(ctx)
+        self.source_config = config
+        self.reporter = PowerBiDashboardSourceReport()
+        self.powerbi_client = PowerBiAPI(self.source_config)
+        self.auth_token = self.powerbi_client.get_access_token()
+        self.mapper = Mapper(config)
+
+    @classmethod
+    def create(cls, config_dict, ctx):
+        config = PowerBiDashboardSourceConfig.parse_obj(config_dict)
+        return cls(config, ctx)
+
+    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
+        """
+        The DataHub ingestion framework invokes this method
+        """
+        LOGGER.info("PowerBi plugin execution has started")
+
+        # Fetch the PowerBi workspace for the given workspace identifier
+        workspace = self.powerbi_client.get_workspace(self.source_config.workspace_id)
+
+        for dashboard in workspace.dashboards:
+
+            try:
+                # Fetch PowerBi users for dashboards
+                dashboard.users = self.powerbi_client.get_dashboard_users(dashboard)
+                # Increase the dashboard and tile counts in the report
+                self.reporter.report_dashboards_scanned()
+                self.reporter.report_charts_scanned(count=len(dashboard.tiles))
+            except Exception as e:
+                message = "Error ({}) occurred while loading dashboard {}(id={}) tiles.".format(
+                    e, dashboard.displayName, dashboard.id
+                )
+                LOGGER.exception(message)
+                self.reporter.report_warning(dashboard.id, message)
+
+            # Convert the PowerBi dashboard and its child entities into DataHub work units
+            workunits = self.mapper.to_datahub_work_units(dashboard)
+            for workunit in workunits:
+                # Add the workunit to the report
+                self.reporter.report_workunit(workunit)
+                # Return the workunit to the DataHub ingestion framework
+                yield workunit
+
+    def get_report(self) -> SourceReport:
+        return self.reporter
diff --git a/metadata-ingestion/tests/integration/powerbi/golden_test_ingest.json b/metadata-ingestion/tests/integration/powerbi/golden_test_ingest.json
new file mode 100644
index 00000000000000..f7f8f285c1c4cb
--- /dev/null
+++ b/metadata-ingestion/tests/integration/powerbi/golden_test_ingest.json
@@ -0,0 +1,382 @@
+[
+  {
+    "auditHeader": null,
+    "entityType": "dataset",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,library_db.public.issue_book,DEV)",
+    "entityKeyAspect": null,
+    "changeType": "UPSERT",
+    "aspectName": "datasetProperties",
+    "aspect": {
+      "value": "{\"customProperties\": {}, \"description\": \"issue_book\", \"tags\": []}",
+      "contentType": "application/json"
+    },
+    "systemMetadata": {
+      "lastObserved": 1643871600000,
+      "runId": "powerbi-test",
+      "registryName": null,
+      "registryVersion": null,
+      "properties": null
+    }
+  },
+  {
+    "auditHeader": null,
+    "entityType": "dataset",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,library_db.public.issue_book,DEV)",
+    "entityKeyAspect": null,
+    "changeType": "UPSERT",
+    "aspectName": "status",
+    "aspect": {
+      "value": "{\"removed\": false}",
+      "contentType": "application/json"
+    },
+    "systemMetadata": {
+      "lastObserved": 1643871600000,
+      "runId": "powerbi-test",
+      "registryName": null,
+      "registryVersion": null,
+      "properties": null
+    }
+  },
+  {
+    "auditHeader": null,
+    "entityType": "dataset",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,library_db.public.member,DEV)",
+    "entityKeyAspect": null,
+    "changeType": "UPSERT",
+    "aspectName": "datasetProperties",
+    "aspect": {
+      
"value": "{\"customProperties\": {}, \"description\": \"member\", \"tags\": []}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test", + "registryName": null, + "registryVersion": null, + "properties": null + } + }, + { + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,library_db.public.member,DEV)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "value": "{\"removed\": false}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test", + "registryName": null, + "registryVersion": null, + "properties": null + } + }, + { + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,library_db.public.issue_history,DEV)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "value": "{\"customProperties\": {}, \"description\": \"issue_history\", \"tags\": []}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test", + "registryName": null, + "registryVersion": null, + "properties": null + } + }, + { + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,library_db.public.issue_history,DEV)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "value": "{\"removed\": false}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test", + "registryName": null, + "registryVersion": null, + "properties": null + } + }, + { + "auditHeader": null, + "entityType": "corpuser", + "entityUrn": "urn:li:corpuser:users.User1@foo.com", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "corpUserInfo", + "aspect": { + "value": "{\"active\": true, \"displayName\": \"User1\", \"email\": \"User1@foo.com\", \"title\": \"User1\"}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test", + "registryName": null, + "registryVersion": null, + "properties": null + } + }, + { + "auditHeader": null, + "entityType": "corpuser", + "entityUrn": "urn:li:corpuser:users.User1@foo.com", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "value": "{\"removed\": false}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test", + "registryName": null, + "registryVersion": null, + "properties": null + } + }, + { + "auditHeader": null, + "entityType": "corpuser", + "entityUrn": "urn:li:corpuser:users.User1@foo.com", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "corpUserKey", + "aspect": { + "value": "{\"username\": \"User1@foo.com\"}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test", + "registryName": null, + "registryVersion": null, + "properties": null + } + }, + { + "auditHeader": null, + "entityType": "corpuser", + "entityUrn": "urn:li:corpuser:users.User2@foo.com", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "corpUserInfo", + "aspect": { + "value": "{\"active\": true, \"displayName\": \"User2\", \"email\": \"User2@foo.com\", \"title\": \"User2\"}", + 
"contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test", + "registryName": null, + "registryVersion": null, + "properties": null + } + }, + { + "auditHeader": null, + "entityType": "corpuser", + "entityUrn": "urn:li:corpuser:users.User2@foo.com", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "value": "{\"removed\": false}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test", + "registryName": null, + "registryVersion": null, + "properties": null + } + }, + { + "auditHeader": null, + "entityType": "corpuser", + "entityUrn": "urn:li:corpuser:users.User2@foo.com", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "corpUserKey", + "aspect": { + "value": "{\"username\": \"User2@foo.com\"}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test", + "registryName": null, + "registryVersion": null, + "properties": null + } + }, + { + "auditHeader": null, + "entityType": "chart", + "entityUrn": "urn:li:chart:(powerbi,charts.B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "chartInfo", + "aspect": { + "value": "{\"customProperties\": {\"datasetId\": \"05169CD2-E713-41E6-9600-1D8066D95445\", \"reportId\": \"\", \"datasetWebUrl\": \"http://localhost/groups/64ED5CAD-7C10-4684-8180-826122881108/datasets/05169CD2-E713-41E6-9600-1D8066D95445\", \"createdFrom\": \"Dataset\"}, \"title\": \"test_tiles\", \"description\": \"test_tiles\", \"lastModified\": {\"created\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"lastModified\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}}, \"inputs\": [{\"string\": \"urn:li:dataset:(urn:li:dataPlatform:postgres,library_db.public.issue_book,DEV)\"}, {\"string\": \"urn:li:dataset:(urn:li:dataPlatform:postgres,library_db.public.member,DEV)\"}, {\"string\": \"urn:li:dataset:(urn:li:dataPlatform:postgres,library_db.public.issue_history,DEV)\"}]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test", + "registryName": null, + "registryVersion": null, + "properties": null + } + }, + { + "auditHeader": null, + "entityType": "chart", + "entityUrn": "urn:li:chart:(powerbi,charts.B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "value": "{\"removed\": false}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test", + "registryName": null, + "registryVersion": null, + "properties": null + } + }, + { + "auditHeader": null, + "entityType": "chart", + "entityUrn": "urn:li:chart:(powerbi,charts.B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "chartKey", + "aspect": { + "value": "{\"dashboardTool\": \"powerbi\", \"chartId\": \"powerbi.linkedin.com/charts/B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0\"}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test", + "registryName": null, + "registryVersion": null, + "properties": null + } + }, + { + "auditHeader": null, + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(powerbi,dashboards.7D668CAD-7FFC-4505-9215-655BCA5BEBAE)", + "entityKeyAspect": null, 
+ "changeType": "UPSERT", + "aspectName": "browsePaths", + "aspect": { + "value": "{\"paths\": [\"/powerbi/64ED5CAD-7C10-4684-8180-826122881108\"]}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test", + "registryName": null, + "registryVersion": null, + "properties": null + } + }, + { + "auditHeader": null, + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(powerbi,dashboards.7D668CAD-7FFC-4505-9215-655BCA5BEBAE)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dashboardInfo", + "aspect": { + "value": "{\"customProperties\": {\"chartCount\": \"1\", \"workspaceName\": \"foo\", \"workspaceId\": \"7D668CAD-7FFC-4505-9215-655BCA5BEBAE\"}, \"title\": \"test_dashboard\", \"description\": \"test_dashboard\", \"charts\": [\"urn:li:chart:(powerbi,charts.B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0)\"], \"lastModified\": {\"created\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"lastModified\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}}, \"dashboardUrl\": \"https://localhost/dashboards/web/1\"}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test", + "registryName": null, + "registryVersion": null, + "properties": null + } + }, + { + "auditHeader": null, + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(powerbi,dashboards.7D668CAD-7FFC-4505-9215-655BCA5BEBAE)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "value": "{\"removed\": false}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test", + "registryName": null, + "registryVersion": null, + "properties": null + } + }, + { + "auditHeader": null, + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(powerbi,dashboards.7D668CAD-7FFC-4505-9215-655BCA5BEBAE)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "dashboardKey", + "aspect": { + "value": "{\"dashboardTool\": \"powerbi\", \"dashboardId\": \"powerbi.linkedin.com/dashboards/7D668CAD-7FFC-4505-9215-655BCA5BEBAE\"}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test", + "registryName": null, + "registryVersion": null, + "properties": null + } + }, + { + "auditHeader": null, + "entityType": "dashboard", + "entityUrn": "urn:li:dashboard:(powerbi,dashboards.7D668CAD-7FFC-4505-9215-655BCA5BEBAE)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "value": "{\"owners\": [{\"owner\": \"urn:li:corpuser:users.User1@foo.com\", \"type\": \"CONSUMER\"}, {\"owner\": \"urn:li:corpuser:users.User2@foo.com\", \"type\": \"CONSUMER\"}], \"lastModified\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}}", + "contentType": "application/json" + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "powerbi-test", + "registryName": null, + "registryVersion": null, + "properties": null + } + } + ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/powerbi/test_powerbi.py b/metadata-ingestion/tests/integration/powerbi/test_powerbi.py new file mode 100644 index 00000000000000..4f2cde4cbf3d83 --- /dev/null +++ b/metadata-ingestion/tests/integration/powerbi/test_powerbi.py @@ -0,0 +1,131 @@ +from unittest import mock + +from freezegun import freeze_time + +from datahub.ingestion.run.pipeline import Pipeline 
+from datahub.ingestion.source.powerbi import PowerBiAPI +from tests.test_helpers import mce_helpers + +FROZEN_TIME = "2022-02-03 07:00:00" + +POWERBI_API = "datahub.ingestion.source.powerbi.PowerBiAPI" + + +@freeze_time(FROZEN_TIME) +def test_powerbi_ingest(pytestconfig, tmp_path, mock_time): + mocked_client = mock.MagicMock() + # Set the datasource mapping dictionary + with mock.patch(POWERBI_API) as mock_sdk: + mock_sdk.return_value = mocked_client + # Mock the PowerBi Dashboard API response + mocked_client.get_workspace.return_value = PowerBiAPI.Workspace( + id="64ED5CAD-7C10-4684-8180-826122881108", + name="demo-workspace", + state="Active", + datasets={}, + dashboards=[ + PowerBiAPI.Dashboard( + id="7D668CAD-7FFC-4505-9215-655BCA5BEBAE", + displayName="test_dashboard", + isReadOnly=True, + embedUrl="https://localhost/dashboards/embed/1", + webUrl="https://localhost/dashboards/web/1", + workspace_id="4A378B07-FAA2-4EA2-9383-CBA91AD9681C", + workspace_name="foo", + users=[], + tiles=[ + PowerBiAPI.Tile( + id="B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0", + title="test_tiles", + embedUrl="https://localhost/tiles/embed/1", + createdFrom=PowerBiAPI.Tile.CreatedFrom.DATASET, + report=None, + dataset=PowerBiAPI.Dataset( + id="05169CD2-E713-41E6-9600-1D8066D95445", + name="library-dataset", + workspace_id="64ED5CAD-7C10-4684-8180-826122881108", + webUrl="http://localhost/groups/64ED5CAD-7C10-4684-8180-826122881108/datasets/05169CD2-E713-41E6-9600-1D8066D95445", + tables=[ + PowerBiAPI.Dataset.Table( + name="issue_book", schema_name="public" + ), + PowerBiAPI.Dataset.Table( + name="member", schema_name="public" + ), + PowerBiAPI.Dataset.Table( + name="issue_history", schema_name="public" + ), + ], + datasource=PowerBiAPI.DataSource( + id="DCE90B40-84D6-467A-9A5C-648E830E72D3", + database="library_db", + type="PostgreSql", + server="gs-library.postgres.database.azure.com", + metadata=PowerBiAPI.DataSource.MetaData( + is_relational=True + ), + ), + ), + ) + ], + ) + ], + ) + + # Mock the PowerBi User API response + mocked_client.get_dashboard_users.return_value = [ + PowerBiAPI.User( + id="User1@foo.com", + displayName="User1", + emailAddress="User1@foo.com", + dashboardUserAccessRight="ReadWrite", + principalType="User", + graphId="C9EE53F2-88EA-4711-A173-AF0515A3CD46", + ), + PowerBiAPI.User( + id="User2@foo.com", + displayName="User2", + emailAddress="User2@foo.com", + dashboardUserAccessRight="ReadWrite", + principalType="User", + graphId="5ED26AA7-FCD2-42C5-BCE8-51E0AAD0682B", + ), + ] + + test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" + + pipeline = Pipeline.create( + { + "run_id": "powerbi-test", + "source": { + "type": "powerbi", + "config": { + "client_id": "foo", + "client_secret": "bar", + "tenant_id": "0B0C960B-FCDF-4D0F-8C45-2E03BB59DDEB", + "workspace_id": "64ED5CAD-7C10-4684-8180-826122881108", + "dataset_type_mapping": { + "PostgreSql": "postgres", + "Oracle": "oracle", + }, + "env": "DEV", + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path}/powerbi_mces.json", + }, + }, + } + ) + + pipeline.run() + pipeline.raise_from_status() + mce_out_file = "golden_test_ingest.json" + + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / "powerbi_mces.json", + golden_path=f"{test_resources_dir}/{mce_out_file}", + ) diff --git a/metadata-service/restli-servlet-impl/src/main/resources/DataPlatformInfo.json b/metadata-service/restli-servlet-impl/src/main/resources/DataPlatformInfo.json index 8369572a7d6b72..5622b342183568 
100644 --- a/metadata-service/restli-servlet-impl/src/main/resources/DataPlatformInfo.json +++ b/metadata-service/restli-servlet-impl/src/main/resources/DataPlatformInfo.json @@ -163,5 +163,12 @@ "displayName": "Looker", "type": "OTHERS", "logoUrl": "https://raw.githubusercontent.com/linkedin/datahub/master/datahub-web-react/src/images/lookerlogo.png" + }, + "urn:li:dataPlatform:powerbi": { + "datasetNameDelimiter": ".", + "name": "powerbi", + "displayName": "Power BI", + "type": "OTHERS", + "logoUrl": "https://raw.githubusercontent.com/linkedin/datahub/master/datahub-web-react/src/images/powerbilogo.png" } } diff --git a/metadata-service/war/src/main/resources/boot/data_platforms.json b/metadata-service/war/src/main/resources/boot/data_platforms.json index d857264a756bb7..cd5842539293dc 100644 --- a/metadata-service/war/src/main/resources/boot/data_platforms.json +++ b/metadata-service/war/src/main/resources/boot/data_platforms.json @@ -385,5 +385,15 @@ "type": "OTHERS", "logoUrl": "/assets/platforms/elasticsearchlogo.png" } + }, + { + "urn": "urn:li:dataPlatform:powerbi", + "aspect": { + "datasetNameDelimiter": ".", + "name": "powerbi", + "displayName": "Power BI", + "type": "OTHERS", + "logoUrl": "/assets/platforms/powerbilogo.png" + } } ]
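A quick consistency check for the two platform-registry entries above: the registered URN must match what the ingestion source constructs. A minimal sketch using the same builder helper the source itself uses:

```python
import datahub.emitter.mce_builder as builder

# The platform URN registered in data_platforms.json / DataPlatformInfo.json
# must match the URN the source emits for its entities.
assert builder.make_data_platform_urn("powerbi") == "urn:li:dataPlatform:powerbi"
```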