Skip to content

Commit

Permalink
Add support for runAssertion, runAssertions, and runAssertionsForAsse…
Browse files Browse the repository at this point in the history
…t APIs
  • Loading branch information
noggi committed May 29, 2024
1 parent d78287c commit b6daf8a
Showing 1 changed file with 164 additions and 0 deletions.
164 changes: 164 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,170 @@ def create_tag(self, tag_name: str) -> str:
# return urn
return res["createTag"]

def _assertion_result_shared(self) -> str:
fragment: str = """
fragment assertionResult on AssertionResult {
type
rowCount
missingCount
unexpectedCount
actualAggValue
externalUrl
nativeResults {
value
}
error {
type
properties {
value
}
}
}
"""
return fragment

def _run_assertion_result_shared(self) -> str:
fragment: str = """
fragment runAssertionResult on RunAssertionResult {
assertion {
urn
}
result {
... assertionResult
}
}
"""
return fragment

def _run_assertion_build_params(
self, params: Optional[Dict[str, str]] = {}
) -> List[Any]:
if params is None:
return []

results = []
for key, value in params.items():
result = {
"key": key,
"value": value,
}
results.append(result)

return results

def run_assertion(
self,
urn: str,
save_result: bool = True,
parameters: Optional[Dict[str, str]] = {},
async_flag: bool = False,
) -> Dict:
params = self._run_assertion_build_params(parameters)
graph_query: str = """
%s
mutation runAssertion($assertionUrn: String!, $saveResult: Boolean, $parameters: [StringMapEntryInput!], $async: Boolean!) {
runAssertion(urn: $assertionUrn, saveResult: $saveResult, parameters: $parameters, async: $async) {
... assertionResult
}
}
""" % (
self._assertion_result_shared()
)

variables = {
"assertionUrn": urn,
"saveResult": save_result,
"parameters": params,
"async": async_flag,
}

res = self.execute_graphql(
query=graph_query,
variables=variables,
)

return res["runAssertion"]

def run_assertions(
self,
urns: List[str],
save_result: bool = True,
parameters: Optional[Dict[str, str]] = {},
async_flag: bool = False,
) -> Dict:
params = self._run_assertion_build_params(parameters)
graph_query: str = """
%s
%s
mutation runAssertions($assertionUrns: [String!]!, $saveResult: Boolean, $parameters: [StringMapEntryInput!], $async: Boolean!) {
runAssertions(urns: $assertionUrns, saveResults: $saveResult, parameters: $parameters, async: $async) {
passingCount
failingCount
errorCount
results {
... runAssertionResult
}
}
}
""" % (
self._assertion_result_shared(),
self._run_assertion_result_shared(),
)

variables = {
"assertionUrns": urns,
"saveResult": save_result,
"parameters": params,
"async": async_flag,
}

res = self.execute_graphql(
query=graph_query,
variables=variables,
)

return res["runAssertions"]

def run_assertions_for_asset(
self,
urn: str,
tag_urns: Optional[List[str]] = [],
parameters: Optional[Dict[str, str]] = {},
async_flag: bool = False,
) -> Dict:
params = self._run_assertion_build_params(parameters)
graph_query: str = """
%s
%s
mutation runAssertionsForAsset($assetUrn: String!, $tagUrns: [!String], $parameters: [StringMapEntryInput!], $async: Boolean!) {
runAssertionsForAsset(urn: $assetUrn, tagUrns: $tagUrns, parameters: $parameters, async: $async) {
passingCount
failingCount
errorCount
results {
... runAssertionResult
}
}
}
""" % (
self._assertion_result_shared(),
self._run_assertion_result_shared(),
)

variables = {
"assetUrn": urn,
"tagUrns": tag_urns,
"parameters": params,
"async": async_flag,
}

res = self.execute_graphql(
query=graph_query,
variables=variables,
)

return res["runAssertionsForAsset"]

def close(self) -> None:
self._make_schema_resolver.cache_clear()
super().close()
Expand Down

0 comments on commit b6daf8a

Please sign in to comment.