Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][WIP] Rename type to type name #185

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions core/src/klio_core/config/_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def wrapper(cls):
class KlioIOConfig(object):
io_type = attr.attrib(type=KlioIOType)
io_direction = attr.attrib(type=KlioIODirection)
name = attr.attrib(type=str)

# these must be filled in by subclasses so Klio knows what supports what
SUPPORTED_TYPES = []
Expand Down Expand Up @@ -116,7 +117,7 @@ def as_dict(self):
# since dicts preserve order by default in py3, let's force
# type to be first - particularly helpful/useful for dumping
# config via `klio job config show`
copy = {"type": self.name}
copy = {"type": self.type_name}
copy.update(config_dict)
return copy

Expand All @@ -141,15 +142,19 @@ def from_dict(cls, config_dict, *args, **kwargs):
# to be defined without a default after those that have defaults,
# we're inserting the default value here if the user doesn't have
# it already in their config
copy = config_dict.copy()
if "skip_klio_read" not in config_dict:
copy = config_dict.copy()
copy["skip_klio_read"] = False
if "name" not in config_dict:
copy["name"] = None
if "skip_klio_read" not in config_dict or "name" not in config_dict:
return super().from_dict(copy, *args, **kwargs)
return super().from_dict(config_dict, *args, **kwargs)

def to_io_kwargs(self):
kwargs = super().to_io_kwargs()
kwargs.pop("skip_klio_read", None)
kwargs.pop("name", None)
return kwargs


Expand All @@ -163,15 +168,19 @@ def from_dict(cls, config_dict, *args, **kwargs):
# to be defined without a default after those that have defaults,
# we're inserting the default value here if the user doesn't have
# it already in their config
copy = config_dict.copy()
if "skip_klio_write" not in config_dict:
copy = config_dict.copy()
copy["skip_klio_write"] = False
if "name" not in config_dict:
copy["name"] = None
if "skip_klio_write" not in config_dict or "name" not in config_dict:
return super().from_dict(copy, *args, **kwargs)
return super().from_dict(config_dict, *args, **kwargs)

def to_io_kwargs(self):
kwargs = super().to_io_kwargs()
kwargs.pop("skip_klio_write", None)
kwargs.pop("name", None)
return kwargs


Expand All @@ -185,9 +194,15 @@ def from_dict(cls, config_dict, *args, **kwargs):
# to be defined without a default after those that have defaults,
# we're inserting the default value here if the user doesn't have
# it already in their config
copy = config_dict.copy()
if "skip_klio_existence_check" not in config_dict:
copy = config_dict.copy()
copy["skip_klio_existence_check"] = False
if "name" not in config_dict:
copy["name"] = None
if (
"skip_klio_existence_check" not in config_dict
or "name" not in config_dict
):
return super().from_dict(copy, *args, **kwargs)
return super().from_dict(config_dict, *args, **kwargs)

Expand All @@ -199,7 +214,7 @@ def to_io_kwargs(self):

@attr.attrs(frozen=True)
class KlioPubSubConfig(object):
name = "pubsub"
type_name = "pubsub"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use TYPE_NAME in caps to indicate this is more like a class-level constant.

topic = attr.attrib(type=str)

@staticmethod
Expand Down Expand Up @@ -254,7 +269,7 @@ def from_dict(cls, config_dict, *args, **kwargs):


class KlioFileConfig(object):
name = "file"
type_name = "file"


@attr.attrs(frozen=True)
Expand Down Expand Up @@ -331,7 +346,7 @@ class KlioFileOutputDataConfig(KlioDataIOConfig, KlioFileConfig):


class KlioAvroConfig(object):
name = "avro"
type_name = "avro"


@attr.attrs(frozen=True)
Expand Down Expand Up @@ -387,7 +402,7 @@ def _convert_bigquery_input_coder(coder_str):

@attr.attrs(frozen=True)
class KlioBigQueryConfig(object):
name = "bq"
type_name = "bq"
project = attr.attrib(type=str, default=None)
dataset = attr.attrib(type=str, default=None)
table = attr.attrib(type=str, default=None)
Expand Down Expand Up @@ -493,7 +508,7 @@ def contains_invalid_fields(field_list):

@attr.attrs(frozen=True)
class KlioGCSConfig(KlioIOConfig):
name = "gcs"
type_name = "gcs"
location = attr.attrib(type=str)

@staticmethod
Expand Down
15 changes: 3 additions & 12 deletions core/src/klio_core/config/_preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.
#

import collections
import string

import glom
Expand Down Expand Up @@ -46,7 +45,7 @@ def _transform_io_list(io_subsection_list):
"""Transform lists of dicts into a nested dict of dicts, where the keys
for the top-level dict come from the `name` field in the nested dict.
If `name` is not present, a name is auto-generated based on the index
of the I/O and it's type.
of the I/O.

example:

Expand Down Expand Up @@ -77,20 +76,12 @@ def _transform_io_list(io_subsection_list):
}
"""

type_counters = collections.defaultdict(int)
io_dict = {}
for conf in io_subsection_list:
for count, conf in enumerate(io_subsection_list):
if "name" in conf:
name = conf["name"]
# TODO: right now "name" isn't supported in IOConfig (conflicts
# with existing "name" attribute), once that's fixed we
# shouldn't drop name here
conf.pop("name")
else:
type_name = conf.get("type", "unknown")
type_id = type_counters[type_name]
type_counters[type_name] += 1
name = "{}{}".format(type_name, type_id)
name = count
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why the type is being dropped from the generated name? I'm not strongly for keeping it but I also don't see any clear reason for removing it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I remember we discussed this as a team and thought there was weirdness, since there is another area, https://github.com/spotify/klio/blob/develop/exec/src/klio_exec/commands/run.py#L466, where we again append an index to the type. I THINK it's used because multiple event reading requires a unique label (I'll cite this when I find the link), and this generated name was for that unique label. I'll ask the team to confirm whether we want to keep it, and I can pluck off that commit if we do indeed want to keep it.

There's also this TODO written by @econchick that touches on stuff disappearing if we convert to enforcing a dictionary for event inputs, and I think the two are related: https://github.com/spotify/klio/blob/develop/exec/src/klio_exec/commands/run.py#L460


io_dict[name] = conf

Expand Down
2 changes: 1 addition & 1 deletion core/src/klio_core/config/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ def traverse(cls):
def _create_config_objects(self, configs, io_type, io_direction):
options = dict(
[
(x.name, x)
(x.type_name, x)
for x in self._get_all_config_subclasses()
if x.supports_type(io_type)
and x.supports_direction(io_direction)
Expand Down
33 changes: 21 additions & 12 deletions core/tests/config/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,19 @@ def job_config_dict():
},
},
"outputs": {
"pubsub0": {"type": "pubsub", "topic": "test-job-out"}
"pubsub0": {
"type": "pubsub",
"topic": "test-job-out",
"name": "test-event-output0",
}
},
},
"data": {
"inputs": {
"gcs0": {
"type": "GCS",
"location": "gs://sigint-output/test-parent-job-out",
"name": "test-data-input0",
}
},
"outputs": {
Expand All @@ -63,49 +68,53 @@ def job_config_dict():
@pytest.fixture
def final_job_config_dict():
return {
"allow_non_klio_messages": False,
"metrics": {"logger": {}},
"blocking": False,
"events": {
"inputs": [
{
"type": "pubsub",
"name": None,
"skip_klio_read": False,
"topic": "test-parent-job-out",
"subscription": "test-parent-job-out-sub",
"skip_klio_read": False,
},
}
],
"outputs": [
{
"type": "pubsub",
"topic": "test-job-out",
"name": "test-event-output0",
"skip_klio_write": False,
"topic": "test-job-out",
}
],
},
"data": {
"inputs": [
{
"type": "gcs",
"location": "gs://sigint-output/test-parent-job-out",
"name": "test-data-input0",
"skip_klio_existence_check": False,
"location": "gs://sigint-output/test-parent-job-out",
"file_suffix": "",
"ping": False,
}
],
"outputs": [
{
"type": "gcs",
"name": None,
"skip_klio_existence_check": False,
"location": "gs://sigint-output/test-job-out",
"file_suffix": "",
"force": False,
"location": "gs://sigint-output/test-job-out",
"skip_klio_existence_check": False,
}
],
},
"more": "config",
"that": {"the": "user"},
"might": ["include"],
"blocking": False,
"allow_non_klio_messages": False,
}


Expand Down Expand Up @@ -427,7 +436,7 @@ def test_klio_read_file_config():
config_dict, io.KlioIOType.DATA, io.KlioIODirection.INPUT
)

assert "file" == klio_read_file_config.name
assert "file" == klio_read_file_config.type_name
assert config_dict["location"] == klio_read_file_config.file_pattern


Expand All @@ -440,7 +449,7 @@ def test_klio_write_file_config():
config_dict, io.KlioIOType.DATA, io.KlioIODirection.OUTPUT
)

assert "file" == klio_write_file_config.name
assert "file" == klio_write_file_config.type_name
assert config_dict["location"] == klio_write_file_config.file_path_prefix


Expand All @@ -461,7 +470,7 @@ def test_klio_write_bigquery_config():
config_dict, io.KlioIOType.EVENT, io.KlioIODirection.OUTPUT
)

assert "bq" == klio_write_bq_cfg.name
assert "bq" == klio_write_bq_cfg.type_name
assert config_dict["schema"] == klio_write_bq_cfg.schema
assert (
config_dict["create_disposition"]
Expand Down
103 changes: 59 additions & 44 deletions core/tests/config/test_preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,53 +223,68 @@ def test_apply_templates(
assert expected == json.loads(new_dict)


def test_transform_io(kcp):
config = {
"job_config": {
"events": {
"inputs": [
{"type": "bq", "key": "value"},
{"type": "bq", "key": "value2"},
{"type": "gcs", "gcskey": "gcsvalue"},
],
"outputs": [
{"type": "bq", "key": "value", "name": "mybq"},
{"type": "bq", "key": "value2"},
],
},
"data": {
"inputs": [],
"outputs": [
{"type": "bq", "key": "value", "name": "mybq"},
{"type": "bq", "key": "value2"},
],
},
}
}
expected = {
"job_config": {
"events": {
"inputs": {
"bq0": {"type": "bq", "key": "value"},
"bq1": {"type": "bq", "key": "value2"},
"gcs0": {"type": "gcs", "gcskey": "gcsvalue"},
},
"outputs": {
"mybq": {"type": "bq", "key": "value"},
"bq0": {"type": "bq", "key": "value2"},
},
@pytest.mark.parametrize(
"config,expected",
(
# No overrides given - no changes in returned dict
(
{
"job_config": {
"events": {
"inputs": [
{"type": "bq", "key": "value"},
{"type": "bq", "key": "value2"},
{"type": "gcs", "gcskey": "gcsvalue"},
],
"outputs": [
{"type": "bq", "key": "value", "name": "mybq"},
{"type": "bq", "key": "value2"},
],
},
"data": {
"inputs": [],
"outputs": [
{"type": "bq", "key": "value", "name": "mybq"},
{"type": "bq", "key": "value2"},
],
},
}
},
"data": {
"inputs": {},
"outputs": {
"mybq": {"type": "bq", "key": "value"},
"bq0": {"type": "bq", "key": "value2"},
},
{
"job_config": {
"events": {
"inputs": {
0: {"type": "bq", "key": "value"},
1: {"type": "bq", "key": "value2"},
2: {"type": "gcs", "gcskey": "gcsvalue"},
},
"outputs": {
"mybq": {
"type": "bq",
"key": "value",
"name": "mybq",
},
1: {"type": "bq", "key": "value2"},
},
},
"data": {
"inputs": {},
"outputs": {
"mybq": {
"type": "bq",
"key": "value",
"name": "mybq",
},
1: {"type": "bq", "key": "value2"},
},
},
}
},
}
}
),
),
)
def test_transform_io(kcp, config, expected):
actual = kcp._transform_io_sections(config)

assert actual == expected


Expand Down
Loading