Commit: apply linter and formatters

mhiro2 committed Oct 19, 2021
1 parent 5f6ca92 commit 1884638
Showing 14 changed files with 125 additions and 178 deletions.
61 changes: 45 additions & 16 deletions carling/__init__.py
@@ -1,30 +1,59 @@
+# isort: skip_file
 __version__ = "0.3.1"
 
+from .categorical import (
+    CreateCategoricalDicts,
+    DigestCategoricalColumns,
+    PairWithIndexNumber,
+    ReplaceCategoricalColumns,
+)
 from .group import (
-    UniqueOnly,
-    SingletonOnly,
-    Intersection,
-    DifferencePerKey,
+    DifferencePerKey,
     FilterByKey,
     FilterByKeyUsingSideInput,
+    Intersection,
     MaxSelectPerKey,
     PartitionRowsContainingNone,
+    SingletonOnly,
+    UniqueOnly,
 )
 from .mapping import (
-    Label,
-    Select,
-    Project,
+    Exclude,
     IndexBy,
-    Stringify,
     IndexBySingle,
+    Label,
+    Project,
     RenameFromTo,
-    Exclude,
+    Select,
+    Stringify,
 )
-from .categorical import (
-    PairWithIndexNumber,
-    DigestCategoricalColumns,
-    CreateCategoricalDicts,
-    ReplaceCategoricalColumns,
-)
-from .util import LogSample, ReifyMultiValueOption, MemoizedValueProviderWrapper
+from .util import LogSample, MemoizedValueProviderWrapper, ReifyMultiValueOption
+
+__all__ = (
+    # categorical
+    "CreateCategoricalDicts",
+    "DigestCategoricalColumns",
+    "PairWithIndexNumber",
+    "ReplaceCategoricalColumns",
+    # group
+    "DifferencePerKey",
+    "FilterByKey",
+    "FilterByKeyUsingSideInput",
+    "Intersection",
+    "MaxSelectPerKey",
+    "PartitionRowsContainingNone",
+    "SingletonOnly",
+    "UniqueOnly",
+    # mapping
+    "Exclude",
+    "IndexBy",
+    "IndexBySingle",
+    "Label",
+    "Project",
+    "RenameFromTo",
+    "Select",
+    "Stringify",
+    # util
+    "LogSample",
+    "MemoizedValueProviderWrapper",
+    "ReifyMultiValueOption",
+)
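
With the exports now listed in one place, a quick sanity check (not part of the commit; assumes the carling package is importable) that `__all__` and the imports above stay in sync:

import carling

# Each of the 23 names exported above should resolve on the package.
for name in carling.__all__:
    assert hasattr(carling, name), f"__all__ exports a missing name: {name}"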
24 changes: 12 additions & 12 deletions carling/categorical.py
@@ -1,8 +1,6 @@
 from copy import deepcopy
 
 import apache_beam as beam
-from apache_beam.coders import VarIntCoder
-from apache_beam.transforms.userstate import CombiningValueStateSpec
 
 
 class _PairWithIndexNumberDoFn(beam.DoFn):
@@ -52,9 +50,9 @@ def _CreateCategoricalDict(pcoll, existing_dict_pairs):
"""
existing_max_value = (
existing_dict_pairs
| f"just values" >> beam.Map(lambda r: r[1])
| f"get max" >> beam.combiners.Top.Of(1)
| f"extract" >> beam.FlatMap(lambda r: r)
| "just values" >> beam.Map(lambda r: r[1])
| "get max" >> beam.combiners.Top.Of(1)
| "extract" >> beam.FlatMap(lambda r: r)
)

new_pairs = (
@@ -101,7 +99,8 @@ def CreateCategoricalDicts(pcoll, cat_cols, existing_dict_rows):
     It then creates a transform which takes a pcollection and
     - looks at the input pcoll for unseen values in each categorical column
-    - creates new unique integers for each distinct unseen value, starting at max(previous value for column)+1
+    - creates new unique integers for each distinct unseen value, starting at
+      max(previous value for column)+1
     - ammends the existing mappings with (col, unseen_value, new_unique_int)
 
     Output is:
@@ -115,9 +114,7 @@ def CreateCategoricalDicts(pcoll, cat_cols, existing_dict_rows):

     existing_dicts = (
         existing_dict_rows
-        | beam.Map(
-            lambda r: beam.pvalue.TaggedOutput(r[0], (r[1], r[2]))
-        ).with_outputs()
+        | beam.Map(lambda r: beam.pvalue.TaggedOutput(r[0], (r[1], r[2]))).with_outputs()
     )
 
     for column in cat_cols:
@@ -131,16 +128,19 @@ def CreateCategoricalDicts(pcoll, cat_cols, existing_dict_rows):
             # value by reference.
             >> beam.Map(lambda r, column=column: r[column])
             | _CreateCategoricalDict(existing_dict_pairs)
-            | f"re-append column [{column}]"
-            >> beam.Map(lambda r, column=column: (column, *r))
+            | f"re-append column [{column}]" >> beam.Map(lambda r, column=column: (column, *r))
         )
 
     return acc | beam.Flatten()
 
 
 @beam.ptransform_fn
 def ReplaceCategoricalColumns(
-    inputs, cat_cols, categorical_dict_rows, default_unseen=None, features_key=None
+    inputs,
+    cat_cols,
+    categorical_dict_rows,
+    default_unseen=None,
+    features_key=None,
 ):
     """
     Utilizes the "categorical dictionary rows" generated by
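
The CreateCategoricalDicts docstring above carries the core rule of this file. A plain-Python sketch of the amendment logic it describes, where unseen values per column get fresh integers starting at max(existing)+1. Illustrative only: the function name and data shapes here are hypothetical, and the real transform composes Beam primitives instead.

def amend_categorical_dict(existing_rows, cat_cols, records):
    """existing_rows: (col, value, int) triples, as in the docstring above."""
    rows = list(existing_rows)
    for col in cat_cols:
        seen = {v for c, v, _ in rows if c == col}
        next_int = max((i for c, _, i in rows if c == col), default=-1) + 1
        for record in records:
            value = record[col]
            if value not in seen:  # unseen value: assign the next integer
                rows.append((col, value, next_int))
                seen.add(value)
                next_int += 1
    return rows

rows = amend_categorical_dict(
    existing_rows=[("color", "red", 0), ("color", "blue", 1)],
    cat_cols=["color"],
    records=[{"color": "green"}, {"color": "red"}],
)
assert ("color", "green", 2) in rows  # green was unseen: max(0, 1) + 1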
37 changes: 8 additions & 29 deletions carling/group.py
@@ -1,21 +1,9 @@
"""
Generic grouping transform utils
Author: Tsuyoki Kumazaki (tsuyoki@mcdigital.jp)
"""
from functools import reduce

import apache_beam as beam

from carling.iter_utils import is_none, is_some, take_as_singleton, take_top, unwrap_or_none
from carling.mapping import IndexBy
from carling.iter_utils import (
take_top,
is_none,
is_some,
unwrap,
unwrap_or_none,
take_as_singleton,
)


def _merge_two(x, y):
@@ -50,7 +38,6 @@ def expand(self, pcoll):


 class UniqueOnly(beam.PTransform):
-
     """Produces elements that are the only elements per key after deduplication.
 
     Given a `PCollection` of `(K, V)`,
@@ -76,7 +63,6 @@ def expand(self, pcoll):


 class SingletonOnly(beam.PTransform):
-
     """Produces elements that are the only elements per key.
 
     Given a `PCollection` of `(K, V)`,
@@ -93,8 +79,7 @@ def expand(self, pcoll):
         return (
             pcoll
             | "Group" >> beam.GroupByKey()
-            | "Remove Non-singleton Elements"
-            >> beam.Map(lambda kv: take_as_singleton(kv[1]))
+            | "Remove Non-singleton Elements" >> beam.Map(lambda kv: take_as_singleton(kv[1]))
             | "Remove None" >> beam.Filter(lambda v: len(v) > 0)
             | "Unwrap Values" >> beam.Map(lambda v: v[0])
         )
@@ -115,7 +100,6 @@ def process(self, row):


 class Intersection(beam.PTransform):
-
     """Produces the intersection of given `PCollection`s.
 
     Given a list of `PCollection`s,
@@ -152,7 +136,6 @@ def process(self, row):


 class FilterByKey(beam.PTransform):
-
     """Filters elements by their keys.
 
     The constructor receives one or more `PCollection`s of `K`s,
@@ -179,8 +162,7 @@ def expand(self, pcoll):

 @beam.ptransform_fn
 def FilterByKeyUsingSideInput(pcoll, lookup_entries, filter_key):
-    """
-    Filters a single collection by a single lookup collection, using a common key.
+    """Filters a single collection by a single lookup collection, using a common key.
 
     Given:
     - a `PCollection` (lookup_entries) of `(V)`, as a lookup collection
@@ -307,7 +289,6 @@ def process(self, row):


 class DifferencePerKey(beam.PTransform):
-
     """Produces the difference per key between two `PCollection`s.
 
     Given two `PCollection`s of `V`,
@@ -351,20 +332,18 @@ def MaxSelectPerKey(pcoll, index_keys, sort_key_fn, reverse=False):
     return (
         pcoll
         | f"Index by {index_keys}" >> IndexBy(*index_keys)
-        | f"Top 1 per key"
-        >> beam.combiners.Top.PerKey(1, key=sort_key_fn, reverse=reverse)
+        | "Top 1 per key" >> beam.combiners.Top.PerKey(1, key=sort_key_fn, reverse=reverse)
         | "De-Index" >> beam.Map(lambda k_v: k_v[1][0])
     )
 
 
 @beam.ptransform_fn
 def PartitionRowsContainingNone(pcoll):
-    """
-    Emits two tagged pcollections:
-    - None: Default emitted collection.
-      Rows are guaranteed not to have any `None` values
-    - contains_none: At least one column in the row had a `None` value
+    """Emits two tagged pcollections:
+    - None: Default emitted collection.
+      Rows are guaranteed not to have any `None` values
+    - contains_none: At least one column in the row had a `None` value
     """
 
     def _separator(row):
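
A usage sketch for the group transforms whose docstrings appear above (assumes apache-beam and carling are installed; the expected output follows my reading of the UniqueOnly docstring rather than a documented example):

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

from carling import UniqueOnly

with TestPipeline() as p:
    pairs = p | beam.Create(
        [
            ("a", 1),
            ("a", 1),  # duplicate of ("a", 1): deduplication leaves one value
            ("b", 2),
            ("b", 3),  # two distinct values under "b": not unique, dropped
        ]
    )
    # Only key "a" has exactly one value after deduplication.
    assert_that(pairs | UniqueOnly(), equal_to([1]))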
7 changes: 5 additions & 2 deletions carling/io/__init__.py
@@ -1,3 +1,6 @@
-#!/usr/bin/env python3
+from .avro_schema import generate_avro_schema_from_template, load_avro_schema
 
-from .avro_schema import load_avro_schema, generate_avro_schema_from_template
+__all__ = (
+    "generate_avro_schema_from_template",
+    "load_avro_schema",
+)
16 changes: 9 additions & 7 deletions carling/iter_utils/__init__.py
@@ -1,8 +1,10 @@
-from .iter_utils import (
-    take_top,
-    is_some,
-    is_none,
-    unwrap,
-    unwrap_or_none,
-    take_as_singleton,
+from .iter_utils import is_none, is_some, take_as_singleton, take_top, unwrap, unwrap_or_none
+
+__all__ = (
+    "is_none",
+    "is_some",
+    "take_as_singleton",
+    "take_top",
+    "unwrap",
+    "unwrap_or_none",
 )
5 changes: 0 additions & 5 deletions carling/iter_utils/iter_utils.py
@@ -1,8 +1,3 @@
"""
Generic iter utils
Author: Tsuyoki Kumazaki (tsuyoki@mcdigital.jp)
"""

import itertools


24 changes: 6 additions & 18 deletions carling/mapping.py
@@ -1,29 +1,20 @@
"""
Generic mapping transform utils
Author: Tsuyoki Kumazaki (tsuyoki@mcdigital.jp)
"""

import json

import apache_beam as beam


def Label(**labels):
"""Labels all elements.
"""
"""Labels all elements."""
return "Label" >> beam.Map(lambda r: {**r, **labels})


def Select(*keys):
"""Removes all columns which are not specified in `*keys`.
"""
"""Removes all columns which are not specified in `*keys`."""
return "Select" >> beam.Map(lambda r: {k: r[k] for k in keys})


def Project(*keys):
"""Transforms each element into a tuple of values of the specified columns.
"""
"""Transforms each element into a tuple of values of the specified columns."""
return "Project" >> beam.Map(lambda r: tuple(r[k] for k in keys))


@@ -45,8 +36,7 @@ def _decimal_default_proc(obj):


 def Stringify():
-    """Transforms each element into its JSON representation.
-    """
+    """Transforms each element into its JSON representation."""
 
     def s(obj):
         return json.dumps(obj, default=_decimal_default_proc)
@@ -66,8 +56,7 @@ def IndexBySingle(key):


 def RenameFromTo(from_to_key_mapping):
-    """Rename columns according to `from_to_key_mapping`.
-    """
+    """Rename columns according to `from_to_key_mapping`."""
 
     def rename(row):
         res = dict(row)
@@ -81,8 +70,7 @@ def rename(row):


 def Exclude(*keys):
-    """Removes all columns specified in `*keys`.
-    """
+    """Removes all columns specified in `*keys`."""
 
     def exclude(row):
         res = dict(row)
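
Since the mapping helpers are shown nearly in full above, here is a short pipeline sketch composing Select and Label (assumes apache-beam and carling are installed):

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

from carling import Label, Select

with TestPipeline() as p:
    rows = p | beam.Create([{"id": 1, "noise": "x"}, {"id": 2, "noise": "y"}])
    # Select keeps only "id"; Label merges a constant column into every row.
    out = rows | Select("id") | Label(source="batch")
    assert_that(
        out,
        equal_to([{"id": 1, "source": "batch"}, {"id": 2, "source": "batch"}]),
    )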
2 changes: 2 additions & 0 deletions carling/test_utils/__init__.py
@@ -1 +1,3 @@
 from .test_utils import pprint_equal_to
+
+__all__ = ("pprint_equal_to",)
9 changes: 3 additions & 6 deletions carling/test_utils/test_utils.py
@@ -1,10 +1,10 @@
-from apache_beam.testing.util import BeamAssertException
 import pprint
 
+from apache_beam.testing.util import BeamAssertException
 from deepdiff import DeepDiff
 
 
 def format_msg(msg_parts):
-
     return "\n\n".join(
         [
             (
@@ -66,10 +66,7 @@ def _equal(actual):
             msg_parts.append(("Missing", expected_list))
 
         if deepdiff:
-            dds = []
-            msg_parts.append(
-                ("DeepDiff (expected / actual)", DeepDiff(expected, actual))
-            )
+            msg_parts.append(("DeepDiff (expected / actual)", DeepDiff(expected, actual)))
             raise BeamAssertException(format_msg(msg_parts))
 
     return _equal
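
From the hunk above, pprint_equal_to acts like a diff-friendly equal_to: on mismatch it raises BeamAssertException carrying pretty-printed sections plus a DeepDiff of expected vs. actual. A hedged usage sketch (the call shape is inferred from the diff, not documented here):

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that

from carling.test_utils import pprint_equal_to

with TestPipeline() as p:
    rows = p | beam.Create([{"id": 1}, {"id": 2}])
    # Passes silently; change an expected row to see the DeepDiff output.
    assert_that(rows, pprint_equal_to([{"id": 1}, {"id": 2}]))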