diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index d96b11f6b405c..66f39b2a9c265 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -1199,6 +1199,161 @@ def create_tag(self, tag_name: str) -> str: # return urn return res["createTag"] + def _assertion_result_shared(self) -> str: + fragment: str = """ + fragment assertionResult on AssertionResult { + type + rowCount + missingCount + unexpectedCount + actualAggValue + externalUrl + nativeResults { + value + } + error { + type + properties { + value + } + } + } + """ + return fragment + + def _run_assertion_result_shared(self) -> str: + fragment: str = """ + fragment runAssertionResult on RunAssertionResult { + assertion { + urn + } + result { + ... assertionResult + } + } + """ + return fragment + + def _run_assertion_build_params( + self, params: Optional[Dict[str, str]] = {} + ) -> List[Any]: + if params is None: + return [] + + results = [] + for key, value in params.items(): + result = { + "key": key, + "value": value, + } + results.append(result) + + return results + + def run_assertion( + self, urn: str, save_result: bool = True, parameters: Optional[Dict[str, str]] = {} + ) -> Dict: + params = self._run_assertion_build_params(parameters) + graph_query: str = """ + %s + mutation runAssertion($assertionUrn: String!, $saveResult: Boolean, $parameters: [StringMapEntryInput!]) { + runAssertion(urn: $assertionUrn, saveResult: $saveResult, parameters: $parameters) { + ... assertionResult + } + } + """ % ( + self._assertion_result_shared() + ) + + variables = { + "assertionUrn": urn, + "saveResult": save_result, + "parameters": params, + } + + res = self.execute_graphql( + query=graph_query, + variables=variables, + ) + + return res["runAssertion"] + + def run_assertions( + self, + urns: List[str], + save_result: bool = True, + parameters: Optional[Dict[str, str]] = {}, + ) -> Dict: + params = self._run_assertion_build_params(parameters) + graph_query: str = """ + %s + %s + mutation runAssertions($assertionUrns: [String!]!, $saveResult: Boolean, $parameters: [StringMapEntryInput!]) { + runAssertions(urns: $assertionUrns, saveResults: $saveResult, parameters: $parameters) { + passingCount + failingCount + errorCount + results { + ... runAssertionResult + } + } + } + """ % ( + self._assertion_result_shared(), + self._run_assertion_result_shared(), + ) + + variables = { + "assertionUrns": urns, + "saveResult": save_result, + "parameters": params, + } + + res = self.execute_graphql( + query=graph_query, + variables=variables, + ) + + return res["runAssertions"] + + def run_assertions_for_asset( + self, + urn: str, + tag_urns: Optional[List[str]] = [], + parameters: Optional[Dict[str, str]] = {}, + ) -> Dict: + params = self._run_assertion_build_params(parameters) + graph_query: str = """ + %s + %s + mutation runAssertionsForAsset($assetUrn: String!, $tagUrns: [!String], $parameters: [StringMapEntryInput!]) { + runAssertionsForAsset(urn: $assetUrn, tagUrns: $tagUrns, parameters: $parameters) { + passingCount + failingCount + errorCount + results { + ... runAssertionResult + } + } + } + """ % ( + self._assertion_result_shared(), + self._run_assertion_result_shared(), + ) + + variables = { + "assetUrn": urn, + "tagUrns": tag_urns, + "parameters": params, + } + + res = self.execute_graphql( + query=graph_query, + variables=variables, + ) + + return res["runAssertionsForAsset"] + def close(self) -> None: self._make_schema_resolver.cache_clear() super().close()