Skip to content

Commit

Permalink
Merge branch 'aj-flattener-2' into 'main'
Browse files Browse the repository at this point in the history
Add record and schema flattener in Stream Maps

See merge request meltano/sdk!236
  • Loading branch information
AJ Steers committed Feb 4, 2022
2 parents 6232d3e + 010b41e commit 41b1b65
Show file tree
Hide file tree
Showing 11 changed files with 564 additions and 143 deletions.
24 changes: 24 additions & 0 deletions docs/stream_maps.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,30 @@ SDK-based taps, targets, and mappers automatically support the custom inline map
- **Property-level additions:** new properties can be created based on inline user-defined
expressions.

### Schema Flattening Applications

- **Flatten nested properties:** separates large complex properties into multiple distinct fields.

For instance, a complex `user` property may look like this:

```js
{
// ...
"user": {
"first_name": "Jane",
"last_name": "Carter",
"id": "jcarter"
}
}
```

Rather than receive the entire record as one large structure, flattening the record would output
three distinct fields:

- `user__first_name`
- `user__last_name`
- `user__id`

## Out-of-scope capabilities

These capabilities are all out of scope _by design_:
Expand Down
11 changes: 1 addition & 10 deletions samples/sample_target_parquet/parquet_target_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import pyarrow as pa
import pyarrow.parquet as pq

from singer_sdk.helpers._flattening import RecordFlattener
from singer_sdk.sinks import BatchSink


Expand All @@ -22,15 +21,7 @@ def process_batch(self, context: dict) -> None:
schema = pa.schema([("some_int", pa.int32()), ("some_string", pa.string())])
writer = pq.ParquetWriter(self.config["filepath"], schema)

count = 0
flattened_records = []
flattener = RecordFlattener()
for record in records_to_drain:
flatten_record = flattener.flatten_record(record, schema, level=0)
flattened_records.append(flatten_record)
count += 1

df = pandas.DataFrame(data=flattened_records)
df = pandas.DataFrame(data=records_to_drain)
table = pa.Table.from_pandas(df)
writer.write_table(table)
writer.close()
Expand Down
293 changes: 233 additions & 60 deletions singer_sdk/helpers/_flattening.py
Original file line number Diff line number Diff line change
@@ -1,79 +1,252 @@
"""Internal helper library for record flattening functions."""

import collections
import itertools
import json
import re
from typing import Optional
from typing import Any, List, Mapping, MutableMapping, NamedTuple, Optional, Tuple

import inflection

DEFAULT_FLATTENING_SEPARATOR = "__"


class FlatteningOptions(NamedTuple):
    """Configuration options for flattening records and schemas.

    Built from plugin config (see `get_flattening_options`) and consumed by
    the flattening helpers in this module.
    """

    # Max recursion depth when flattening nested nodes (zero-based, exclusive).
    max_level: int
    # Whether flattening is active; instances are only built when enabled.
    flattening_enabled: bool = True
    # Separator used when concatenating nested key names.
    separator: str = DEFAULT_FLATTENING_SEPARATOR

def get_flattening_options(
    plugin_config: Mapping,
) -> Optional[FlatteningOptions]:
    """Get flattening options, if flattening is enabled.

    Args:
        plugin_config: The tap or target config dictionary.

    Returns:
        A new FlatteningOptions object, or None if flattening is disabled.
    """
    # `.get()` treats a missing "flattening_enabled" key the same as False.
    if plugin_config.get("flattening_enabled"):
        # NOTE(review): assumes "flattening_max_depth" is always present when
        # flattening is enabled; a missing key raises KeyError here — confirm
        # the config schema guarantees it.
        return FlatteningOptions(max_level=int(plugin_config["flattening_max_depth"]))

    return None


def flatten_key(key_name: str, parent_keys: List[str], separator: str = "__") -> str:
    """Concatenate `key_name` with its `parent_keys` using `separator`.

    If the joined key would be 255 characters or longer, key segments are
    abbreviated from the left until the result fits (or every segment has
    been abbreviated).

    Args:
        key_name: The node's key.
        parent_keys: A list of parent keys which are ancestors to this node.
        separator: The separator used during concatenation. Defaults to "__".

    Returns:
        The flattened key name as a string.
    """
    segments = parent_keys + [key_name]  # working copy; safe to mutate
    position = 0
    # Abbreviate one segment per pass until the joined key is short enough.
    while position < len(segments) and len(separator.join(segments)) >= 255:
        # Keep only the uppercase letters of the camelized segment (its
        # initials), falling back to the first 3 characters when that
        # abbreviation would be a single character or empty.
        initials = re.sub(r"[a-z]", "", inflection.camelize(segments[position]))
        abbreviated = initials if len(initials) > 1 else segments[position][0:3]
        segments[position] = abbreviated.lower()
        position += 1

    return separator.join(segments)


def flatten_schema(
    schema: dict,
    max_level: int,
    separator: str = "__",
) -> dict:
    """Flatten the provided schema up to a depth of max_level.

    Args:
        schema: The schema definition to flatten.
        max_level: The max recursion level (zero-based, exclusive).
        separator: The string to use when concatenating key names.

    Returns:
        A flattened version of the provided schema definition.
    """
    # Thin wrapper: the recursive worker starts at level 0 with no parent keys.
    return _flatten_schema(
        schema_node=schema,
        separator=separator,
        max_level=max_level,
    )


def _flatten_schema(
    schema_node: dict,
    parent_keys: List[str] = None,
    separator: str = "__",
    level: int = 0,
    max_level: int = 0,
) -> dict:
    """Flatten the provided schema node, recursively up to depth of `max_level`.

    Args:
        schema_node: The schema node to flatten.
        parent_keys: The parent's key, provided as a list of node names.
        separator: The string to use when concatenating key names.
        level: The current recursion level (zero-based).
        max_level: The max recursion level (zero-based, exclusive).

    Returns:
        A flattened version of the provided node.

    Raises:
        ValueError: If flattening produces two identical column names.
    """
    if parent_keys is None:
        parent_keys = []

    items: List[Tuple[str, dict]] = []
    if "properties" not in schema_node:
        return {}

    for k, v in schema_node["properties"].items():
        new_key = flatten_key(k, parent_keys, separator)
        if "type" in v:
            if "object" in v["type"] and "properties" in v and level < max_level:
                # Nested object within the depth budget: recurse and splice
                # the child's flattened properties into this level.
                items.extend(
                    _flatten_schema(
                        v,
                        parent_keys + [k],
                        separator=separator,
                        level=level + 1,
                        max_level=max_level,
                    ).items()
                )
            else:
                items.append((new_key, v))
        else:
            # No "type" key (e.g. a combined-type node such as "anyOf").
            # NOTE(review): this takes the first entry of the node's first
            # value and widens its type to be nullable, mutating the input
            # schema in place — confirm both behaviors are intended.
            if len(v.values()) > 0:
                first_option = list(v.values())[0][0]
                if first_option["type"] in ("string", "array", "object"):
                    first_option["type"] = ["null", first_option["type"]]
                    items.append((new_key, first_option))

    # Sort a copy of the results and fail loudly on duplicate column names
    # rather than letting dict() silently keep only the last one.
    def _key_func(item):
        return item[0]  # first element of the tuple is the key name.

    sorted_items = sorted(items, key=_key_func)
    for k, g in itertools.groupby(sorted_items, key=_key_func):
        if len(list(g)) > 1:
            raise ValueError(f"Duplicate column name produced in schema: {k}")

    # Return the (unsorted) result as a dict.
    return dict(items)


def flatten_record(
    record: dict,
    flattened_schema: dict,
    max_level: int,
    separator: str = "__",
) -> dict:
    """Flatten a record up to max_level.

    Args:
        record: The record to flatten.
        flattened_schema: The already flattened schema.
        max_level: The maximum depth of keys to flatten recursively.
        separator: The string used to separate concatenated key names. Defaults to "__".

    Returns:
        A flattened version of the record.
    """
    # Thin wrapper: the recursive worker starts at the record's root node.
    return _flatten_record(
        record_node=record,
        flattened_schema=flattened_schema,
        max_level=max_level,
        separator=separator,
    )


def _flatten_record(
    record_node: MutableMapping[Any, Any],
    flattened_schema: dict = None,
    parent_key: List[str] = None,
    separator: str = "__",
    level: int = 0,
    max_level: int = 0,
) -> dict:
    """This recursive function flattens the record node.

    The current invocation is expected to be at `level` and will continue
    recursively until the provided `max_level` is reached.

    Args:
        record_node: The record node to flatten.
        flattened_schema: The already flattened full schema for the record.
        parent_key: The parent's key, provided as a list of node names.
        separator: The string to use when concatenating key names.
        level: The current recursion level (zero-based).
        max_level: The max recursion level (zero-based, exclusive).

    Returns:
        A flattened version of the provided node.
    """
    if parent_key is None:
        parent_key = []

    items: List[Tuple[str, Any]] = []
    for k, v in record_node.items():
        new_key = flatten_key(k, parent_key, separator)
        if isinstance(v, collections.abc.MutableMapping) and level < max_level:
            # Nested mapping within the depth budget: recurse and splice the
            # child's flattened items into this level.
            items.extend(
                _flatten_record(
                    v,
                    flattened_schema,
                    parent_key + [k],
                    separator=separator,
                    level=level + 1,
                    max_level=max_level,
                ).items()
            )
        else:
            # Leaf value (or depth budget exhausted): serialize to a JSON
            # string when the value or schema says the column is complex.
            items.append(
                (
                    new_key,
                    json.dumps(v)
                    if _should_jsondump_value(k, v, flattened_schema)
                    else v,
                )
            )

    return dict(items)


def _should_jsondump_value(key: str, value: Any, flattened_schema=None) -> bool:
"""Return True if json.dump() should be used to serialize the value.
Args:
key: [description]
value: [description]
schema: [description]. Defaults to None.
Returns:
[description]
"""
if isinstance(value, (dict, list)):
return True

if (
flattened_schema
and key in flattened_schema
and "type" in flattened_schema[key]
and set(flattened_schema[key]["type"]) == {"null", "object", "array"}
):
return True

return False
Loading

0 comments on commit 41b1b65

Please sign in to comment.