From 06f1f73b2100a189bdbf362742dca566fe8931a8 Mon Sep 17 00:00:00 2001
From: Joshua Hoskins
Date: Wed, 16 Sep 2020 11:20:25 -0700
Subject: [PATCH] Feedback 1

Remove streams
Move fixtures to amundsen_common/tests
Stat as additional class, deprecate Statistics

Signed-off-by: Joshua Hoskins
---
 amundsen_common/models/table.py         |  20 ++
 amundsen_common/tests/__init__.py       |   2 +
 amundsen_common/{ => tests}/fixtures.py |   3 +
 amundsen_common/utils/streams.py        | 388 ------------------------
 tests/tests/__init__.py                 |   2 +
 tests/{unit => tests}/test_fixtures.py  |  18 +-
 tests/unit/utils/test_streams.py        | 205 -------------
 7 files changed, 36 insertions(+), 602 deletions(-)
 create mode 100644 amundsen_common/tests/__init__.py
 rename amundsen_common/{ => tests}/fixtures.py (98%)
 delete mode 100644 amundsen_common/utils/streams.py
 create mode 100644 tests/tests/__init__.py
 rename tests/{unit => tests}/test_fixtures.py (87%)
 delete mode 100644 tests/unit/utils/test_streams.py

diff --git a/amundsen_common/models/table.py b/amundsen_common/models/table.py
index 0637bc4..bd535b0 100644
--- a/amundsen_common/models/table.py
+++ b/amundsen_common/models/table.py
@@ -60,6 +60,26 @@ class Meta:
         register_as_scheme = True
 
 
+@attr.s(auto_attribs=True, kw_only=True)
+class Statistics:
+    """
+    DEPRECATED. Use Stat
+    """
+    stat_type: str
+    stat_val: Optional[str] = None
+    start_epoch: Optional[int] = None
+    end_epoch: Optional[int] = None
+
+
+class StatisticsSchema(AttrsSchema):
+    """
+    DEPRECATED. Use StatSchema
+    """
+    class Meta:
+        target = Statistics
+        register_as_scheme = True
+
+
 @attr.s(auto_attribs=True, kw_only=True)
 class Stat:
     stat_type: str
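The hunk above keeps Statistics importable while steering new code to Stat. A minimal sketch of the intended usage, assuming Stat carries the same four fields as the deprecated class (the hunk truncates after stat_type, so treat the field list as an assumption):

    from amundsen_common.models.table import Stat, Statistics

    # Preferred: the new Stat class (kw_only=True, so fields are passed by name).
    stat = Stat(stat_type='num_rows', stat_val='1000',
                start_epoch=1577865600, end_epoch=1600279225)

    # Still accepted during the deprecation window; note the deprecation is
    # docstring-only in this patch, so no DeprecationWarning is raised yet.
    legacy = Statistics(stat_type='num_rows', stat_val='1000')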
diff --git a/amundsen_common/tests/__init__.py b/amundsen_common/tests/__init__.py
new file mode 100644
index 0000000..f3145d7
--- /dev/null
+++ b/amundsen_common/tests/__init__.py
@@ -0,0 +1,2 @@
+# Copyright Contributors to the Amundsen project.
+# SPDX-License-Identifier: Apache-2.0
diff --git a/amundsen_common/fixtures.py b/amundsen_common/tests/fixtures.py
similarity index 98%
rename from amundsen_common/fixtures.py
rename to amundsen_common/tests/fixtures.py
index 18b6a89..4fbd040 100644
--- a/amundsen_common/fixtures.py
+++ b/amundsen_common/tests/fixtures.py
@@ -11,6 +11,9 @@
 
 
 class Fixtures:
+    """
+    These fixtures are useful for creating test objects. For an example usage, check out tests/tests/test_fixtures.py
+    """
     counter = 1000
 
     @staticmethod
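Because this is a rename with no API change (98% similarity), downstream projects only need to update the import path. A sketch using a helper the test file below actually imports:

    # Before this patch:
    # from amundsen_common.fixtures import next_table

    # After this patch:
    from amundsen_common.tests.fixtures import next_table

    table = next_table()  # a fully populated Table object for tests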
diff --git a/amundsen_common/utils/streams.py b/amundsen_common/utils/streams.py
deleted file mode 100644
index 40ff842..0000000
--- a/amundsen_common/utils/streams.py
+++ /dev/null
@@ -1,388 +0,0 @@
-# Copyright Contributors to the Amundsen project.
-# SPDX-License-Identifier: Apache-2.0
-
-import logging
-import threading
-from typing import (Any, AsyncIterator, Callable, Collection, Iterable,
-                    Iterator, List, Optional, Tuple, TypeVar, Union)
-
-from typing_extensions import Final, final
-
-LOGGER = logging.getLogger(__name__)
-
-
-V = TypeVar('V')
-R = TypeVar('R')
-
-
-def one(ignored: Any) -> int:
-    return 1
-
-
-class PeekingIterator(Iterator[V]):
-    """
-    Like Iterator, but with peek(), peek_default(), and take_peeked()
-    """
-    def __init__(self, iterable: Iterable[V]):
-        self.it: Final[Iterator[V]] = iterable if isinstance(iterable, Iterator) else iter(iterable)
-        self.has_peeked_value = False
-        self.peeked_value: Optional[V] = None
-        # RLock could make sense, but it would be just weird for the same thread to try to peek from same blocking
-        # iterator
-        self.lock: Final[threading.Lock] = threading.Lock()
-
-    @final
-    # @overrides Iterator but @overrides doesn't like
-    def __next__(self) -> V:
-        """
-        :return: the previously peeked value or the next
-        :raises StopIteration if there is no more values
-        """
-        with self.lock:
-            value: V
-            if self.has_peeked_value:
-                value = self.peeked_value  # type: ignore
-                self.peeked_value = None
-                self.has_peeked_value = False
-            else:
-                value = next(self.it)
-                assert not self.has_peeked_value
-            return value
-
-    @final
-    def peek(self) -> V:
-        """
-        :return: the previously peeked value or the next
-        :raises StopIteration if there is no more values
-        """
-        with self.lock:
-            if not self.has_peeked_value:
-                self.peeked_value = next(self.it)
-                self.has_peeked_value = True
-            assert self.has_peeked_value
-            return self.peeked_value  # type: ignore
-
-    @final
-    def peek_default(self, default: Optional[V]) -> Optional[V]:
-        """
-        :return: the previously peeked value or the next, or default if no more values
-        """
-        try:
-            return self.peek()
-        except StopIteration:
-            return default
-
-    @final
-    def take_peeked(self, value: V) -> None:
-        with self.lock:
-            assert self.has_peeked_value, f'expected to find a peaked value'
-            assert self.peeked_value is value, f'expected the peaked value to be the same'
-            self.peeked_value = None
-            self.has_peeked_value = False
-
-    @final
-    def has_more(self) -> bool:
-        try:
-            self.peek()
-            return True
-        except StopIteration:
-            return False
-
-
-class PeekingAsyncIterator(AsyncIterator[V]):
-    """
-    Like AsyncIterator, but with peek(), peek_default(), and take_peeked()
-    """
-    def __init__(self, iterable: AsyncIterator[V]):
-        self.it: Final[AsyncIterator[V]] = iterable
-        self.has_peeked_value = False
-        self.peeked_value: Optional[V] = None
-        # RLock could make sense, but it would be just weird for the same thread to try to peek from same blocking
-        # iterator
-        self.lock: Final[threading.Lock] = threading.Lock()
-
-    @final
-    # @overrides AsyncIterator but @overrides doesn't like
-    async def __anext__(self) -> V:
-        """
-        :return: the previously peeked value or the next
-        :raises StopAsyncIteration if there is no more values
-        """
-        with self.lock:
-            value: V
-            if self.has_peeked_value:
-                value = self.peeked_value  # type: ignore
-                self.peeked_value = None
-                self.has_peeked_value = False
-            else:
-                value = await self.__anext__()
-                assert not self.has_peeked_value
-            return value
-
-    @final
-    async def peek(self) -> V:
-        """
-        :return: the previously peeked value or the next
-        :raises StopAsyncIteration if there is no more values
-        """
-        with self.lock:
-            if not self.has_peeked_value:
-                self.peeked_value = await self.it.__anext__()
-                self.has_peeked_value = True
-            assert self.has_peeked_value
-            return self.peeked_value  # type: ignore
-
-    @final
-    async def peek_default(self, default: Optional[V]) -> Optional[V]:
-        """
-        :return: the previously peeked value or the next, or default if no more values
-        """
-        try:
-            return await self.peek()
-        except StopAsyncIteration:
-            return default
-
-    @final
-    def take_peeked(self, value: V) -> None:
-        with self.lock:
-            assert self.has_peeked_value, f'expected to find a peaked value'
-            assert self.peeked_value is value, f'expected the peaked value to be the same'
-            self.peeked_value = None
-            self.has_peeked_value = False
-
-    @final
-    async def has_more(self) -> bool:
-        try:
-            await self.peek()
-            return True
-        except StopAsyncIteration:
-            return False
-
-
-def one_chunk(*, it: PeekingIterator[V], n: int, metric: Callable[[V], int]) -> Tuple[Iterable[V], bool]:
-    """
-    :param it: stream of values as a PeekingIterator (or regular iterable if you are only going to take the first chunk
-    and don't care about the peeked value being consumed)
-    :param n: consume stream until n is reached. if n is 0, process whole stream as one chunk.
-    :param metric: the callable that returns positive metric for a value
-    :returns the chunk
-    """
-    items: List[V] = []
-    items_metric: int = 0
-    try:
-        while True:
-            item = it.peek()
-            item_metric = metric(item)
-            # negative would be insane, let's say positive
-            assert item_metric > 0, \
-                f'expected metric to be positive! item_metric={item_metric}, metric={metric}, item={item}'
-            if not items and item_metric > n:
-                # should we assert instead? it's probably a surprise to the caller too, and might fail for whatever
-                # limit they were trying to avoid, but let's give them a shot at least.
-                LOGGER.error(f"expected a single item's metric to be less than the chunk limit! {item_metric} > {n}, "
-                             f"but returning to make progress")
-                items.append(item)
-                it.take_peeked(item)
-                items_metric += item_metric
-                break
-            elif items_metric + item_metric <= n:
-                items.append(item)
-                it.take_peeked(item)
-                items_metric += item_metric
-                if items_metric >= n:
-                    # we're full
-                    break
-                # else keep accumulating
-            else:
-                assert items_metric + item_metric > n
-                # we're full
-                break
-    # don't catch exception, let that be a concern for callers
-    except StopIteration:
-        pass
-
-    has_more = it.has_more()
-    return tuple(items), has_more
-
-
-def chunk(it: Union[Iterable[V], PeekingIterator[V]], n: int, metric: Callable[[V], int] = one
-          ) -> Iterable[Iterable[V]]:
-    """
-    :param it: stream of values as a PeekingIterator (or regular iterable if you are only going to take the first chunk
-    and don't care about the peeked value being consumed)
-    :param n: consume stream until n is reached. if n is 0, process whole stream as one chunk.
-    :param metric: the callable that returns positive metric for a value
-    :returns the Iterable (generator) of chunks
-    """
-    if not isinstance(it, PeekingIterator):
-        it = PeekingIterator(it)
-    assert isinstance(it, PeekingIterator)
-    has_more: bool = True
-    while has_more:
-        items, has_more = one_chunk(it=it, n=n, metric=metric)
-        if items or has_more:
-            yield items
-
-
-async def async_one_chunk(
-        it: PeekingAsyncIterator[V], n: int, metric: Callable[[V], int] = one) -> Tuple[Iterable[V], bool]:
-    """
-    :param it: stream of values as a PeekingAsyncIterator
-    :param n: consume stream until n is reached. if n is 0, process whole stream as one chunk.
-    :param metric: the callable that returns positive metric for a value
-    :returns the chunk and if there are more items
-    """
-    items: List[V] = []
-    items_metric: int = 0
-    if not isinstance(it, PeekingAsyncIterator):
-        it = PeekingAsyncIterator(it)
-    assert isinstance(it, PeekingAsyncIterator)
-    try:
-        while True:
-            item = await it.peek()
-            item_metric = metric(item)
-            # negative would be insane, let's say positive
-            assert item_metric > 0, \
-                f'expected metric to be positive! item_metric={item_metric}, metric={metric}, item={item}'
-            if not items and item_metric > n:
-                # should we assert instead? it's probably a surprise to the caller too, and might fail for whatever
-                # limit they were trying to avoid, but let's give them a shot at least.
-                LOGGER.error(f"expected a single item's metric to be less than the chunk limit! {item_metric} > {n}, "
-                             f"but returning to make progress")
-                items.append(item)
-                it.take_peeked(item)
-                items_metric += item_metric
-                break
-            elif items_metric + item_metric <= n:
-                items.append(item)
-                it.take_peeked(item)
-                items_metric += item_metric
-                if items_metric >= n:
-                    # we're full
-                    break
-                # else keep accumulating
-            else:
-                assert items_metric + item_metric > n
-                # we're full
-                break
-    # don't catch exception, let that be a concern for callers
-    except StopAsyncIteration:
-        pass
-
-    has_more = await it.has_more()
-    return tuple(items), has_more
-
-
-async def async_chunk(*, it: Union[AsyncIterator[V], PeekingAsyncIterator[V]], n: int, metric: Callable[[V], int]
-                      ) -> AsyncIterator[Iterable[V]]:
-    """
-    :param it: stream of values as a PeekingAsyncIterator
-    :param n: consume stream until n is reached. if n is 0, process whole stream as one chunk.
-    :param metric: the callable that returns positive metric for a value
-    :returns the chunk and if there are more items
-    """
-    if not isinstance(it, PeekingAsyncIterator):
-        it = PeekingAsyncIterator(it)
-    assert isinstance(it, PeekingAsyncIterator)
-    has_more: bool = True
-    while has_more:
-        items, has_more = await async_one_chunk(it=it, n=n, metric=metric)
-        if items or has_more:
-            yield items
-
-
-def reduce_in_chunks(*, stream: Iterable[V], n: int, initial: R,
-                     consumer: Callable[[Iterable[V], R], R], metric: Callable[[V], int] = one) -> R:
-    """
-    :param stream: stream of values
-    :param n: consume stream until n is reached. if n is 0, process whole stream as one chunk.
-    :param metric: the callable that returns positive metric for a value
-    :param initial: the initial state
-    :param consumer: the callable to handle the chunk
-    :returns the final state
-    """
-    if n > 0:
-        it = PeekingIterator(stream)
-        state = initial
-        for items in chunk(it=it, n=n, metric=metric):
-            state = consumer(items, state)
-        return state
-    else:
-        return consumer(stream, initial)
-
-
-async def async_reduce_in_chunks(*, stream: AsyncIterator[V], n: int, metric: Callable[[V], int], initial: R,
-                                 consumer: Callable[[Iterable[V], R], R]) -> R:
-    """
-    :param stream:
-    :param n: if n is 0, process whole stream as one chunk
-    :param metric: the callable that returns positive metric for a value
-    :param initial: the initial state
-    :param consumer: the callable to handle the chunk
-    :returns the final state
-    """
-    if n > 0:
-        it = PeekingAsyncIterator(stream)
-        state = initial
-        async for items in async_chunk(it=it, n=n, metric=metric):
-            state = consumer(items, state)
-        return state
-    else:
-        return consumer(tuple([_ async for _ in stream]), initial)
-
-
-def consume_in_chunks(*, stream: Iterable[V], n: int, consumer: Callable[[Iterable[V]], None],
-                      metric: Callable[[V], int] = one) -> int:
-    """
-    :param stream:
-    :param n: consume stream until n is reached if n is 0, process whole stream as one chunk
-    :param metric: the callable that returns positive metric for a value
-    :param consumer: the callable to handle the chunk
-    :return:
-    """
-    _actual_state: int = 0
-
-    def _consumer(things: Iterable[V], ignored: None) -> None:
-        nonlocal _actual_state
-        things = _assure_collection(things)
-        assert isinstance(things, Collection)  # appease the types
-        _actual_state += len(things)
-        consumer(things)
-    reduce_in_chunks(stream=stream, n=n, initial=None, consumer=_consumer, metric=metric)
-    return _actual_state
-
-
-async def async_consume_in_chunks(*, stream: AsyncIterator[V], n: int, consumer: Callable[[Iterable[V]], None],
-                                  metric: Callable[[V], int] = one) -> int:
-    _actual_state: int = 0
-
-    def _consumer(things: Iterable[V], ignored: None) -> None:
-        nonlocal _actual_state
-        things = _assure_collection(things)
-        assert isinstance(things, Collection)  # appease the types
-        _actual_state += len(things)
-        consumer(things)
-    await async_reduce_in_chunks(stream=stream, n=n, initial=None, consumer=_consumer, metric=metric)
-    return _actual_state
-
-
-def consume_in_chunks_with_state(*, stream: Iterable[V], n: int, consumer: Callable[[Iterable[V]], None],
-                                 state: Callable[[V], R], metric: Callable[[V], int] = one) -> Iterable[R]:
-    _actual_state: List[R] = list()
-
-    def _consumer(things: Iterable[V], ignored: None) -> None:
-        nonlocal _actual_state
-        things = _assure_collection(things)
-        assert isinstance(things, Collection)  # appease the types
-        _actual_state.extend(map(state, things))
-        consumer(things)
-
-    reduce_in_chunks(stream=stream, n=n, initial=None, consumer=_consumer, metric=metric)
-    return tuple(_actual_state)
-
-
-def _assure_collection(iterable: Iterable[V]) -> Collection[V]:
-    if isinstance(iterable, Collection):
-        return iterable
-    else:
-        return tuple(iterable)
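Downstream code that imported amundsen_common.utils.streams now has to vendor the module or replace it. For the common fixed-size-batch case (the default metric of one), the behavior can be approximated with a small local helper. This is only a sketch under that assumption, not a drop-in replacement: it drops metric-based sizing and the peeking semantics entirely.

    from itertools import islice
    from typing import Callable, Iterable, Iterator, Tuple, TypeVar

    V = TypeVar('V')


    def consume_in_chunks(*, stream: Iterable[V], n: int,
                          consumer: Callable[[Tuple[V, ...]], None]) -> int:
        # n <= 0 meant "process the whole stream as one chunk" in the
        # removed module, so mirror that here.
        if n <= 0:
            batch = tuple(stream)
            consumer(batch)
            return len(batch)
        it: Iterator[V] = iter(stream)
        count = 0
        while True:
            batch = tuple(islice(it, n))
            if not batch:
                return count
            count += len(batch)
            consumer(batch)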
diff --git a/tests/tests/__init__.py b/tests/tests/__init__.py
new file mode 100644
index 0000000..f3145d7
--- /dev/null
+++ b/tests/tests/__init__.py
@@ -0,0 +1,2 @@
+# Copyright Contributors to the Amundsen project.
+# SPDX-License-Identifier: Apache-2.0
diff --git a/tests/unit/test_fixtures.py b/tests/tests/test_fixtures.py
similarity index 87%
rename from tests/unit/test_fixtures.py
rename to tests/tests/test_fixtures.py
index 501fb5c..a35c6c6 100644
--- a/tests/unit/test_fixtures.py
+++ b/tests/tests/test_fixtures.py
@@ -3,13 +3,13 @@
 
 import unittest
 
-from amundsen_common.fixtures import (next_application, next_col_type,
-                                      next_columns, next_database,
-                                      next_description,
-                                      next_description_source,
-                                      next_descriptions, next_int, next_item,
-                                      next_range, next_string, next_table,
-                                      next_tag, next_tags, next_user)
+from amundsen_common.tests.fixtures import (next_application, next_col_type,
+                                            next_columns, next_database,
+                                            next_description,
+                                            next_description_source,
+                                            next_descriptions, next_int, next_item,
+                                            next_range, next_string, next_table,
+                                            next_tag, next_tags, next_user)
 from amundsen_common.models.table import Column, ProgrammaticDescription, Stat
 
 
@@ -87,8 +87,8 @@ def test_16_just_execute_next_descriptions(self) -> None:
         descs = next_descriptions()
         self.assertEqual(3, len(descs))
         self.assertEqual([
-            ProgrammaticDescription(source='dedefghijk001233', text='ijklmnopqrstuvwxyzab001224'),
-            ProgrammaticDescription(source='devwxyzabc001173', text='abcdefghijklmnopqrst001164'),
+            ProgrammaticDescription(source='dedefghijk001233', text='ijklmnopqrstuvwxyzab001224'),
+            ProgrammaticDescription(source='devwxyzabc001173', text='abcdefghijklmnopqrst001164'),
             ProgrammaticDescription(source='dezabcdefg001203', text='efghijklmnopqrstuvwx001194')], descs)
 
     def test_17_just_execute_next_table(self) -> None:
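The pinned strings in test_16 work because the attrs-decorated models compare by value: assertEqual is comparing fields, not object identity. For instance:

    from amundsen_common.models.table import ProgrammaticDescription

    a = ProgrammaticDescription(source='dedefghijk001233',
                                text='ijklmnopqrstuvwxyzab001224')
    b = ProgrammaticDescription(source='dedefghijk001233',
                                text='ijklmnopqrstuvwxyzab001224')
    assert a == b      # attrs-generated __eq__ compares field values
    assert a is not b  # distinct instances

The flip side is that the test is coupled to the fixtures' shared counter, so any reordering of next_* calls would change the expected strings.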
diff --git a/tests/unit/utils/test_streams.py b/tests/unit/utils/test_streams.py
deleted file mode 100644
index 765eb51..0000000
--- a/tests/unit/utils/test_streams.py
+++ /dev/null
@@ -1,205 +0,0 @@
-# Copyright Contributors to the Amundsen project.
-# SPDX-License-Identifier: Apache-2.0
-
-import asyncio
-import logging
-import sys
-import unittest
-from typing import AsyncIterator, Iterable
-from unittest.mock import Mock, call
-
-import pytest
-
-from amundsen_common.utils.streams import (PeekingIterator, _assure_collection,
-                                           async_consume_in_chunks,
-                                           consume_in_chunks,
-                                           consume_in_chunks_with_state,
-                                           one_chunk, reduce_in_chunks)
-
-
-class TestConsumer(unittest.TestCase):
-    def test_consume_in_chunks(self) -> None:
-        values = Mock()
-        values.side_effect = list(range(5))
-        consumer = Mock()
-        parent = Mock()
-        parent.values = values
-        parent.consumer = consumer
-
-        def stream() -> Iterable[int]:
-            for _ in range(5):
-                yield values()
-
-        count = consume_in_chunks(stream=stream(), n=2, consumer=consumer)
-        self.assertEqual(count, 5)
-        # this might look at little weird, but PeekingIterator is why
-        self.assertSequenceEqual([call.values(), call.values(), call.values(), call.consumer((0, 1)),
-                                  call.values(), call.values(), call.consumer((2, 3)), call.consumer((4,))],
-                                 parent.mock_calls)
-
-    def test_consume_in_chunks_with_exception(self) -> None:
-        consumer = Mock()
-
-        def stream() -> Iterable[int]:
-            yield from range(10)
-            raise KeyError('hi')
-
-        with self.assertRaisesRegex(KeyError, 'hi'):
-            consume_in_chunks(stream=stream(), n=4, consumer=consumer)
-        self.assertSequenceEqual([call.consumer((0, 1, 2, 3)), call.consumer((4, 5, 6, 7))], consumer.mock_calls)
-
-    def test_consume_in_chunks_with_state(self) -> None:
-        values = Mock()
-        values.side_effect = list(range(5))
-        consumer = Mock()
-        consumer.side_effect = list(range(1, 4))
-        state = Mock()
-        state.side_effect = lambda x: x * 10
-        parent = Mock()
-        parent.values = values
-        parent.consumer = consumer
-        parent.state = state
-
-        def stream() -> Iterable[int]:
-            for _ in range(5):
-                yield values()
-
-        result = consume_in_chunks_with_state(stream=stream(), n=2, consumer=consumer, state=state)
-        self.assertSequenceEqual(tuple(result), (0, 10, 20, 30, 40))
-        # this might look at little weird, but PeekingIterator is why
-        self.assertSequenceEqual([call.values(), call.values(), call.values(), call.state(0), call.state(1),
-                                  call.consumer((0, 1)), call.values(), call.values(), call.state(2), call.state(3),
-                                  call.consumer((2, 3)), call.state(4), call.consumer((4,))],
-                                 parent.mock_calls)
-
-    def test_consume_in_chunks_no_batch(self) -> None:
-        consumer = Mock()
-        count = consume_in_chunks(stream=range(100000000), n=-1, consumer=consumer)
-        self.assertEqual(100000000, count)
-        consumer.assert_called_once()
-
-    def test_reduce_in_chunks(self) -> None:
-        values = Mock()
-        values.side_effect = list(range(5))
-        consumer = Mock()
-        consumer.side_effect = list(range(1, 4))
-        parent = Mock()
-        parent.values = values
-        parent.consumer = consumer
-
-        def stream() -> Iterable[int]:
-            for _ in range(5):
-                yield values()
-
-        result = reduce_in_chunks(stream=stream(), n=2, initial=0, consumer=consumer)
-        self.assertEqual(result, 3)
-        # this might look at little weird, but PeekingIterator is why
-        self.assertSequenceEqual([call.values(), call.values(), call.values(), call.consumer((0, 1), 0),
-                                  call.values(), call.values(), call.consumer((2, 3), 1), call.consumer((4,), 2)],
-                                 parent.mock_calls)
-
-    @pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7 or higher")
-    def test_async_consume_in_chunks(self) -> None:
-        values = Mock()
-        values.side_effect = list(range(5))
-        consumer = Mock()
-        parent = Mock()
-        parent.values = values
-        parent.consumer = consumer
-
-        async def stream() -> AsyncIterator[int]:
-            for i in range(5):
-                yield values()
-
-        count = asyncio.run(async_consume_in_chunks(stream=stream(), n=2, consumer=consumer))
-        self.assertEqual(5, count, 'count')
-        # this might look at little weird, but PeekingIterator is why
-        self.assertSequenceEqual([call.values(), call.values(), call.values(), call.consumer((0, 1)),
-                                  call.values(), call.values(), call.consumer((2, 3)), call.consumer((4,))],
-                                 parent.mock_calls)
-
-    def test_one_chunk_logging(self) -> None:
-        it = PeekingIterator(range(1, 4))
-        actual, has_more = one_chunk(it=it, n=2, metric=lambda x: x)
-        self.assertSequenceEqual([1], tuple(actual))
-        self.assertTrue(has_more)
-
-        actual, has_more = one_chunk(it=it, n=2, metric=lambda x: x)
-        self.assertSequenceEqual([2], tuple(actual))
-        self.assertTrue(has_more)
-
-        with self.assertLogs(logger='amundsen_common.utils.streams', level=logging.ERROR):
-            actual, has_more = one_chunk(it=it, n=2, metric=lambda x: x)
-        self.assertSequenceEqual([3], tuple(actual))
-        self.assertFalse(has_more)
-
-    def test_assure_collection(self) -> None:
-        actual = _assure_collection(iter(range(2)))
-        self.assertIsInstance(actual, tuple)
-        self.assertEqual((0, 1), actual)
-        actual = _assure_collection(list(range(2)))
-        self.assertIsInstance(actual, list)
-        self.assertEqual([0, 1], actual)
-        actual = _assure_collection(set(range(2)))
-        self.assertIsInstance(actual, set)
-        self.assertEqual({0, 1}, actual)
-        actual = _assure_collection(frozenset(range(2)))
-        self.assertIsInstance(actual, frozenset)
-        self.assertEqual(frozenset({0, 1}), actual)
-
-
-class TestPeekingIterator(unittest.TestCase):
-    # TODO: it'd be good to test the locking
-    def test_no_peek(self) -> None:
-        it = PeekingIterator(range(3))
-        self.assertEqual(0, next(it))
-        self.assertEqual(1, next(it))
-        self.assertEqual(2, next(it))
-        with self.assertRaises(StopIteration):
-            next(it)
-
-    def test_peek_is_next(self) -> None:
-        it = PeekingIterator(range(2))
-        self.assertEqual(0, it.peek())
-        self.assertTrue(it.has_more())
-        self.assertEqual(0, next(it))
-        self.assertTrue(it.has_more())
-        self.assertEqual(1, next(it))
-        self.assertFalse(it.has_more())
-        with self.assertRaises(StopIteration):
-            next(it)
-
-    def test_peek_repeats(self) -> None:
-        it = PeekingIterator(range(2))
-        for _ in range(100):
-            self.assertEqual(0, it.peek())
-        self.assertEqual(0, next(it))
-        self.assertEqual(1, next(it))
-
-    def test_peek_after_exhaustion(self) -> None:
-        it = PeekingIterator(range(2))
-        self.assertEqual(0, next(it))
-        self.assertEqual(1, next(it))
-        with self.assertRaises(StopIteration):
-            next(it)
-        with self.assertRaises(StopIteration):
-            it.peek()
-        self.assertEqual(999, it.peek_default(999))
-
-    def test_take_peeked(self) -> None:
-        it = PeekingIterator(range(2))
-        self.assertEqual(0, it.peek())
-        it.take_peeked(0)
-        self.assertEqual(1, next(it))
-        with self.assertRaises(StopIteration):
-            next(it)
-
-    def test_take_peeked_wrong_value(self) -> None:
-        it = PeekingIterator(range(2))
-        self.assertEqual(0, it.peek())
-        with self.assertRaisesRegex(AssertionError, 'expected the peaked value to be the same'):
-            it.take_peeked(1)
-        it.take_peeked(0)
-        self.assertEqual(1, next(it))
-
-# TODO: test PeekingAsyncIterator directly
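If the consume_in_chunks sketch shown after the streams.py removal above is vendored, the first test in this deleted file ports over nearly unchanged; a minimal example, assuming that hypothetical helper is importable from wherever it was vendored:

    import unittest
    from unittest.mock import Mock, call

    # from myproject.streams import consume_in_chunks  # the vendored sketch above


    class TestVendoredConsumer(unittest.TestCase):
        def test_consume_in_chunks(self) -> None:
            consumer = Mock()
            count = consume_in_chunks(stream=iter(range(5)), n=2, consumer=consumer)
            self.assertEqual(5, count)
            # batches of at most n=2 items, with a short final batch
            self.assertEqual([call((0, 1)), call((2, 3)), call((4,))],
                             consumer.mock_calls)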