Skip to content

Commit

Permalink
Added functions for get_selected_streams (#100)
Browse files Browse the repository at this point in the history
* Added functions for get_selected_streams

* refactor functions onto class; pylint
  • Loading branch information
Kyle Allan authored Jun 19, 2019
1 parent 4f507e8 commit 008fa38
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 5 deletions.
5 changes: 4 additions & 1 deletion singer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@
resolve_schema_references
)

from singer.catalog import Catalog
from singer.catalog import (
Catalog,
CatalogEntry
)
from singer.schema import Schema

from singer.bookmarks import (
Expand Down
37 changes: 34 additions & 3 deletions singer/catalog.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
'''Provides an object model for a Singer Catalog.'''

import json
import sys

from singer.schema import Schema
from . import metadata as metadata_module
from .bookmarks import get_currently_syncing
from .logger import get_logger
from .schema import Schema

LOGGER = get_logger()


# pylint: disable=too-many-instance-attributes
class CatalogEntry():
Expand Down Expand Up @@ -33,7 +38,9 @@ def __eq__(self, other):
return self.__dict__ == other.__dict__

def is_selected(self):
    """Report whether this catalog entry is marked as selected.

    The entry counts as selected when either ``self.schema.selected`` is
    truthy or the entry's metadata carries a truthy ``'selected'`` value
    under the empty breadcrumb ``()``.

    Returns:
        The first truthy value of the two sources, otherwise whatever the
        metadata lookup yields (a falsy value when nothing is selected).
    """
    # The metadata list is converted to a lookup map first; an earlier
    # unconditional `return self.schema.selected` made this lookup
    # unreachable, so metadata-only selection was never honoured.
    mdata = metadata_module.to_map(self.metadata)
    # pylint: disable=no-member
    return self.schema.selected or metadata_module.get(mdata, (), 'selected')

def to_dict(self):
result = {}
Expand Down Expand Up @@ -116,3 +123,27 @@ def get_stream(self, tap_stream_id):
if stream.tap_stream_id == tap_stream_id:
return stream
return None

def _shuffle_streams(self, state):
    """Rotate the stream list so the currently-syncing stream comes first.

    Args:
        state: state object handed to ``get_currently_syncing``.

    Returns:
        ``self.streams`` itself when no stream is currently syncing;
        otherwise a new list starting at the first entry whose
        ``tap_stream_id`` matches, followed by the entries that preceded
        it. If nothing matches, the order is unchanged.
    """
    resume_id = get_currently_syncing(state)
    if resume_id is None:
        return self.streams

    # Position of the first matching entry; fall back to 0 (no rotation)
    # when the currently-syncing id is not present in the catalog.
    pivot = next(
        (position for position, entry in enumerate(self.streams)
         if entry.tap_stream_id == resume_id),
        0,
    )
    return self.streams[pivot:] + self.streams[:pivot]


def get_selected_streams(self, state):
    """Yield the selected streams, in the order given by ``_shuffle_streams``.

    Args:
        state: state object forwarded to ``_shuffle_streams``.

    Yields:
        Each stream for which ``is_selected()`` is truthy. Streams that
        are not selected are logged at INFO level and skipped.
    """
    for entry in self._shuffle_streams(state):
        if entry.is_selected():
            yield entry
        else:
            LOGGER.info('Skipping stream: %s', entry.tap_stream_id)
36 changes: 35 additions & 1 deletion tests/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,40 @@
from singer.schema import Schema
from singer.catalog import Catalog, CatalogEntry

class TestGetSelectedStreams(unittest.TestCase):
    """Tests for ``Catalog.get_selected_streams``."""

    def test_one_selected_stream(self):
        """Only the entry whose metadata marks it selected is yielded."""
        selected_entry = CatalogEntry(tap_stream_id='a',
                                      schema=Schema(),
                                      metadata=[{'metadata':
                                                 {'selected': True},
                                                 'breadcrumb': []}])
        catalog = Catalog(
            [selected_entry,
             CatalogEntry(tap_stream_id='b', schema=Schema(), metadata=[]),
             CatalogEntry(tap_stream_id='c', schema=Schema(), metadata=[])])
        state = {}
        selected_streams = catalog.get_selected_streams(state)
        # assertEqual: the assertEquals alias is deprecated and was removed
        # in Python 3.12.
        self.assertEqual(list(selected_streams), [selected_entry])

    def test_resumes_currently_syncing_stream(self):
        """The stream named by ``currently_syncing`` is yielded first."""
        selected_entry_a = CatalogEntry(tap_stream_id='a',
                                        schema=Schema(),
                                        metadata=[{'metadata':
                                                   {'selected': True},
                                                   'breadcrumb': []}])
        selected_entry_c = CatalogEntry(tap_stream_id='c',
                                        schema=Schema(),
                                        metadata=[{'metadata':
                                                   {'selected': True},
                                                   'breadcrumb': []}])
        catalog = Catalog(
            [selected_entry_a,
             CatalogEntry(tap_stream_id='b', schema=Schema(), metadata=[]),
             selected_entry_c])
        state = {'currently_syncing': 'c'}
        selected_streams = catalog.get_selected_streams(state)
        self.assertEqual(list(selected_streams)[0], selected_entry_c)

class TestToDictAndFromDict(unittest.TestCase):

dict_form = {
Expand Down Expand Up @@ -89,7 +123,7 @@ def test_from_dict(self):

def test_to_dict(self):
self.assertEqual(self.dict_form, self.obj_form.to_dict())


class TestGetStream(unittest.TestCase):
def test(self):
Expand Down

0 comments on commit 008fa38

Please sign in to comment.