
Merge branch 'moe_writing_branch_from_4075ca967e5d2a78929eec99e6cc23a14fde4790'
silviulica committed Apr 19, 2016
2 parents f37afc2 + c60af56 commit 698921f
Showing 64 changed files with 10,624 additions and 724 deletions.
33 changes: 33 additions & 0 deletions google/cloud/dataflow/coders/observable.py
@@ -0,0 +1,33 @@
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Observable base class for iterables."""


class ObservableMixin(object):
  """An observable iterable.

  Subclasses need to call self.notify_observers with any object yielded.
  """

  def __init__(self):
    self.observers = []

  def register_observer(self, callback):
    self.observers.append(callback)

  def notify_observers(self, value, **kwargs):
    for o in self.observers:
      o(value, **kwargs)
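As a quick usage sketch (not part of this commit; the class name and the 'tag' keyword below are illustrative): a subclass calls notify_observers from inside __iter__, so every callback attached via register_observer sees each element as it is yielded, much like the test in the next file.

from google.cloud.dataflow.coders import observable


class WatchedValues(observable.ObservableMixin):
  """Illustrative iterable that reports each value it yields."""

  def __init__(self, values):
    super(WatchedValues, self).__init__()
    self.values = values

  def __iter__(self):
    for v in self.values:
      self.notify_observers(v, tag='demo')  # Observers run before the yield.
      yield v


seen = []
watched = WatchedValues([10, 20])
watched.register_observer(lambda value, **kwargs: seen.append((value, kwargs)))
for _ in watched:
  pass
# seen == [(10, {'tag': 'demo'}), (20, {'tag': 'demo'})]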
54 changes: 54 additions & 0 deletions google/cloud/dataflow/coders/observable_test.py
@@ -0,0 +1,54 @@
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for the Observable mixin class."""

import logging
import unittest


from google.cloud.dataflow.coders import observable


class ObservableMixinTest(unittest.TestCase):
  observed_count = 0
  observed_sum = 0
  observed_keys = []

  def observer(self, value, key=None):
    self.observed_count += 1
    self.observed_sum += value
    self.observed_keys.append(key)

  def test_observable(self):
    class Watched(observable.ObservableMixin):

      def __iter__(self):
        for i in (1, 4, 3):
          self.notify_observers(i, key='a%d' % i)
          yield i

    watched = Watched()
    watched.register_observer(lambda v, key: self.observer(v, key=key))
    for _ in watched:
      pass

    self.assertEquals(3, self.observed_count)
    self.assertEquals(8, self.observed_sum)
    self.assertEquals(['a1', 'a3', 'a4'], sorted(self.observed_keys))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()
27 changes: 27 additions & 0 deletions google/cloud/dataflow/coders/slow_stream.py
@@ -59,6 +59,33 @@ def get(self):
    return ''.join(self.data)


class ByteCountingOutputStream(OutputStream):
  """A pure Python implementation of stream.ByteCountingOutputStream."""

  def __init__(self):
    # Note that we don't actually use any of the data initialized by our super.
    super(ByteCountingOutputStream, self).__init__()
    self.count = 0

  def write(self, byte_array, nested=False):
    blen = len(byte_array)
    if nested:
      self.write_var_int64(blen)
    self.count += blen

  def write_byte(self, _):
    self.count += 1

  def get_count(self):
    return self.count

  def get(self):
    raise NotImplementedError

  def __str__(self):
    return '<%s %s>' % (self.__class__.__name__, self.count)


class InputStream(object):
  """A pure Python implementation of stream.InputStream."""

11 changes: 11 additions & 0 deletions google/cloud/dataflow/coders/stream.pxd
@@ -32,6 +32,17 @@ cdef class OutputStream(object):
  cdef extend(self, size_t missing)


cdef class ByteCountingOutputStream(OutputStream):
  cdef size_t count

  cpdef write(self, bytes b, bint nested=*)
  cpdef write_byte(self, unsigned char val)
  cpdef write_bigendian_int64(self, libc.stdint.int64_t val)
  cpdef write_bigendian_int32(self, libc.stdint.int32_t val)
  cpdef size_t get_count(self)
  cpdef bytes get(self)


cdef class InputStream(object):
  cdef size_t pos
  cdef bytes all
38 changes: 38 additions & 0 deletions google/cloud/dataflow/coders/stream.pyx
@@ -94,6 +94,44 @@ cdef class OutputStream(object):
    self.data = <char*>libc.stdlib.realloc(self.data, self.size)


cdef class ByteCountingOutputStream(OutputStream):
  """An output string stream implementation that only counts the bytes.

  This implementation counts the number of bytes it "writes" but does not
  actually write them anywhere. Thus it has write() but not get();
  get_count() returns how many bytes were written.

  This is useful for sizing an encoding.
  """

  def __cinit__(self):
    self.count = 0

  cpdef write(self, bytes b, bint nested=False):
    cdef size_t blen = len(b)
    if nested:
      self.write_var_int64(blen)
    self.count += blen

  cpdef write_byte(self, unsigned char _):
    self.count += 1

  cpdef write_bigendian_int64(self, libc.stdint.int64_t _):
    self.count += 8

  cpdef write_bigendian_int32(self, libc.stdint.int32_t _):
    self.count += 4

  cpdef size_t get_count(self):
    return self.count

  cpdef bytes get(self):
    raise NotImplementedError

  def __str__(self):
    return '<%s %s>' % (self.__class__.__name__, self.count)


cdef class InputStream(object):
  """An input string stream implementation supporting read() and size()."""

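A quick sketch of the "sizing an encoding" use mentioned in the docstring (not part of this commit). It uses the pure Python twin from slow_stream so it runs without the compiled extension; the compiled class counts the same way:

from google.cloud.dataflow.coders import slow_stream

bc = slow_stream.ByteCountingOutputStream()
bc.write('abc')               # 3 payload bytes; nothing is buffered.
bc.write('abc', nested=True)  # 1 var-int length byte + 3 payload bytes.
bc.write_byte(0)              # 1 more byte.
assert bc.get_count() == 8
str(bc)  # '<ByteCountingOutputStream 8>'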
29 changes: 29 additions & 0 deletions google/cloud/dataflow/coders/stream_test.py
@@ -14,6 +14,7 @@

"""Tests for the stream implementations."""

import logging
import math
import unittest

@@ -22,8 +23,11 @@


class StreamTest(unittest.TestCase):
  # pylint: disable=invalid-name
  InputStream = slow_stream.InputStream
  OutputStream = slow_stream.OutputStream
  ByteCountingOutputStream = slow_stream.ByteCountingOutputStream
  # pylint: enable=invalid-name

  def test_read_write(self):
    out_s = self.OutputStream()
@@ -99,6 +103,27 @@ def test_read_write_bigendian_int32(self):
    for v in values:
      self.assertEquals(v, in_s.read_bigendian_int32())

  def test_byte_counting(self):
    bc_s = self.ByteCountingOutputStream()
    self.assertEquals(0, bc_s.get_count())
    bc_s.write('def')
    self.assertEquals(3, bc_s.get_count())
    bc_s.write('')
    self.assertEquals(3, bc_s.get_count())
    bc_s.write_byte(10)
    self.assertEquals(4, bc_s.get_count())
    # "nested" also writes the length of the string, which should
    # cause 1 extra byte to be counted.
    bc_s.write('2345', nested=True)
    self.assertEquals(9, bc_s.get_count())
    bc_s.write_var_int64(63)
    self.assertEquals(10, bc_s.get_count())
    bc_s.write_bigendian_int64(42)
    self.assertEquals(18, bc_s.get_count())
    bc_s.write_bigendian_int32(36)
    self.assertEquals(22, bc_s.get_count())
    bc_s.write_bigendian_double(6.25)
    self.assertEquals(30, bc_s.get_count())

try:
  # pylint: disable=g-import-not-at-top
@@ -108,22 +133,26 @@ class FastStreamTest(StreamTest):
    """Runs the test with the compiled stream classes."""
    InputStream = stream.InputStream
    OutputStream = stream.OutputStream
    ByteCountingOutputStream = stream.ByteCountingOutputStream


  class SlowFastStreamTest(StreamTest):
    """Runs the test with compiled and uncompiled stream classes."""
    InputStream = stream.InputStream
    OutputStream = slow_stream.OutputStream
    ByteCountingOutputStream = slow_stream.ByteCountingOutputStream


  class FastSlowStreamTest(StreamTest):
    """Runs the test with uncompiled and compiled stream classes."""
    InputStream = slow_stream.InputStream
    OutputStream = stream.OutputStream
    ByteCountingOutputStream = stream.ByteCountingOutputStream

except ImportError:
  pass


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()
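For reference, the running totals asserted in test_byte_counting above decompose as: 'def' contributes 3 bytes; the empty string 0; write_byte 1; the nested '2345' 5 (1 var-int length byte plus 4 payload bytes); write_var_int64(63) 1, since 63 fits in a single var-int byte; and the big-endian int64, int32, and double contribute 8, 4, and 8 bytes, giving 3, 3, 4, 9, 10, 18, 22, 30.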
85 changes: 80 additions & 5 deletions google/cloud/dataflow/dataflow_test.py
@@ -250,19 +250,91 @@ def match(actual):
    assert_that(results, matcher(1, a_list, some_pairs))
    pipeline.run()

-  def test_as_list_without_unique_labels(self):
-    a_list = [1, 2, 3]
  def test_as_singleton_without_unique_labels(self):
    # This should succeed as calling AsSingleton on the same PCollection twice
    # with the same defaults will return the same PCollectionView.
    a_list = [2]
    pipeline = Pipeline('DirectPipelineRunner')
    main_input = pipeline | Create('main input', [1])
    side_list = pipeline | Create('side list', a_list)
    results = main_input | FlatMap(
        'test',
        lambda x, s1, s2: [[x, s1, s2]],
        AsSingleton(side_list), AsSingleton(side_list))

    def matcher(expected_elem, expected_singleton):
      def match(actual):
        [[actual_elem, actual_singleton1, actual_singleton2]] = actual
        equal_to([expected_elem])([actual_elem])
        equal_to([expected_singleton])([actual_singleton1])
        equal_to([expected_singleton])([actual_singleton2])
      return match

    assert_that(results, matcher(1, 2))
    pipeline.run()
  def test_as_singleton_with_different_defaults_without_unique_labels(self):
    # This should fail as AsSingleton with distinct default values should create
    # distinct PCollectionViews with the same full_label.
    a_list = [2]
    pipeline = Pipeline('DirectPipelineRunner')
    main_input = pipeline | Create('main input', [1])
    side_list = pipeline | Create('side list', a_list)

    with self.assertRaises(RuntimeError) as e:
      _ = main_input | FlatMap(
          'test',
-          lambda x, ls1, ls2: [[x, ls1, ls2]],
-          AsList(side_list), AsList(side_list))
          lambda x, s1, s2: [[x, s1, s2]],
          AsSingleton(side_list), AsSingleton(side_list, default_value=3))
    self.assertTrue(
        e.exception.message.startswith(
-            'Transform "AsList" does not have a stable unique label.'))
            'Transform "ViewAsSingleton(side list.None)" does not have a '
            'stable unique label.'))

  def test_as_singleton_with_different_defaults_with_unique_labels(self):
    a_list = []
    pipeline = Pipeline('DirectPipelineRunner')
    main_input = pipeline | Create('main input', [1])
    side_list = pipeline | Create('side list', a_list)
    results = main_input | FlatMap(
        'test',
        lambda x, s1, s2: [[x, s1, s2]],
        AsSingleton('si1', side_list, default_value=2),
        AsSingleton('si2', side_list, default_value=3))

    def matcher(expected_elem, expected_singleton1, expected_singleton2):
      def match(actual):
        [[actual_elem, actual_singleton1, actual_singleton2]] = actual
        equal_to([expected_elem])([actual_elem])
        equal_to([expected_singleton1])([actual_singleton1])
        equal_to([expected_singleton2])([actual_singleton2])
      return match

    assert_that(results, matcher(1, 2, 3))
    pipeline.run()

  def test_as_list_without_unique_labels(self):
    # This should succeed as calling AsList on the same PCollection twice will
    # return the same PCollectionView.
    a_list = [1, 2, 3]
    pipeline = Pipeline('DirectPipelineRunner')
    main_input = pipeline | Create('main input', [1])
    side_list = pipeline | Create('side list', a_list)
    results = main_input | FlatMap(
        'test',
        lambda x, ls1, ls2: [[x, ls1, ls2]],
        AsList(side_list), AsList(side_list))

    def matcher(expected_elem, expected_list):
      def match(actual):
        [[actual_elem, actual_list1, actual_list2]] = actual
        equal_to([expected_elem])([actual_elem])
        equal_to(expected_list)(actual_list1)
        equal_to(expected_list)(actual_list2)
      return match

    assert_that(results, matcher(1, [1, 2, 3]))
    pipeline.run()

  def test_as_list_with_unique_labels(self):
    a_list = [1, 2, 3]
@@ -282,6 +354,9 @@ def match(actual):
        equal_to(expected_list)(actual_list2)
      return match

    assert_that(results, matcher(1, [1, 2, 3]))
    pipeline.run()

  def test_as_dict_with_unique_labels(self):
    some_kvs = [('a', 1), ('b', 2)]
    pipeline = Pipeline('DirectPipelineRunner')
2 changes: 1 addition & 1 deletion google/cloud/dataflow/examples/cookbook/bigquery_schema.py
@@ -41,7 +41,7 @@ def run(argv=None):

  p = df.Pipeline(argv=pipeline_args)

-  from apitools.clients import bigquery  # pylint: disable=g-import-not-at-top
  from google.cloud.dataflow.internal.clients import bigquery  # pylint: disable=g-import-not-at-top

  table_schema = bigquery.TableSchema()
