Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Setup CI workflow #3

Merged
merged 6 commits into from
Oct 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
name: CI

on:
pull_request:
paths:
- carling/**
- .github/workflows/ci.yml

jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python: [3.7, 3.8, 3.9]
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python }}
- name: Install dependencies
run: |
python -m pip install poetry tox
- name: Test with tox
run: poetry run tox -e py,black,flake8,isort
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

conf-platform

Running tox -e <platform_name> will run commands for a particular platform and skip the rest

61 changes: 45 additions & 16 deletions carling/__init__.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,59 @@
# isort: skip_file
__version__ = "0.3.1"

from .categorical import (
CreateCategoricalDicts,
DigestCategoricalColumns,
PairWithIndexNumber,
ReplaceCategoricalColumns,
)
from .group import (
UniqueOnly,
SingletonOnly,
Intersection,
DifferencePerKey,
FilterByKey,
FilterByKeyUsingSideInput,
DifferencePerKey,
Intersection,
MaxSelectPerKey,
PartitionRowsContainingNone,
SingletonOnly,
UniqueOnly,
)
from .mapping import (
Label,
Select,
Project,
Exclude,
IndexBy,
Stringify,
IndexBySingle,
Label,
Project,
RenameFromTo,
Exclude,
Select,
Stringify,
)
from .categorical import (
PairWithIndexNumber,
DigestCategoricalColumns,
CreateCategoricalDicts,
ReplaceCategoricalColumns,
from .util import LogSample, MemoizedValueProviderWrapper, ReifyMultiValueOption

__all__ = (
# categorical
"CreateCategoricalDicts",
"DigestCategoricalColumns",
"PairWithIndexNumber",
"ReplaceCategoricalColumns",
# group
"DifferencePerKey",
"FilterByKey",
"FilterByKeyUsingSideInput",
"Intersection",
"MaxSelectPerKey",
"PartitionRowsContainingNone",
"SingletonOnly",
"UniqueOnly",
# mapping
"Exclude",
"IndexBy",
"IndexBySingle",
"Label",
"Project",
"RenameFromTo",
"Select",
"Stringify",
# util
"LogSample",
"MemoizedValueProviderWrapper",
"ReifyMultiValueOption",
)
from .util import LogSample, ReifyMultiValueOption, MemoizedValueProviderWrapper
24 changes: 12 additions & 12 deletions carling/categorical.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from copy import deepcopy

import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.userstate import CombiningValueStateSpec


class _PairWithIndexNumberDoFn(beam.DoFn):
Expand Down Expand Up @@ -52,9 +50,9 @@ def _CreateCategoricalDict(pcoll, existing_dict_pairs):
"""
existing_max_value = (
existing_dict_pairs
| f"just values" >> beam.Map(lambda r: r[1])
| f"get max" >> beam.combiners.Top.Of(1)
| f"extract" >> beam.FlatMap(lambda r: r)
| "just values" >> beam.Map(lambda r: r[1])
| "get max" >> beam.combiners.Top.Of(1)
| "extract" >> beam.FlatMap(lambda r: r)
)

new_pairs = (
Expand Down Expand Up @@ -101,7 +99,8 @@ def CreateCategoricalDicts(pcoll, cat_cols, existing_dict_rows):

It then creates a transform which takes a pcollection and
- looks at the input pcoll for unseen values in each categorical column
- creates new unique integers for each distinct unseen value, starting at max(previous value for column)+1
- creates new unique integers for each distinct unseen value, starting at
max(previous value for column)+1
- ammends the existing mappings with (col, unseen_value, new_unique_int)

Output is:
Expand All @@ -115,9 +114,7 @@ def CreateCategoricalDicts(pcoll, cat_cols, existing_dict_rows):

existing_dicts = (
existing_dict_rows
| beam.Map(
lambda r: beam.pvalue.TaggedOutput(r[0], (r[1], r[2]))
).with_outputs()
| beam.Map(lambda r: beam.pvalue.TaggedOutput(r[0], (r[1], r[2]))).with_outputs()
)

for column in cat_cols:
Expand All @@ -131,16 +128,19 @@ def CreateCategoricalDicts(pcoll, cat_cols, existing_dict_rows):
# value by reference.
>> beam.Map(lambda r, column=column: r[column])
| _CreateCategoricalDict(existing_dict_pairs)
| f"re-append column [{column}]"
>> beam.Map(lambda r, column=column: (column, *r))
| f"re-append column [{column}]" >> beam.Map(lambda r, column=column: (column, *r))
)

return acc | beam.Flatten()


@beam.ptransform_fn
def ReplaceCategoricalColumns(
inputs, cat_cols, categorical_dict_rows, default_unseen=None, features_key=None
inputs,
cat_cols,
categorical_dict_rows,
default_unseen=None,
features_key=None,
):
"""
Utilizes the "categorical dictionary rows" generated by
Expand Down
37 changes: 8 additions & 29 deletions carling/group.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,9 @@
"""
Generic grouping transform utils

Author: Tsuyoki Kumazaki (tsuyoki@mcdigital.jp)
"""
from functools import reduce

import apache_beam as beam

from carling.iter_utils import is_none, is_some, take_as_singleton, take_top, unwrap_or_none
from carling.mapping import IndexBy
from carling.iter_utils import (
take_top,
is_none,
is_some,
unwrap,
unwrap_or_none,
take_as_singleton,
)


def _merge_two(x, y):
Expand Down Expand Up @@ -50,7 +38,6 @@ def expand(self, pcoll):


class UniqueOnly(beam.PTransform):

"""Produces elements that are the only elements per key after deduplication.

Given a `PCollection` of `(K, V)`,
Expand All @@ -76,7 +63,6 @@ def expand(self, pcoll):


class SingletonOnly(beam.PTransform):

"""Produces elements that are the only elements per key.

Given a `PCollection` of `(K, V)`,
Expand All @@ -93,8 +79,7 @@ def expand(self, pcoll):
return (
pcoll
| "Group" >> beam.GroupByKey()
| "Remove Non-singleton Elements"
>> beam.Map(lambda kv: take_as_singleton(kv[1]))
| "Remove Non-singleton Elements" >> beam.Map(lambda kv: take_as_singleton(kv[1]))
| "Remove None" >> beam.Filter(lambda v: len(v) > 0)
| "Unwrap Values" >> beam.Map(lambda v: v[0])
)
Expand All @@ -115,7 +100,6 @@ def process(self, row):


class Intersection(beam.PTransform):

"""Produces the intersection of given `PCollection`s.

Given a list of `PCollection`s,
Expand Down Expand Up @@ -152,7 +136,6 @@ def process(self, row):


class FilterByKey(beam.PTransform):

"""Filters elements by their keys.

The constructor receives one or more `PCollection`s of `K`s,
Expand All @@ -179,8 +162,7 @@ def expand(self, pcoll):

@beam.ptransform_fn
def FilterByKeyUsingSideInput(pcoll, lookup_entries, filter_key):
"""
Filters a single collection by a single lookup collection, using a common key.
"""Filters a single collection by a single lookup collection, using a common key.

Given:
- a `PCollection` (lookup_entries) of `(V)`, as a lookup collection
Expand Down Expand Up @@ -307,7 +289,6 @@ def process(self, row):


class DifferencePerKey(beam.PTransform):

"""Produces the difference per key between two `PCollection`s.

Given two `PCollection`s of `V`,
Expand Down Expand Up @@ -351,20 +332,18 @@ def MaxSelectPerKey(pcoll, index_keys, sort_key_fn, reverse=False):
return (
pcoll
| f"Index by {index_keys}" >> IndexBy(*index_keys)
| f"Top 1 per key"
>> beam.combiners.Top.PerKey(1, key=sort_key_fn, reverse=reverse)
| "Top 1 per key" >> beam.combiners.Top.PerKey(1, key=sort_key_fn, reverse=reverse)
| "De-Index" >> beam.Map(lambda k_v: k_v[1][0])
)


@beam.ptransform_fn
def PartitionRowsContainingNone(pcoll):
"""
Emits two tagged pcollections:
"""Emits two tagged pcollections:

- None: Default emitted collection.
Rows are guaranteed not to have any `None` values
- contains_none: At least one column in the row had a `None` value
- None: Default emitted collection.
Rows are guaranteed not to have any `None` values
- contains_none: At least one column in the row had a `None` value
"""

def _separator(row):
Expand Down
7 changes: 5 additions & 2 deletions carling/io/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#!/usr/bin/env python3
from .avro_schema import generate_avro_schema_from_template, load_avro_schema

from .avro_schema import load_avro_schema, generate_avro_schema_from_template
__all__ = (
"generate_avro_schema_from_template",
"load_avro_schema",
)
16 changes: 9 additions & 7 deletions carling/iter_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from .iter_utils import (
take_top,
is_some,
is_none,
unwrap,
unwrap_or_none,
take_as_singleton,
from .iter_utils import is_none, is_some, take_as_singleton, take_top, unwrap, unwrap_or_none

__all__ = (
"is_none",
"is_some",
"take_as_singleton",
"take_top",
"unwrap",
"unwrap_or_none",
)
5 changes: 0 additions & 5 deletions carling/iter_utils/iter_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
"""
Generic iter utils
Author: Tsuyoki Kumazaki (tsuyoki@mcdigital.jp)
"""

import itertools


Expand Down
24 changes: 6 additions & 18 deletions carling/mapping.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,20 @@
"""
Generic mapping transform utils

Author: Tsuyoki Kumazaki (tsuyoki@mcdigital.jp)
"""

import json

import apache_beam as beam


def Label(**labels):
"""Labels all elements.
"""
"""Labels all elements."""
return "Label" >> beam.Map(lambda r: {**r, **labels})


def Select(*keys):
"""Removes all columns which are not specified in `*keys`.
"""
"""Removes all columns which are not specified in `*keys`."""
return "Select" >> beam.Map(lambda r: {k: r[k] for k in keys})


def Project(*keys):
"""Transforms each element into a tuple of values of the specified columns.
"""
"""Transforms each element into a tuple of values of the specified columns."""
return "Project" >> beam.Map(lambda r: tuple(r[k] for k in keys))


Expand All @@ -45,8 +36,7 @@ def _decimal_default_proc(obj):


def Stringify():
"""Transforms each element into its JSON representation.
"""
"""Transforms each element into its JSON representation."""

def s(obj):
return json.dumps(obj, default=_decimal_default_proc)
Expand All @@ -66,8 +56,7 @@ def IndexBySingle(key):


def RenameFromTo(from_to_key_mapping):
"""Rename columns according to `from_to_key_mapping`.
"""
"""Rename columns according to `from_to_key_mapping`."""

def rename(row):
res = dict(row)
Expand All @@ -81,8 +70,7 @@ def rename(row):


def Exclude(*keys):
"""Removes all columns specified in `*keys`.
"""
"""Removes all columns specified in `*keys`."""

def exclude(row):
res = dict(row)
Expand Down
Empty file added carling/py.typed
Empty file.
2 changes: 2 additions & 0 deletions carling/test_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from .test_utils import pprint_equal_to

__all__ = ("pprint_equal_to",)
Loading