Skip to content

Commit

Permalink
feat(sdk): structured properties - add support for listing
Browse files Browse the repository at this point in the history
  • Loading branch information
shirshanka committed Jan 7, 2025
1 parent 2f20c52 commit 6b6daa3
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Usage: python3 list_structured_properties.py
# Expected Output: List of structured properties
# This script lists all structured properties in DataHub
from datahub.api.entities.structuredproperties.structuredproperties import (
StructuredProperties,
)
from datahub.ingestion.graph.client import get_default_graph

with get_default_graph() as graph:
structuredproperties = StructuredProperties.list(graph)
for structuredproperty in structuredproperties:
print(structuredproperty.dict())
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from contextlib import contextmanager
from enum import Enum
from pathlib import Path
from typing import Generator, List, Optional
from typing import Generator, Iterable, List, Optional

import yaml
from pydantic import validator
Expand Down Expand Up @@ -136,7 +136,6 @@ def urn_must_be_present(cls, v, values):
def create(file: str, graph: Optional[DataHubGraph] = None) -> None:
emitter: DataHubGraph = graph if graph else get_default_graph()
with StructuredPropertiesConfig.use_graph(emitter):
print("Using graph")
with open(file) as fp:
structuredproperties: List[dict] = yaml.safe_load(fp)
for structuredproperty_raw in structuredproperties:
Expand Down Expand Up @@ -237,3 +236,15 @@ def to_yaml(
yaml.indent(mapping=2, sequence=4, offset=2)
yaml.default_flow_style = False
yaml.dump(self.dict(), fp)

@staticmethod
def list_urns(graph: DataHubGraph) -> Iterable[str]:
return graph.get_urns_by_filter(
entity_types=["structuredProperty"],
)

@staticmethod
def list(graph: DataHubGraph) -> Iterable["StructuredProperties"]:
with StructuredPropertiesConfig.use_graph(graph):
for urn in StructuredProperties.list_urns(graph):
yield StructuredProperties.from_datahub(graph, urn)
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import json
import logging
from pathlib import Path
from typing import Iterable

import click
from click_default_group import DefaultGroup
from ruamel.yaml import YAML

from datahub.api.entities.structuredproperties.structuredproperties import (
StructuredProperties,
Expand Down Expand Up @@ -60,3 +62,85 @@ def get(urn: str, to_file: str) -> None:
)
else:
click.secho(f"Structured property {urn} does not exist")


@properties.command(
name="list",
)
@click.option("--details/--no-details", is_flag=True, default=True)
@click.option("--to-file", required=False, type=str)
@telemetry.with_telemetry()
def list(details: bool, to_file: str) -> None:
"""List structured properties in DataHub"""

def to_yaml_list(
objects: Iterable[StructuredProperties], # iterable of objects to dump
file: Path,
) -> None:
# if file exists, first we read it
yaml = YAML(typ="rt") # default, if not specfied, is 'rt' (round-trip)
yaml.indent(mapping=2, sequence=4, offset=2)
yaml.default_flow_style = False
serialized_objects = []
if file.exists():
with open(file, "r") as fp:
existing_objects = yaml.load(fp) # this is a list of dicts
existing_objects = [
StructuredProperties.parse_obj(obj) for obj in existing_objects
]
objects = [obj for obj in objects]
# do a positional update of the existing objects
existing_urns = {obj.urn for obj in existing_objects}
# existing_urns = {obj["urn"] if "urn" in obj else f"urn:li:structuredProperty:{obj['id']}" for obj in existing_objects}
for i, obj in enumerate(existing_objects):
# existing_urn = obj["urn"] if "urn" in obj else f"urn:li:structuredProperty:{obj['id']}"
existing_urn = obj.urn
# breakpoint()
if existing_urn in {obj.urn for obj in objects}:
existing_objects[i] = next(
obj.dict(exclude_unset=True, exclude_none=True)
for obj in objects
if obj.urn == existing_urn
)
new_objects = [
obj.dict(exclude_unset=True, exclude_none=True)
for obj in objects
if obj.urn not in existing_urns
]
serialized_objects = existing_objects + new_objects
else:
serialized_objects = [
obj.dict(exclude_unset=True, exclude_none=True) for obj in objects
]

with open(file, "w") as fp:
yaml.dump(serialized_objects, fp)

with get_default_graph() as graph:
if details:
logger.info(
"Listing structured properties with details. Use --no-details for urns only"
)
structuredproperties = StructuredProperties.list(graph)
if to_file:
to_yaml_list(structuredproperties, Path(to_file))
else:
for structuredproperty in structuredproperties:
click.secho(
f"{json.dumps(structuredproperty.dict(exclude_unset=True, exclude_none=True), indent=2)}"
)
else:
logger.info(
"Listing structured property urns only, use --details for more information"
)
structured_property_urns = StructuredProperties.list_urns(graph)
if to_file:
with open(to_file, "w") as f:
for urn in structured_property_urns:
f.write(f"{urn}\n")
click.secho(
f"Structured property urns written to {to_file}", fg="green"
)
else:
for urn in structured_property_urns:
click.secho(f"{urn}")

0 comments on commit 6b6daa3

Please sign in to comment.