-
Notifications
You must be signed in to change notification settings - Fork 219
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a script to generate the media_properties.md
Signed-off-by: Olga Bulat <obulat@gmail.com>
- Loading branch information
Showing
9 changed files
with
1,380 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
import ast
from pathlib import Path
from typing import Any
|
||
|
||
# Location of the column definitions parsed by this script.
STORAGE_PATH = Path(__file__).parents[2] / "dags" / "common" / "storage"
COLUMNS_PATH = STORAGE_PATH / "columns.py"

# Base URL used to build line-anchored hyperlinks into columns.py.
COLUMNS_URL = "https://github.com/WordPress/openverse/blob/main/catalog/dags/common/storage/columns.py"  # noqa: E501


def format_python_column(
    column_db_name: str,
    python_column: dict[str, Any],
    python_column_lines: dict[str, tuple[int, int]],
) -> str:
    """
    Render one parsed Python column definition as a markdown string.

    :param column_db_name: the DB column name; the Python ``name`` is only
        shown when it differs from this.
    :param python_column: parsed column properties; mutated (keys are popped).
    :param python_column_lines: mapping of column class name to its
        (start, end) line numbers in columns.py, used for the hyperlink.
    :return: markdown of the form ``[ClassName](url#Lstart-Lend) (k=v, ...)``.
    """
    col_type = python_column.pop("python_type")
    start, end = python_column_lines[col_type]

    # Pop custom props first so they don't leak into the plain k=v list below.
    custom_props = python_column.pop("custom_column_props", None)

    parts: list[str] = []
    # Only surface the Python-side name when it differs from the DB name.
    col_name = python_column.pop("name")
    if col_name != column_db_name:
        parts.append(f"name='{col_name}'")

    # Remaining simple properties (nullable, required, upsert_strategy, ...).
    parts.extend(f"{k}={v}" for k, v in python_column.items())

    # Column-class-specific properties are grouped into a pseudo-constructor,
    # e.g. ``StringColumnProps(size=255)``.
    if custom_props:
        props_string = ", ".join(f"{k}={v}" for k, v in custom_props.items())
        parts.append(f"{col_type}Props({props_string})")

    return f"[{col_type}]({COLUMNS_URL}#L{start}-L{end}) ({', '.join(parts)})"
|
||
|
||
def file_to_ast() -> ast.Module:
    """Read ``columns.py`` and return its parsed module AST."""
    return ast.parse(COLUMNS_PATH.read_text())
|
||
|
||
def parse_python_columns() -> dict[str, Any]:
    """
    Get the Python column definitions from the columns.py file.

    :return: mapping of DB column name to a markdown string describing the
        corresponding Python ``Column`` instantiation, with a hyperlink to
        its class definition.
    """
    columns = {}
    python_column_lines = get_python_column_types()

    code = file_to_ast()
    for item in ast.iter_child_nodes(code):
        # Only top-level assignments can be column definitions.
        if not isinstance(item, ast.Assign):
            continue
        if not (column := parse_column_definition(item)):
            continue
        db_name = column.pop("db_name")
        columns[db_name] = format_python_column(db_name, column, python_column_lines)

    return columns
|
||
|
||
def get_python_column_types() -> dict[str, tuple[int, int]]:
    """
    Parse the columns.py file to get the Python column class names
    and their line numbers for hyperlinks.
    Sample output: ``StringColumn: (3, 5)``
    """
    column_lines: dict[str, tuple[int, int]] = {}
    for node in ast.iter_child_nodes(file_to_ast()):
        if isinstance(node, ast.ClassDef) and node.name.endswith("Column"):
            column_lines[node.name] = (node.lineno, node.end_lineno)
    return column_lines
|
||
|
||
def parse_column_definition(item: ast.Assign) -> dict[str, any] | None: | ||
column = { | ||
"python_type": None, | ||
"name": None, | ||
"db_name": None, | ||
"nullable": None, | ||
"required": False, | ||
"upsert_strategy": "newest_non_null", | ||
"custom_column_props": {}, | ||
} | ||
if hasattr(item.value, "func") and hasattr(item.value.func, "id"): | ||
column["python_type"] = item.value.func.id | ||
|
||
if hasattr(item.value, "keywords"): | ||
for kw in item.value.keywords: | ||
if hasattr(kw.value, "value"): | ||
if kw.arg not in column.keys(): | ||
column["custom_column_props"][kw.arg] = kw.value.value | ||
else: | ||
# upsert_strategy is a special case | ||
if hasattr(kw.value, "attr"): | ||
column[kw.arg] = kw.value.attr | ||
else: | ||
column[kw.arg] = kw.value.value | ||
else: | ||
if not hasattr(kw.value, "keywords"): | ||
continue | ||
# An Array column that has a base_column | ||
column_params = ", ".join( | ||
[f"{kw2.arg}={kw2.value.value}" for kw2 in kw.value.keywords] | ||
) | ||
column["custom_column_props"][kw.arg] = ( | ||
f"{kw.value.func.id}({column_params})" | ||
) | ||
if not column.get("name"): | ||
return None | ||
column["db_name"] = column.get("db_name") or column["name"] | ||
if column["custom_column_props"] == {}: | ||
del column["custom_column_props"] | ||
if column["nullable"] is None: | ||
column["nullable"] = ( | ||
not column["required"] if column["required"] is not None else True | ||
) | ||
return column | ||
return None |
246 changes: 246 additions & 0 deletions
246
catalog/utilities/media_props_gen/generate_media_properties.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,246 @@ | ||
"""Automatic media properties generation.""" | ||
|
||
import logging | ||
import re | ||
from dataclasses import dataclass | ||
from pathlib import Path | ||
from typing import Any, Literal | ||
|
||
from column_parser import parse_python_columns | ||
|
||
|
||
# Module-level logger for this script.
log = logging.getLogger(__name__)
# Silence noisy modules
logging.getLogger("common.storage.media").setLevel(logging.WARNING)

# Constants
# Output file for the generated documentation.
DOC_MD_PATH = Path(__file__).parent / "media_properties.md"
# Hand-written long-form property descriptions (input).
SOURCE_MD_PATH = Path(__file__).parent / "media_props.md"
# Folder containing the upstream DB DDL files.
LOCAL_POSTGRES_FOLDER = Path(__file__).parents[3] / "docker" / "upstream_db"

# DDL file per media type, parsed by create_db_props_dict.
SQL_PATH = {
    "image": LOCAL_POSTGRES_FOLDER / "0003_openledger_image_schema.sql",
    "audio": LOCAL_POSTGRES_FOLDER / "0006_openledger_audio_schema.sql",
}
# SQL datatypes recognized in the DDL column definition lines.
sql_types = [
    "integer",
    "boolean",
    "uuid",
    "double precision",
    "jsonb",
    "timestamp with time zone",
    "character varying",
]
# Matches the first recognized SQL type in a column definition line.
sql_type_regex = re.compile(f"({'|'.join(sql_types)})")

# The media types this script generates documentation for.
MediaType = Literal["audio", "image"]
MEDIA_TYPES: list[MediaType] = ["audio", "image"]
|
||
|
||
@dataclass
class FieldInfo:
    """Combined SQL and Python information for one media property field."""

    name: str  # DB column name
    nullable: bool  # whether the DB column allows NULL
    datatype: str  # SQL datatype, e.g. "uuid" or "array of integer"
    constraint: str  # extra constraint text, e.g. "(80)" for varchar limits
    python_column: str = ""  # markdown describing the Python column, if any
|
||
|
||
@dataclass
class FieldSqlInfo:
    """SQL characteristics of one column, parsed from the DDL."""

    nullable: bool  # True unless the DDL line contains "NOT NULL"
    datatype: str  # SQL datatype, e.g. "uuid" or "array of integer"
    constraint: str  # extra constraint text, e.g. "(80)" for varchar limits
|
||
|
||
def create_db_props_dict(
    media_type: MediaType,
) -> dict[str, dict[str, FieldSqlInfo]]:
    """
    Parse the DDL for a media type and return a dict mapping each field
    name to its SQL definition (a ``FieldSqlInfo`` under the ``"sql"`` key).

    Returns an empty dict when the ``CREATE TABLE`` statement cannot be
    found or its table name does not match ``media_type``.

    :raises ValueError: when a column line uses an unrecognized SQL type.
    """
    create_table_regex = re.compile(r"CREATE\s+TABLE\s+\w+\.(\w+)\s+\(([\s\S]*?)\);")
    sql_path = SQL_PATH[media_type]

    with open(sql_path) as f:
        contents = f.read()
    table_description_matches = create_table_regex.search(contents)
    if not table_description_matches:
        print(f"Could not find table description for {media_type} in {sql_path}")
        return {}
    table_name = table_description_matches.group(1)
    if table_name != media_type:
        print(f"Table name {table_name} does not match media type {media_type}")
        return {}
    # One column definition per non-blank line inside the parentheses.
    field_descriptions = [
        field.strip()
        for field in table_description_matches.group(2).split("\n")
        if field.strip()
    ]
    fields = {}
    for field in field_descriptions:
        field_name = field.split(" ")[0]
        field_constraint = ""
        try:
            field_type = sql_type_regex.search(field).group(1)
            if field_type == "character varying":
                # Surface the length limit, e.g. "(80)".
                char_limit = field.split("(")[1].split(")")[0]
                field_constraint = f"({char_limit})"

            if "[]" in field:
                field_type = f"array of {field_type}"
        except AttributeError as err:
            # `search` returned None: the line uses an unrecognized SQL type.
            raise ValueError(
                f"Could not find type for field {field_name} in {field}"
            ) from err

        fields[field_name] = {
            "sql": FieldSqlInfo(
                nullable="NOT NULL" not in field,
                datatype=field_type,
                constraint=field_constraint,
            )
        }
    return fields
|
||
|
||
def add_column_props(media_props, python_columns):
    """Add the python column properties to the media properties dictionary."""
    for field_name in media_props:
        python_prop = python_columns.get(field_name)
        if not python_prop:
            print(f"Column {field_name} not found in table")
            python_prop = ""
        media_props[field_name]["python_column"] = python_prop
    return media_props
|
||
|
||
def parse_markdown() -> dict[str, dict[str, str]]: | ||
""" | ||
Parse the markdown documentation file and return a dictionary with the | ||
field name as key and the description as value. | ||
""" | ||
with open(SOURCE_MD_PATH) as f: | ||
contents = [line for line in f.readlines() if line.strip()] | ||
current_field = "" | ||
properties = {} | ||
prop = "" | ||
value = {} | ||
for i, line in enumerate(contents): | ||
if line.startswith("# "): | ||
if current_field and value: | ||
properties[current_field] = value | ||
current_field = line.replace("# ", "").strip() | ||
value = {} | ||
continue | ||
elif line.startswith("## "): | ||
prop = line.replace("## ", "").strip() | ||
value[prop] = "" | ||
continue | ||
else: | ||
value[prop] += line | ||
|
||
return properties | ||
|
||
|
||
def generate_media_props() -> dict:
    """
    Generate a dictionary with the media properties from the database,
    python code and markdown documentation files.
    """
    python_columns = parse_python_columns()
    return {
        media_type: add_column_props(create_db_props_dict(media_type), python_columns)
        for media_type in MEDIA_TYPES
    }
|
||
|
||
def generate_media_props_table(media_properties) -> str:
    """Generate the markdown table summarizing the media properties."""
    rows = ["| DB Field | Python Column | \n", "| --- | --- | \n"]
    for name, props in media_properties.items():
        sql_info = props["sql"]
        # Append the constraint (e.g. varchar length) when present.
        datatype = sql_info.datatype
        if sql_info.constraint:
            datatype = f"{sql_info.datatype} {sql_info.constraint}"
        nullability = "nullable" if sql_info.nullable else "non-nullable"
        rows.append(
            f"| [`{name}`](#{name}) ({datatype}, {nullability}) | "
            f"{props.get('python_column', '')} | \n"
        )
    return "".join(rows)
|
||
|
||
def description_dict_to_markdown(description: dict[str, str]) -> str:
    """Convert a dictionary with field descriptions to a markdown string."""
    return "".join(
        f"#### {name}\n\n{value}\n\n" for name, value in description.items()
    )
|
||
|
||
def generate_media_props_doc(
    markdown_descriptions: dict, media_properties: dict
) -> str:
    """Generate the long-form documentation for each media property."""
    docs = []
    for prop, description in markdown_descriptions.items():
        prop_doc = description_dict_to_markdown(description)
        # Skip properties that have no description content at all.
        if not prop_doc:
            continue
        media_types = ", ".join(
            f"`{media_type}`"
            for media_type, fields in media_properties.items()
            if prop in fields
        )
        docs.append(f"### {prop}\n\nMedia Types: {media_types}\n\n{prop_doc}")

    return "".join(docs)
|
||
|
||
def generate_markdown_doc(
    media_properties: dict[str, dict], markdown_descriptions: dict[str, dict]
) -> str:
    """
    Generate the tables with media properties database column and
    Python objects characteristics.
    """
    with open(Path(__file__).parent / "preamble.md") as f:
        preamble = f.read()
    # Assemble the document: preamble, one table per media type, then the
    # long-form property descriptions.
    return (
        f"{preamble}\n"
        "## Image Properties\n\n"
        f"{generate_media_props_table(media_properties['image'])}\n"
        "## Audio Properties\n\n"
        f"{generate_media_props_table(media_properties['audio'])}\n"
        "## Media Property Descriptions\n\n"
        f"{generate_media_props_doc(markdown_descriptions, media_properties)}\n"
    )
|
||
|
||
def write_media_props_doc(path: Path = DOC_MD_PATH) -> None:
    """Generate the DAG documentation and write it to a file."""
    markdown_descriptions = parse_markdown()
    media_properties = generate_media_props()
    doc_text = generate_markdown_doc(media_properties, markdown_descriptions)
    log.info(f"Writing DAG doc to {path}")
    path.write_text(doc_text)


if __name__ == "__main__":
    write_media_props_doc()
Oops, something went wrong.