From 839ebb36d891bebdae09b6653feed8f31d3a957a Mon Sep 17 00:00:00 2001
From: Andrey Kabanov
Date: Mon, 11 Oct 2021 06:36:32 -0700
Subject: [PATCH] Add tap tester (#14)

* changes to tap entrypoint to work with rest of tap
* changes to discover.py to work with rest of tap
* changes to schema.py to work with rest of the tap
* changes to sync.py to work with rest of the tap
* updated README links to documentation
* changes to client.py:
* actually raise 429 error
* pylint fixes
* initial working version of streams.py
* adding pagination to streams.py
* adding links object as part of client return for pagination changes
* adjusting indentation to match PEP8:
* __init__.py
* client.py
* sync.py
* removing extraneous keys and related functions from discover.py
* changes to streams.py:
* indentation fixes to match PEP8
* unpacking result of client.get with `records, _`
* adding tap-tester base.py
* adding tap-tester sync canary
* adding tap-tester discovery
* adding tap-tester start date
* adding tap-tester automated fields
* changes to setup.py:
* bumping singer-python
* adding dev requirements
* adding circleci config
* adding tap-tester pagination
* fix for onetimes stream not supporting cursor based pagination
* updating circle config
* adding additional assertions per PR feedback
* add additional streams to pagination test
* switch customers stream to page based
* Link header was not honoring original call params and duplicating records
* Make pylint happy
* Update tap-tester invocation
* pylint fixes
* remove extraneous string interpolation
* remove extraneous class attributes for shop stream
* fixes to start_date test:
* adjust start_date_2 to work with data
* add stream replication methods for tests
* modify/add assertions based on stream replication methods and data
* adding assertion for valid-replication-keys to discover test

Co-authored-by: Andy Lu
---
 .circleci/config.yml                    |  44 ++
 README.md                               |  12 +-
 setup.py                                |   7 +-
 tap_recharge/__init__.py                |  30 +-
 tap_recharge/client.py                  |  44 +-
 tap_recharge/discover.py                |  32 +-
 tap_recharge/schema.py                  | 105 ++---
 tap_recharge/streams.py                 | 449 +++++++++++++++++++++
 tap_recharge/sync.py                    | 508 ++----
 tests/base.py                           | 355 +++++++++++++++++
 tests/test_recharge_automated_fields.py |  63 +++
 tests/test_recharge_discovery.py        | 121 ++++++
 tests/test_recharge_pagination.py       |  62 +++
 tests/test_recharge_start_date.py       | 110 +++++
 tests/test_recharge_sync_canary.py      |  43 ++
 15 files changed, 1386 insertions(+), 599 deletions(-)
 create mode 100644 .circleci/config.yml
 create mode 100644 tap_recharge/streams.py
 create mode 100644 tests/base.py
 create mode 100644 tests/test_recharge_automated_fields.py
 create mode 100644 tests/test_recharge_discovery.py
 create mode 100644 tests/test_recharge_pagination.py
 create mode 100644 tests/test_recharge_start_date.py
 create mode 100644 tests/test_recharge_sync_canary.py

diff --git a/.circleci/config.yml b/.circleci/config.yml
new file mode 100644
index 0000000..307b36d
--- /dev/null
+++ b/.circleci/config.yml
@@ -0,0 +1,44 @@
+version: 2
+jobs:
+  build:
+    docker:
+      - image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester
+    steps:
+      - checkout
+      - run:
+          name: 'Setup virtual env'
+          command: |
+            python3 -mvenv /usr/local/share/virtualenvs/tap-recharge
+            source /usr/local/share/virtualenvs/tap-recharge/bin/activate
+            pip install .[dev]
+      - run:
+          name: 'pylint'
+          command: |
+            source /usr/local/share/virtualenvs/tap-recharge/bin/activate
+            pylint tap_recharge --disable
'missing-module-docstring,missing-function-docstring,missing-class-docstring,no-else-raise,raise-missing-from,inconsistent-return-statements' + - run: + when: always + name: 'Integration Tests' + command: | + aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh + source dev_env.sh + source /usr/local/share/virtualenvs/tap-tester/bin/activate + run-test --tap=tap-recharge tests + +workflows: + version: 2 + commit: + jobs: + - build: + context: circleci-user + build_daily: + triggers: + - schedule: + cron: "0 14 * * *" + filters: + branches: + only: + - master + jobs: + - build: + context: circleci-user diff --git a/README.md b/README.md index 553467e..1d183a8 100644 --- a/README.md +++ b/README.md @@ -14,10 +14,10 @@ This tap: - [Customers](https://developer.rechargepayments.com/#list-customers) - [Discounts](https://developer.rechargepayments.com/#list-discounts) - [Metafields for Store, Customers, Subscriptions](https://developer.rechargepayments.com/#list-metafields) - - [One-time Products](https://developer.rechargepayments.com/#list-onetimes-alpha) + - [One-time Products](https://developer.rechargepayments.com/#list-onetimes) - [Orders](https://developer.rechargepayments.com/#list-orders) - - [Products](https://developer.rechargepayments.com/#list-products-beta) - - [Shop](https://developer.rechargepayments.com/#retrieve-shop) + - [Products](https://developer.rechargepayments.com/#list-products) + - [Shop](https://developer.rechargepayments.com/#retrieve-a-shop) - [Subscriptions](https://developer.rechargepayments.com/#list-subscriptions) - Outputs the schema for each resource - Incrementally pulls data based on the input state @@ -94,7 +94,7 @@ This tap: - Bookmark: updated_at (date-time) - Transformations: None -[**onetimes**](https://developer.rechargepayments.com/#list-onetimes-alpha) +[**onetimes**](https://developer.rechargepayments.com/#list-onetimes) - Endpoint: https://api.rechargeapps.com/onetimes - Primary keys: id - Foreign keys: address_id (addresses), customer_id (customers), recharge_product_id (products), shopify_product_id, shopify_variant_id @@ -112,7 +112,7 @@ This tap: - Bookmark: updated_at (date-time) - Transformations: None -[**products**](https://developer.rechargepayments.com/#list-products-beta) +[**products**](https://developer.rechargepayments.com/#list-products) - Endpoint: https://api.rechargeapps.com/products - Primary keys: id - Foreign keys: collection_id (collections), shopify_product_id @@ -120,7 +120,7 @@ This tap: - Bookmark: updated_at (date-time) - Transformations: None -[**shop**](https://developer.rechargepayments.com/#retrieve-shop) +[**shop**](https://developer.rechargepayments.com/#retrieve-a-shop) - Endpoint: https://api.rechargeapps.com/shop - Primary keys: id - Foreign keys: None diff --git a/setup.py b/setup.py index bd7ceb0..8fc820a 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ install_requires=[ 'backoff==1.8.0', 'requests==2.23.0', - 'singer-python==5.9.0' + 'singer-python==5.10.0' ], entry_points=''' [console_scripts] @@ -22,4 +22,9 @@ 'tap_recharge': [ 'schemas/*.json' ] + }, + extras_require={ + 'dev': [ + 'pylint' + ] }) diff --git a/tap_recharge/__init__.py b/tap_recharge/__init__.py index 5c13700..dfdaf9f 100644 --- a/tap_recharge/__init__.py +++ b/tap_recharge/__init__.py @@ -1,15 +1,12 @@ #!/usr/bin/env python3 -import sys -import json -import argparse -import singer -from singer import metadata, utils +from singer import get_logger, utils + from tap_recharge.client 
import RechargeClient from tap_recharge.discover import discover from tap_recharge.sync import sync -LOGGER = singer.get_logger() +LOGGER = get_logger() REQUIRED_CONFIG_KEYS = [ 'access_token', @@ -21,17 +18,19 @@ def do_discover(): LOGGER.info('Starting discover') catalog = discover() - json.dump(catalog.to_dict(), sys.stdout, indent=2) + catalog.dump() LOGGER.info('Finished discover') -@singer.utils.handle_top_exception(LOGGER) +@utils.handle_top_exception(LOGGER) def main(): + """Entrypoint function for tap.""" - parsed_args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS) + parsed_args = utils.parse_args(REQUIRED_CONFIG_KEYS) - with RechargeClient(parsed_args.config['access_token'], - parsed_args.config['user_agent']) as client: + with RechargeClient( + parsed_args.config['access_token'], + parsed_args.config['user_agent']) as client: state = {} if parsed_args.state: @@ -40,10 +39,11 @@ def main(): if parsed_args.discover: do_discover() elif parsed_args.catalog: - sync(client=client, - catalog=parsed_args.catalog, - state=state, - start_date=parsed_args.config['start_date']) + sync( + client=client, + catalog=parsed_args.catalog, + state=state, + config=parsed_args.config) if __name__ == '__main__': main() diff --git a/tap_recharge/client.py b/tap_recharge/client.py index cbfa2dc..108f65f 100644 --- a/tap_recharge/client.py +++ b/tap_recharge/client.py @@ -1,8 +1,8 @@ import backoff import requests -from requests.exceptions import ConnectionError -from singer import metrics, utils + import singer +from singer import metrics, utils LOGGER = singer.get_logger() @@ -82,8 +82,8 @@ def raise_for_error(response): return response = response.json() if ('error' in response) or ('errorCode' in response): - message = '%s: %s' % (response.get('error', str(error)), - response.get('message', 'Unknown Error')) + message = f"{response.get('error', str(error))}: \ + {response.get('message', 'Unknown Error')}" error_code = response.get('status') ex = get_exception_for_error_code(error_code) if response.status_code == 401 and 'Expired access token' in message: @@ -97,10 +97,11 @@ def raise_for_error(response): raise RechargeError(error) -class RechargeClient(object): - def __init__(self, - access_token, - user_agent=None): +class RechargeClient: + def __init__( + self, + access_token, + user_agent=None): self.__access_token = access_token self.__user_agent = user_agent self.__session = requests.Session() @@ -114,10 +115,11 @@ def __enter__(self): def __exit__(self, exception_type, exception_value, traceback): self.__session.close() - @backoff.on_exception(backoff.expo, - Server5xxError, - max_tries=5, - factor=2) + @backoff.on_exception( + backoff.expo, + Server5xxError, + max_tries=5, + factor=2) def check_access_token(self): if self.__access_token is None: raise Exception('Error: Missing access_token.') @@ -131,16 +133,17 @@ def check_access_token(self): url='https://api.rechargeapps.com', headers=headers) if response.status_code != 200: - LOGGER.error('Error status_code = {}'.format(response.status_code)) + LOGGER.error('Error status_code = %s', response.status_code) raise_for_error(response) else: return True - @backoff.on_exception(backoff.expo, - (Server5xxError, ConnectionError, Server429Error), - max_tries=5, - factor=2) + @backoff.on_exception( + backoff.expo, + (Server5xxError, requests.ConnectionError, Server429Error), + max_tries=5, + factor=2) # Call/rate limit: https://developer.rechargepayments.com/#rate-limits @utils.ratelimit(120, 60) def request(self, method, path=None, url=None, 
**kwargs): @@ -177,6 +180,9 @@ def request(self, method, path=None, url=None, **kwargs): if response.status_code >= 500: raise Server5xxError() + if response.status_code == 429: + raise Server429Error() + if response.status_code != 200: raise_for_error(response) @@ -184,10 +190,10 @@ def request(self, method, path=None, url=None, **kwargs): try: response_json = response.json() except Exception as err: - LOGGER.error('{}'.format(err)) + LOGGER.error(err) raise Exception(err) - return response_json + return response_json, response.links def get(self, path, **kwargs): return self.request('GET', path=path, **kwargs) diff --git a/tap_recharge/discover.py b/tap_recharge/discover.py index 503b948..fcd2a61 100644 --- a/tap_recharge/discover.py +++ b/tap_recharge/discover.py @@ -1,20 +1,24 @@ -from singer.catalog import Catalog, CatalogEntry, Schema -from tap_recharge.schema import get_schemas, STREAMS +from singer.catalog import Catalog +from tap_recharge.schema import get_schemas + def discover(): + """ + Constructs a singer Catalog object based on the schemas and metadata. + """ schemas, field_metadata = get_schemas() - catalog = Catalog([]) + streams = [] + + for schema_name, schema in schemas.items(): + schema_meta = field_metadata[schema_name] - for stream_name, schema_dict in schemas.items(): - schema = Schema.from_dict(schema_dict) - mdata = field_metadata[stream_name] + catalog_entry = { + 'stream': schema_name, + 'tap_stream_id': schema_name, + 'schema': schema, + 'metadata': schema_meta + } - catalog.streams.append(CatalogEntry( - stream=stream_name, - tap_stream_id=stream_name, - key_properties=STREAMS[stream_name]['key_properties'], - schema=schema, - metadata=mdata - )) + streams.append(catalog_entry) - return catalog + return Catalog.from_dict({'streams': streams}) diff --git a/tap_recharge/schema.py b/tap_recharge/schema.py index 010bd39..df843a5 100644 --- a/tap_recharge/schema.py +++ b/tap_recharge/schema.py @@ -1,98 +1,53 @@ import os import json -from singer import metadata -# Reference: https://github.com/singer-io/getting-started/blob/master/docs/DISCOVERY_MODE.md#Metadata -STREAMS = { - 'addresses': { - 'key_properties': ['id'], - 'replication_method': 'INCREMENTAL', - 'replication_keys': ['updated_at'] - }, - 'charges': { - 'key_properties': ['id'], - 'replication_method': 'INCREMENTAL', - 'replication_keys': ['updated_at'] - }, - 'collections': { - 'key_properties': ['id'], - 'replication_method': 'INCREMENTAL', - 'replication_keys': ['updated_at'] - }, - 'customers': { - 'key_properties': ['id'], - 'replication_method': 'INCREMENTAL', - 'replication_keys': ['updated_at'] - }, - 'discounts': { - 'key_properties': ['id'], - 'replication_method': 'INCREMENTAL', - 'replication_keys': ['updated_at'] - }, - 'metafields_store': { - 'key_properties': ['id'], - 'replication_method': 'INCREMENTAL', - 'replication_keys': ['updated_at'] - }, - 'metafields_customer': { - 'key_properties': ['id'], - 'replication_method': 'INCREMENTAL', - 'replication_keys': ['updated_at'] - }, - 'metafields_subscription': { - 'key_properties': ['id'], - 'replication_method': 'INCREMENTAL', - 'replication_keys': ['updated_at'] - }, - 'onetimes': { - 'key_properties': ['id'], - 'replication_method': 'INCREMENTAL', - 'replication_keys': ['updated_at'] - }, - 'orders': { - 'key_properties': ['id'], - 'replication_method': 'INCREMENTAL', - 'replication_keys': ['updated_at'] - }, - 'products': { - 'key_properties': ['id'], - 'replication_method': 'INCREMENTAL', - 'replication_keys': ['updated_at'] - }, - 
'shop': { - 'key_properties': ['id'], - 'replication_method': 'FULL_TABLE' - }, - 'subscriptions': { - 'key_properties': ['id'], - 'replication_method': 'INCREMENTAL', - 'replication_keys': ['updated_at'] - } -} +from singer import metadata +from tap_recharge.streams import STREAMS def get_abs_path(path): return os.path.join(os.path.dirname(os.path.realpath(__file__)), path) def get_schemas(): + """ + Loads the schemas defined for the tap. + + This function iterates through the STREAMS dictionary which contains + a mapping of the stream name and its corresponding class and loads + the matching schema file from the schemas directory. + """ schemas = {} field_metadata = {} - for stream_name, stream_metadata in STREAMS.items(): - schema_path = get_abs_path('schemas/{}.json'.format(stream_name)) - with open(schema_path) as file: + for stream_name, stream_object in STREAMS.items(): + schema_path = get_abs_path(f'schemas/{stream_name}.json') + with open(schema_path, encoding='utf-8') as file: schema = json.load(file) schemas[stream_name] = schema - mdata = metadata.new() + if stream_object.replication_method == 'INCREMENTAL': + replication_keys = stream_object.valid_replication_keys + else: + replication_keys = None + + # pylint: disable=line-too-long # Documentation: https://github.com/singer-io/getting-started/blob/master/docs/DISCOVERY_MODE.md#singer-python-helper-functions # Reference: https://github.com/singer-io/singer-python/blob/master/singer/metadata.py#L25-L44 mdata = metadata.get_standard_metadata( schema=schema, - key_properties=stream_metadata.get('key_properties', None), - valid_replication_keys=stream_metadata.get('replication_keys', None), - replication_method=stream_metadata.get('replication_method', None) + key_properties=stream_object.key_properties, + replication_method=stream_object.replication_method, + valid_replication_keys=replication_keys, ) + + mdata = metadata.to_map(mdata) + + if replication_keys: + for replication_key in replication_keys: + mdata = metadata.write(mdata, ('properties', replication_key), 'inclusion', 'automatic') + + mdata = metadata.to_list(mdata) + field_metadata[stream_name] = mdata return schemas, field_metadata diff --git a/tap_recharge/streams.py b/tap_recharge/streams.py new file mode 100644 index 0000000..9a1a390 --- /dev/null +++ b/tap_recharge/streams.py @@ -0,0 +1,449 @@ +""" +This module defines the stream classes and their individual sync logic. +""" + +import datetime + +from typing import Iterator + +import singer +from singer import Transformer, utils, metrics + +from tap_recharge.client import RechargeClient + + +LOGGER = singer.get_logger() + +MAX_PAGE_LIMIT = 250 + + +class BaseStream: + """ + A base class representing singer streams. + + :param client: The API client used to extract records from external source + """ + tap_stream_id = None + replication_method = None + replication_key = None + key_properties = [] + valid_replication_keys = [] + path = None + params = {} + parent = None + data_key = None + + def __init__(self, client: RechargeClient): + self.client = client + + def get_records( + self, + bookmark_datetime: datetime = None, + is_parent: bool = False) -> list: + """ + Returns a list of records for that stream. 
+
+        :param bookmark_datetime: The datetime object representing the
+            bookmark date
+        :param is_parent: If true, may change the type of data
+            that is returned for a child stream to consume
+        :return: list of records
+        """
+        raise NotImplementedError("Child classes of BaseStream require "
+                                  "`get_records` implementation")
+
+    def get_parent_data(self, bookmark_datetime: datetime = None) -> list:
+        """
+        Returns a list of records from the parent stream.
+
+        :param bookmark_datetime: The datetime object representing the
+            bookmark date
+        :return: A list of records
+        """
+        # pylint: disable=not-callable
+        parent = self.parent(self.client)
+        return parent.get_records(bookmark_datetime, is_parent=True)
+
+
+# pylint: disable=abstract-method
+class IncrementalStream(BaseStream):
+    """
+    A child class of a base stream used to represent streams that use the
+    INCREMENTAL replication method.
+
+    :param client: The API client used to extract records from the external source
+    """
+    replication_method = 'INCREMENTAL'
+
+    # pylint: disable=too-many-arguments
+    def sync(
+            self,
+            state: dict,
+            stream_schema: dict,
+            stream_metadata: dict,
+            config: dict,
+            transformer: Transformer) -> dict:
+        """
+        The sync logic for an incremental stream.
+
+        :param state: A dictionary representing singer state
+        :param stream_schema: A dictionary containing the stream schema
+        :param stream_metadata: A dictionary containing stream metadata
+        :param config: A dictionary containing tap config data
+        :param transformer: A singer Transformer object
+        :return: State data in the form of a dictionary
+        """
+        start_date = singer.get_bookmark(
+            state,
+            self.tap_stream_id,
+            self.replication_key,
+            config['start_date'])
+        bookmark_datetime = utils.strptime_to_utc(start_date)
+        max_datetime = bookmark_datetime
+
+        with metrics.record_counter(self.tap_stream_id) as counter:
+            for record in self.get_records(config, bookmark_datetime):
+                transformed_record = transformer.transform(record, stream_schema, stream_metadata)
+
+                record_datetime = utils.strptime_to_utc(transformed_record[self.replication_key])
+
+                if record_datetime >= bookmark_datetime:
+                    singer.write_record(self.tap_stream_id, transformed_record)
+                    counter.increment()
+                    max_datetime = max(record_datetime, bookmark_datetime)
+
+            bookmark_date = utils.strftime(max_datetime)
+
+        state = singer.write_bookmark(
+            state,
+            self.tap_stream_id,
+            self.replication_key,
+            bookmark_date)
+
+        singer.write_state(state)
+
+        return state
+
+
+class FullTableStream(BaseStream):
+    """
+    A child class of a base stream used to represent streams that use the
+    FULL_TABLE replication method.
+
+    :param client: The API client used to extract records from the external source
+    """
+    replication_method = 'FULL_TABLE'
+
+    # pylint: disable=too-many-arguments
+    def sync(
+            self,
+            state: dict,
+            stream_schema: dict,
+            stream_metadata: dict,
+            config: dict,
+            transformer: Transformer) -> dict:
+        """
+        The sync logic for a full table stream.
+
+        :param state: A dictionary representing singer state
+        :param stream_schema: A dictionary containing the stream schema
+        :param stream_metadata: A dictionary containing stream metadata
+        :param config: A dictionary containing tap config data
+        :param transformer: A singer Transformer object
+        :return: State data in the form of a dictionary
+        """
+        with metrics.record_counter(self.tap_stream_id) as counter:
+            for record in self.get_records(config):
+                transformed_record = transformer.transform(
+                    record,
+                    stream_schema,
+                    stream_metadata)
+                singer.write_record(self.tap_stream_id, transformed_record)
+                counter.increment()
+
+        singer.write_state(state)
+
+        return state
+
+
+class PageBasedPagingStream(IncrementalStream):
+    """
+    A generic page based pagination implementation for the Recharge API.
+
+    Docs: https://developer.rechargepayments.com/?python#page-based-pagination
+    """
+
+    def get_records(
+            self,
+            bookmark_datetime: datetime = None,
+            is_parent: bool = False) -> Iterator[list]:
+        page = 1
+        self.params.update({
+            'limit': MAX_PAGE_LIMIT,
+            'page': page
+            })
+        result_size = MAX_PAGE_LIMIT
+
+        while result_size == MAX_PAGE_LIMIT:
+            records, _ = self.client.get(self.path, params=self.params)
+
+            result_size = len(records.get(self.data_key))
+            page += 1
+            self.params.update({'page': page})
+
+            yield from records.get(self.data_key)
+
+
+class CursorPagingStream(IncrementalStream):
+    """
+    A generic cursor pagination implementation for the Recharge API.
+
+    Docs: https://developer.rechargepayments.com/?python#cursor-pagination
+    """
+
+    def get_records(
+            self,
+            bookmark_datetime: datetime = None,
+            is_parent: bool = False) -> Iterator[list]:
+        self.params.update({'limit': MAX_PAGE_LIMIT})
+        paging = True
+        path = self.path
+        url = None
+
+        while paging:
+            records, links = self.client.get(path, url=url, params=self.params)
+
+            if links.get('next'):
+                path = None
+                self.params = None
+                url = links.get('next', {}).get('url')
+            else:
+                paging = False
+
+            yield from records.get(self.data_key)
+
+
+class Addresses(CursorPagingStream):
+    """
+    Retrieves addresses from the Recharge API.
+
+    Docs: https://developer.rechargepayments.com/#list-addresses
+    """
+    tap_stream_id = 'addresses'
+    key_properties = ['id']
+    path = 'addresses'
+    replication_key = 'updated_at'
+    valid_replication_keys = ['updated_at']
+    params = {'sort_by': f'{replication_key}-asc'}
+    data_key = 'addresses'
+
+
+class Charges(CursorPagingStream):
+    """
+    Retrieves charges from the Recharge API.
+
+    Docs: https://developer.rechargepayments.com/#list-charges
+    """
+    tap_stream_id = 'charges'
+    key_properties = ['id']
+    path = 'charges'
+    replication_key = 'updated_at'
+    valid_replication_keys = ['updated_at']
+    params = {'sort_by': f'{replication_key}-asc'}
+    data_key = 'charges'
+
+
+class Collections(CursorPagingStream):
+    """
+    Retrieves collections from the Recharge API.
+
+    Docs: https://developer.rechargepayments.com/#list-collections
+    """
+    tap_stream_id = 'collections'
+    key_properties = ['id']
+    path = 'collections'
+    replication_key = 'updated_at'
+    valid_replication_keys = ['updated_at']
+    params = {'sort_by': f'{replication_key}-asc'}
+    data_key = 'collections'
+
+
+class Customers(PageBasedPagingStream):
+    """
+    Retrieves customers from the Recharge API.
+ + Docs: https://developer.rechargepayments.com/#list-customers + """ + tap_stream_id = 'customers' + key_properties = ['id'] + path = 'customers' + replication_key = 'updated_at' + valid_replication_keys = ['updated_at'] + params = {'sort_by': f'{replication_key}-asc'} + data_key = 'customers' + + +class Discounts(CursorPagingStream): + """ + Retrieves discounts from the Recharge API. + + Docs: https://developer.rechargepayments.com/#list-discounts + """ + tap_stream_id = 'discounts' + key_properties = ['id'] + path = 'discounts' + replication_key = 'updated_at' + valid_replication_keys = ['updated_at'] + params = {'sort_by': f'{replication_key}-asc'} + data_key = 'discounts' + + +class MetafieldsStore(PageBasedPagingStream): + """ + Retrieves store metafields from the Recharge API. + + Docs: https://developer.rechargepayments.com/#list-metafields + """ + tap_stream_id = 'metafields_store' + key_properties = ['id'] + path = 'metafields' + replication_key = 'updated_at' # pseudo-incremental; doesn't support `updated_at_min` param + valid_replication_keys = ['updated_at'] + params = { + 'sort_by': f'{replication_key}-asc', + 'owner_resource': 'store' + } + data_key = 'metafields' + + +class MetafieldsCustomer(PageBasedPagingStream): + """ + Retrieves customer metafields from the Recharge API. + + Docs: https://developer.rechargepayments.com/#list-metafields + """ + tap_stream_id = 'metafields_customer' + key_properties = ['id'] + path = 'metafields' + replication_key = 'updated_at' # pseudo-incremental; doesn't support `updated_at_min` param + valid_replication_keys = ['updated_at'] + params = { + 'sort_by': f'{replication_key}-asc', + 'owner_resource': 'customer' + } + data_key = 'metafields' + + +class MetafieldsSubscription(PageBasedPagingStream): + """ + Retrieves subscription metafields from the Recharge API. + + Docs: https://developer.rechargepayments.com/#list-metafields + """ + tap_stream_id = 'metafields_subscription' + key_properties = ['id'] + path = 'metafields' + replication_key = 'updated_at' # pseudo-incremental; doesn't support `updated_at_min` param + valid_replication_keys = ['updated_at'] + params = { + 'sort_by': f'{replication_key}-asc', + 'owner_resource': 'subscription' + } + data_key = 'metafields' + + +class Onetimes(PageBasedPagingStream): + """ + Retrieves non-recurring line items on queued orders from the Recharge API. + + Docs: https://developer.rechargepayments.com/#list-onetimes + """ + tap_stream_id = 'onetimes' + key_properties = ['id'] + path = 'onetimes' + replication_key = 'updated_at' + valid_replication_keys = ['updated_at'] + params = {'sort_by': f'{replication_key}-asc'} + data_key = 'onetimes' + + +class Orders(CursorPagingStream): + """ + Retrieves orders from the Recharge API. + + Docs: https://developer.rechargepayments.com/#list-orders + """ + tap_stream_id = 'orders' + key_properties = ['id'] + path = 'orders' + replication_key = 'updated_at' + valid_replication_keys = ['updated_at'] + params = {'sort_by': f'{replication_key}-asc'} + data_key = 'orders' + + +class Products(CursorPagingStream): + """ + Retrieves products from the Recharge API. 
+ + Docs: https://developer.rechargepayments.com/#list-products + """ + tap_stream_id = 'products' + key_properties = ['id'] + path = 'products' + replication_key = 'updated_at' # pseudo-incremental; doesn't support `updated_at_min` param + valid_replication_keys = ['updated_at'] + params = {'sort_by': f'{replication_key}-asc'} + data_key = 'products' + + +class Shop(FullTableStream): + """ + Retrieves basic info about your store setup from the Recharge API. + + Docs: https://developer.rechargepayments.com/#shop + """ + tap_stream_id = 'shop' + key_properties = ['id'] + path = 'shop' + data_key = 'shop' + + def get_records( + self, + bookmark_datetime: datetime = None, + is_parent: bool = False) -> Iterator[list]: + records, _ = self.client.get(self.path) + + return [records.get(self.data_key)] + + +class Subscriptions(CursorPagingStream): + """ + Retrieves subscriptions from the Recharge API. + + Docs: https://developer.rechargepayments.com/#list-subscriptions + """ + tap_stream_id = 'subscriptions' + key_properties = ['id'] + path = 'subscriptions' + replication_key = 'updated_at' + valid_replication_keys = ['updated_at'] + params = {'sort_by': f'{replication_key}-asc'} + data_key = 'subscriptions' + + +STREAMS = { + 'addresses': Addresses, + 'charges': Charges, + 'collections': Collections, + 'customers': Customers, + 'discounts': Discounts, + 'metafields_store': MetafieldsStore, + 'metafields_customer': MetafieldsCustomer, + 'metafields_subscription': MetafieldsSubscription, + 'onetimes': Onetimes, + 'orders': Orders, + 'products': Products, + 'shop': Shop, + 'subscriptions': Subscriptions +} diff --git a/tap_recharge/sync.py b/tap_recharge/sync.py index 0c72cc1..9d9d847 100644 --- a/tap_recharge/sync.py +++ b/tap_recharge/sync.py @@ -1,474 +1,44 @@ import singer -from singer import metrics, metadata, Transformer, utils +from singer import Transformer, Catalog, metadata -LOGGER = singer.get_logger() - - -def write_schema(catalog, stream_name): - stream = catalog.get_stream(stream_name) - schema = stream.schema.to_dict() - try: - singer.write_schema(stream_name, schema, stream.key_properties) - except OSError as err: - LOGGER.info('OS Error writing schema for: {}'.format(stream_name)) - raise err - - -def write_record(stream_name, record, time_extracted): - try: - singer.write_record(stream_name, record, time_extracted=time_extracted) - except OSError as err: - LOGGER.info('OS Error writing record for: {}'.format(stream_name)) - LOGGER.info('record: {}'.format(record)) - raise err - - -def get_bookmark(state, stream, default): - if (state is None) or ('bookmarks' not in state): - return default - return ( - state - .get('bookmarks', {}) - .get(stream, default) - ) - - -def write_bookmark(state, stream, value): - if 'bookmarks' not in state: - state['bookmarks'] = {} - state['bookmarks'][stream] = value - LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value)) - singer.write_state(state) - - -def process_records(catalog, #pylint: disable=too-many-branches - stream_name, - records, - time_extracted, - bookmark_field=None, - bookmark_type=None, - max_bookmark_value=None, - last_datetime=None, - last_integer=None, - parent=None, - parent_id=None): - stream = catalog.get_stream(stream_name) - schema = stream.schema.to_dict() - stream_metadata = metadata.to_map(stream.metadata) - - with metrics.record_counter(stream_name) as counter: - for record in records: - # If child object, add parent_id to record - if parent_id and parent: - record[parent + '_id'] = parent_id - - # 
Transform record for Singer.io - with Transformer() as transformer: - transformed_record = transformer.transform(record, - schema, - stream_metadata) - - # Reset max_bookmark_value to new value if higher - if bookmark_field and (bookmark_field in transformed_record): - if (max_bookmark_value is None) or \ - (transformed_record[bookmark_field] > max_bookmark_value): - max_bookmark_value = transformed_record[bookmark_field] - - if bookmark_field and (bookmark_field in transformed_record): - if bookmark_type == 'integer': - # Keep only records whose bookmark is after the last_integer - if transformed_record[bookmark_field] >= last_integer: - write_record(stream_name, transformed_record, time_extracted=time_extracted) - counter.increment() - elif bookmark_type == 'datetime': - last_dttm = transformer._transform_datetime(last_datetime) - bookmark_dttm = transformer._transform_datetime(record[bookmark_field]) - # Keep only records whose bookmark is after the last_datetime - if bookmark_dttm >= last_dttm: - write_record(stream_name, transformed_record, time_extracted=time_extracted) - counter.increment() - else: - write_record(stream_name, transformed_record, time_extracted=time_extracted) - counter.increment() - - return max_bookmark_value, counter.value - - -# Sync a specific parent or child endpoint. -def sync_endpoint(client, #pylint: disable=too-many-branches - catalog, - state, - start_date, - stream_name, - path, - endpoint_config, - data_key, - static_params, - bookmark_query_field=None, - bookmark_field=None, - bookmark_type=None, - id_fields=None, - parent=None, - parent_id=None): - - # Get the latest bookmark for the stream and set the last_integer/datetime - last_datetime = None - last_integer = None - max_bookmark_value = None - if bookmark_type == 'integer': - last_integer = get_bookmark(state, stream_name, 0) - max_bookmark_value = last_integer - else: - last_datetime = get_bookmark(state, stream_name, start_date) - max_bookmark_value = last_datetime - - write_schema(catalog, stream_name) - - # pagination: loop thru all pages of data - page = 1 - pg_size = 100 - from_rec = 1 - record_count = pg_size # initial value, set with first API call - total_records = 0 - while record_count == pg_size: - params = { - 'page': page, - 'limit': pg_size, - **static_params # adds in endpoint specific, sort, filter params - } - - if bookmark_query_field: - if bookmark_type == 'datetime': - params[bookmark_query_field] = last_datetime[0:10] # last_datetime date - elif bookmark_type == 'integer': - params[bookmark_query_field] = last_integer - - LOGGER.info('{} - Sync start {}'.format( - stream_name, - 'since: {}, '.format(last_datetime) if bookmark_query_field else '')) - - # Squash params to query-string params - querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]) - LOGGER.info('URL for {}: https://api.rechargeapps.com/{}?{}'\ - .format(stream_name, path, querystring)) - - # Get data, API request - data = client.get( - path, - params=querystring, - endpoint=stream_name) - # time_extracted: datetime when the data was extracted from the API - time_extracted = utils.now() - - # Transform data: currently NO transformations on data - # The data_key identifies the collection of records below the element - # LOGGER.info('data = {}'.format(data)) # TESTING, comment out - transformed_data = [] # initialize the record list - if data_key in data: - transformed_data = data[data_key] - # LOGGER.info('transformed_data = {}'.format(transformed_data)) # TESTING, comment out - if not 
transformed_data or transformed_data is None: - break # No data results - - # If transformed_data is a single-record dict (like shop endpoint), add it to a list - if isinstance(transformed_data, dict): - tdata = [] - tdata.append(transformed_data) - transformed_data = tdata - - # Process records and get the max_bookmark_value and record_count for the set of records - max_bookmark_value, record_count = process_records( - catalog=catalog, - stream_name=stream_name, - records=transformed_data, - time_extracted=time_extracted, - bookmark_field=bookmark_field, - bookmark_type=bookmark_type, - max_bookmark_value=max_bookmark_value, - last_datetime=last_datetime, - last_integer=last_integer, - parent=parent, - parent_id=parent_id) - - # Set total_records - total_records = total_records + record_count - - # Loop thru parent batch records for each children objects (if should stream) - children = endpoint_config.get('children') - if children: - for child_stream_name, child_endpoint_config in children.items(): - should_stream, last_stream_child = should_sync_stream(get_selected_streams(catalog), - None, - child_stream_name) - if should_stream: - # For each parent record - for record in transformed_data: - i = 0 - # Set parent_id - for id_field in id_fields: - if i == 0: - parent_id_field = id_field - if id_field == 'id': - parent_id_field = id_field - i = i + 1 - parent_id = record.get(parent_id_field) - - LOGGER.info('Syncing: {}, parent_stream: {}, parent_id: {}'.format( - child_stream_name, - stream_name, - parent_id)) - child_path = child_endpoint_config.get('path').format(parent_id) - child_total_records = sync_endpoint( - client=client, - catalog=catalog, - state=state, - start_date=start_date, - stream_name=child_stream_name, - path=child_path, - endpoint_config=child_endpoint_config, - data_key=child_endpoint_config.get('data_key', child_stream_name), - static_params=child_endpoint_config.get('params', {}), - bookmark_query_field=child_endpoint_config.get('bookmark_query_field'), - bookmark_field=child_endpoint_config.get('bookmark_field'), - bookmark_type=child_endpoint_config.get('bookmark_type'), - id_fields=child_endpoint_config.get('id_fields'), - parent=child_endpoint_config.get('parent'), - parent_id=parent_id) - LOGGER.info('Synced: {}, parent_id: {}, total_records: {}'.format( - child_stream_name, - parent_id, - child_total_records)) - - # Set to_rec: to record; ending record for the batch - to_rec = from_rec + record_count - 1 - - LOGGER.info('{} - Synced Page {} - Records {} to {}'.format( - stream_name, - page, - from_rec, - to_rec)) - # Pagination: increment the page by 1 - page = page + 1 - from_rec = to_rec + 1 - - # Update the state with the max_bookmark_value for the stream after ALL pages - # because the API requested data is NOT sorted - if bookmark_field: - write_bookmark(state, - stream_name, - max_bookmark_value) - - # Return the list of ids to the stream, in case this is a parent stream with children. 
- return total_records - - -# Review catalog and make a list of selected streams -def get_selected_streams(catalog): - selected_streams = set() - for stream in catalog.streams: - mdata = metadata.to_map(stream.metadata) - root_metadata = mdata.get(()) - if root_metadata and root_metadata.get('selected') is True: - selected_streams.add(stream.tap_stream_id) - return list(selected_streams) +from tap_recharge.client import RechargeClient +from tap_recharge.streams import STREAMS +LOGGER = singer.get_logger() -# Currently syncing sets the stream currently being delivered in the state. -# If the integration is interrupted, this state property is used to identify -# the starting point to continue from. -# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46 -def update_currently_syncing(state, stream_name): - if (stream_name is None) and ('currently_syncing' in state): - del state['currently_syncing'] - else: - singer.set_currently_syncing(state, stream_name) +def sync( + client: RechargeClient, + config: dict, + state: dict, + catalog: Catalog) -> dict: + """Sync data from tap source""" + + with Transformer() as transformer: + for stream in catalog.get_selected_streams(state): + tap_stream_id = stream.tap_stream_id + stream_obj = STREAMS[tap_stream_id](client) + stream_schema = stream.schema.to_dict() + stream_metadata = metadata.to_map(stream.metadata) + + LOGGER.info('Starting sync for stream: %s', tap_stream_id) + + state = singer.set_currently_syncing(state, tap_stream_id) + singer.write_state(state) + + singer.write_schema( + tap_stream_id, + stream_schema, + stream_obj.key_properties, + stream.replication_key + ) + + state = stream_obj.sync( + state, + stream_schema, + stream_metadata, + config, + transformer) + singer.write_state(state) + + state = singer.set_currently_syncing(state, None) singer.write_state(state) - - -# Review last_stream (last currently syncing stream), if any, -# and continue where it left off in the selected streams. -# Or begin from the beginning, if no last_stream, and sync -# all selected steams. -# Returns should_sync_stream (true/false) and last_stream. -def should_sync_stream(selected_streams, last_stream, stream_name): - if last_stream == stream_name or last_stream is None: - if last_stream is not None: - last_stream = None - if stream_name in selected_streams: - return True, last_stream - return False, last_stream - - -def sync(client, catalog, state, start_date): - selected_streams = get_selected_streams(catalog) - LOGGER.info('selected_streams: {}'.format(selected_streams)) - - if not selected_streams: - return - - # last_stream = Previous currently synced stream, if the load was interrupted - last_stream = singer.get_currently_syncing(state) - LOGGER.info('last/currently syncing stream: {}'.format(last_stream)) - - # endpoints: API URL endpoints to be called - # properties: - # : Plural stream name for the endpoint - # path: API endpoint relative path, when added to the base URL, creates the full path - # params: Query, sort, and other endpoint specific parameters - # data_key: JSON element containing the records for the endpoint - # bookmark_query_field: Typically a date-time field used for filtering the query - # bookmark_field: Replication key field, typically a date-time, used for filtering the results - # and setting the state - # bookmark_type: Data type for bookmark, integer or datetime - # id_fields: Primary key (and other IDs) from the Parent stored when store_ids is true. 
- # children: A collection of child endpoints (where the endpoint path includes the parent id) - # parent: On each of the children, the singular stream name for parent element - # NOT NEEDED FOR THIS INTEGRATION (The Children all include a reference to the Parent) - endpoints = { - 'addresses': { - 'path': 'addresses', - 'params': {}, - 'data_key': 'addresses', - 'bookmark_query_field': 'updated_at_min', - 'bookmark_field': 'updated_at', - 'bookmark_type': 'datetime', - 'id_fields': ['id'] - }, - 'charges': { - 'path': 'charges', - 'params': {}, - 'data_key': 'charges', - 'bookmark_query_field': 'updated_at_min', - 'bookmark_field': 'updated_at', - 'bookmark_type': 'datetime', - 'id_fields': ['id'] - }, - 'collections': { - 'path': 'collections', - 'params': {}, - 'data_key': 'collections', - 'bookmark_field': 'updated_at', - 'bookmark_type': 'datetime', - 'id_fields': ['id'] - }, - 'customers': { - 'path': 'customers', - 'params': {}, - 'data_key': 'customers', - 'bookmark_query_field': 'updated_at_min', - 'bookmark_field': 'updated_at', - 'bookmark_type': 'datetime', - 'id_fields': ['id'] - }, - 'discounts': { - 'path': 'discounts', - 'params': {}, - 'data_key': 'discounts', - 'bookmark_query_field': 'updated_at_min', - 'bookmark_field': 'updated_at', - 'bookmark_type': 'datetime', - 'id_fields': ['id'] - }, - 'metafields_store': { - 'path': 'metafields', - 'params': { - 'owner_resource': 'store' - }, - 'data_key': 'metafields', - 'bookmark_field': 'updated_at', - 'bookmark_type': 'datetime', - 'id_fields': ['id'] - }, - 'metafields_customer': { - 'path': 'metafields', - 'params': { - 'owner_resource': 'customer' - }, - 'data_key': 'metafields', - 'bookmark_field': 'updated_at', - 'bookmark_type': 'datetime', - 'id_fields': ['id'] - }, - 'metafields_subscription': { - 'path': 'metafields', - 'params': { - 'owner_resource': 'subscription' - }, - 'data_key': 'metafields', - 'bookmark_field': 'updated_at', - 'bookmark_type': 'datetime', - 'id_fields': ['id'] - }, - 'onetimes': { - 'path': 'onetimes', - 'params': {}, - 'data_key': 'onetimes', - 'bookmark_query_field': 'updated_at_min', - 'bookmark_field': 'updated_at', - 'bookmark_type': 'datetime', - 'id_fields': ['id'] - }, - 'orders': { - 'path': 'orders', - 'params': {}, - 'data_key': 'orders', - 'bookmark_query_field': 'updated_at_min', - 'bookmark_field': 'updated_at', - 'bookmark_type': 'datetime', - 'id_fields': ['id'] - }, - 'products': { - 'path': 'products', - 'params': {}, - 'data_key': 'products', - 'bookmark_field': 'updated_at', - 'bookmark_type': 'datetime', - 'id_fields': ['id'] - }, - 'shop': { - 'path': 'shop', - 'params': {}, - 'data_key': 'shop', - 'id_fields': ['id'] - }, - 'subscriptions': { - 'path': 'subscriptions', - 'params': {}, - 'data_key': 'subscriptions', - 'bookmark_query_field': 'updated_at_min', - 'bookmark_field': 'updated_at', - 'bookmark_type': 'datetime', - 'id_fields': ['id'] - } - } - - # For each endpoint (above), determine if the stream should be streamed - # (based on the catalog and last_stream), then sync those streams. 
- for stream_name, endpoint_config in endpoints.items(): - should_stream, last_stream = should_sync_stream(selected_streams, - last_stream, - stream_name) - if should_stream: - LOGGER.info('START Syncing: {}'.format(stream_name)) - update_currently_syncing(state, stream_name) - path = endpoint_config.get('path') - total_records = sync_endpoint( - client=client, - catalog=catalog, - state=state, - start_date=start_date, - stream_name=stream_name, - path=path, - endpoint_config=endpoint_config, - data_key=endpoint_config.get('data_key', stream_name), - static_params=endpoint_config.get('params', {}), - bookmark_query_field=endpoint_config.get('bookmark_query_field'), - bookmark_field=endpoint_config.get('bookmark_field'), - bookmark_type=endpoint_config.get('bookmark_type'), - id_fields=endpoint_config.get('id_fields')) - - update_currently_syncing(state, None) - LOGGER.info('Synced: {}, total_records: {}'.format( - stream_name, - total_records)) - LOGGER.info('FINISHED Syncing: {}'.format(stream_name)) diff --git a/tests/base.py b/tests/base.py new file mode 100644 index 0000000..f35d676 --- /dev/null +++ b/tests/base.py @@ -0,0 +1,355 @@ +""" +Setup expectations for test sub classes +Run discovery for as a prerequisite for most tests +""" +import os +import unittest + +from datetime import datetime as dt, timedelta + +from tap_tester import connections, menagerie, runner + +from singer import get_logger + + +class RechargeBaseTest(unittest.TestCase): + """ + Setup expectations for test sub classes. + Metadata describing streams. + + A bunch of shared methods that are used in tap-tester tests. + Shared tap-specific methods (as needed). + """ + AUTOMATIC_FIELDS = "automatic" + REPLICATION_KEYS = "valid-replication-keys" + PRIMARY_KEYS = "table-key-properties" + FOREIGN_KEYS = "table-foreign-key-properties" + REPLICATION_METHOD = "forced-replication-method" + API_LIMIT = "max-row-limit" + INCREMENTAL = "INCREMENTAL" + FULL_TABLE = "FULL_TABLE" + START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z" + BOOKMARK_COMPARISON_FORMAT = "%Y-%m-%dT00:00:00+00:00" + LOGGER = get_logger() + + start_date = "2021-09-20T00:00:00Z" + + @staticmethod + def tap_name(): + """The name of the tap""" + return "tap-recharge" + + @staticmethod + def get_type(): + """the expected url route ending""" + return "platform.recharge" + + def get_properties(self, original: bool = True): + """Configuration properties required for the tap.""" + return_value = { + 'start_date' : '2021-09-01T00:00:00Z', + } + if original: + return return_value + + return_value["start_date"] = self.start_date + return return_value + + @staticmethod + def get_credentials(): + """Authentication information for the test account""" + return {'access_token': os.getenv('TAP_RECHARGE_ACCESS_TOKEN')} + + def expected_metadata(self): + """The expected streams and metadata about the streams""" + return { + "addresses": { + self.PRIMARY_KEYS: {"id", }, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"} + }, + "charges": { + self.PRIMARY_KEYS: {"id", }, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"} + }, + "collections": { + self.PRIMARY_KEYS: {"id", }, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"} + }, + "customers": { + self.PRIMARY_KEYS: {"id", }, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"} + }, + "discounts": { + self.PRIMARY_KEYS: {"id", }, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: 
{"updated_at"} + }, + "metafields_store": { + self.PRIMARY_KEYS: {"id", }, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"} + }, + "metafields_customer": { + self.PRIMARY_KEYS: {"id", }, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"} + }, + "metafields_subscription": { + self.PRIMARY_KEYS: {"id", }, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"} + }, + "onetimes": { + self.PRIMARY_KEYS: {"id", }, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"} + }, + "orders": { + self.PRIMARY_KEYS: {"id", }, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"} + }, + "products": { + self.PRIMARY_KEYS: {"id", }, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"} + }, + "shop": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.FULL_TABLE + }, + "subscriptions": { + self.PRIMARY_KEYS: {"id", }, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"} + } + } + + + def expected_streams(self): + """A set of expected stream names""" + return set(self.expected_metadata().keys()) + + def child_streams(self): + """ + Return a set of streams that are child streams + based on having foreign key metadata + """ + return {stream for stream, metadata in self.expected_metadata().items() + if metadata.get(self.FOREIGN_KEYS)} + + def expected_primary_keys(self): + """ + return a dictionary with key of table name + and value as a set of primary key fields + """ + return {table: properties.get(self.PRIMARY_KEYS, set()) + for table, properties + in self.expected_metadata().items()} + + def expected_replication_keys(self): + """ + return a dictionary with key of table name + and value as a set of replication key fields + """ + return {table: properties.get(self.REPLICATION_KEYS, set()) + for table, properties + in self.expected_metadata().items()} + + def expected_foreign_keys(self): + """ + return a dictionary with key of table name + and value as a set of foreign key fields + """ + return {table: properties.get(self.FOREIGN_KEYS, set()) + for table, properties + in self.expected_metadata().items()} + + def expected_automatic_fields(self): + auto_fields = {} + for k, v in self.expected_metadata().items(): + auto_fields[k] = v.get(self.PRIMARY_KEYS, set()) | v.get(self.REPLICATION_KEYS, set()) \ + | v.get(self.FOREIGN_KEYS, set()) + return auto_fields + + def expected_replication_method(self): + """return a dictionary with key of table name nd value of replication method""" + return {table: properties.get(self.REPLICATION_METHOD, None) + for table, properties + in self.expected_metadata().items()} + + def setUp(self): + missing_envs = [x for x in [os.getenv('TAP_RECHARGE_ACCESS_TOKEN')] if x is None] + if len(missing_envs) != 0: + raise Exception("set environment variables") + + ######################### + # Helper Methods # + ######################### + + def run_and_verify_check_mode(self, conn_id): + """ + Run the tap in check mode and verify it succeeds. + This should be ran prior to field selection and initial sync. + + Return the connection id and found catalogs from menagerie. 
+ """ + # run in check mode + check_job_name = runner.run_check_mode(self, conn_id) + + # verify check exit codes + exit_status = menagerie.get_exit_status(conn_id, check_job_name) + menagerie.verify_check_exit_status(self, exit_status, check_job_name) + + found_catalogs = menagerie.get_catalogs(conn_id) + self.assertGreater(len(found_catalogs), 0, msg="unable to locate schemas for connection {}".format(conn_id)) + + found_catalog_names = set(map(lambda c: c['stream_name'], found_catalogs)) + + self.assertSetEqual(self.expected_streams(), found_catalog_names, msg="discovered schemas do not match") + print("discovered schemas are OK") + + return found_catalogs + + def run_and_verify_sync(self, conn_id): + """ + Run a sync job and make sure it exited properly. + Return a dictionary with keys of streams synced + and values of records synced for each stream + """ + # Run a sync job using orchestrator + sync_job_name = runner.run_sync_mode(self, conn_id) + + # Verify tap and target exit codes + exit_status = menagerie.get_exit_status(conn_id, sync_job_name) + menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) + + # Verify actual rows were synced + sync_record_count = runner.examine_target_output_file( + self, conn_id, self.expected_streams(), self.expected_primary_keys()) + self.assertGreater( + sum(sync_record_count.values()), 0, + msg="failed to replicate any data: {}".format(sync_record_count) + ) + print("total replicated row count: {}".format(sum(sync_record_count.values()))) + + return sync_record_count + + def perform_and_verify_table_and_field_selection(self, + conn_id, + test_catalogs, + select_all_fields=True): + """ + Perform table and field selection based off of the streams to select + set and field selection parameters. + + Verify this results in the expected streams selected and all or no + fields selected for those streams. 
+ """ + + # Select all available fields or select no fields from all testable streams + self.select_all_streams_and_fields( + conn_id=conn_id, catalogs=test_catalogs, select_all_fields=select_all_fields + ) + + catalogs = menagerie.get_catalogs(conn_id) + + # Ensure our selection affects the catalog + expected_selected = [tc.get('stream_name') for tc in test_catalogs] + for cat in catalogs: + catalog_entry = menagerie.get_annotated_schema(conn_id, cat['stream_id']) + + # Verify all testable streams are selected + selected = catalog_entry.get('annotated-schema').get('selected') + print("Validating selection on {}: {}".format(cat['stream_name'], selected)) + if cat['stream_name'] not in expected_selected: + self.assertFalse(selected, msg="Stream selected, but not testable.") + continue # Skip remaining assertions if we aren't selecting this stream + self.assertTrue(selected, msg="Stream not selected.") + + if select_all_fields: + # Verify all fields within each selected stream are selected + for field, field_props in catalog_entry.get('annotated-schema').get('properties').items(): + field_selected = field_props.get('selected') + print("\tValidating selection on {}.{}: {}".format( + cat['stream_name'], field, field_selected)) + self.assertTrue(field_selected, msg="Field not selected.") + else: + # Verify only automatic fields are selected + expected_automatic_fields = self.expected_automatic_fields().get(cat['stream_name']) + selected_fields = self.get_selected_fields_from_metadata(catalog_entry['metadata']) + self.assertEqual(expected_automatic_fields, selected_fields) + + @staticmethod + def get_selected_fields_from_metadata(metadata): + selected_fields = set() + for field in metadata: + is_field_metadata = len(field['breadcrumb']) > 1 + inclusion_automatic_or_selected = ( + field['metadata']['selected'] is True or \ + field['metadata']['inclusion'] == 'automatic' + ) + if is_field_metadata and inclusion_automatic_or_selected: + selected_fields.add(field['breadcrumb'][1]) + return selected_fields + + @staticmethod + def select_all_streams_and_fields(conn_id, catalogs, select_all_fields: bool = True): + """Select all streams and all fields within streams""" + for catalog in catalogs: + schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id']) + + non_selected_properties = [] + if not select_all_fields: + # get a list of all properties so that none are selected + non_selected_properties = schema.get('annotated-schema', {}).get( + 'properties', {}).keys() + + connections.select_catalog_and_fields_via_metadata( + conn_id, catalog, schema, [], non_selected_properties) + + @staticmethod + def parse_date(date_value): + """ + Pass in string-formatted-datetime, parse the value, and return it as an unformatted datetime object. 
+ """ + date_formats = { + "%Y-%m-%dT%H:%M:%S.%fZ", + "%Y-%m-%dT%H:%M:%SZ", + "%Y-%m-%dT%H:%M:%S.%f+00:00", + "%Y-%m-%dT%H:%M:%S+00:00", + "%Y-%m-%d" + } + for date_format in date_formats: + try: + date_stripped = dt.strptime(date_value, date_format) + return date_stripped + except ValueError: + continue + + raise NotImplementedError("Tests do not account for dates of this format: {}".format(date_value)) + + def timedelta_formatted(self, dtime, days=0): + try: + date_stripped = dt.strptime(dtime, self.START_DATE_FORMAT) + return_date = date_stripped + timedelta(days=days) + + return dt.strftime(return_date, self.START_DATE_FORMAT) + + except ValueError: + try: + date_stripped = dt.strptime(dtime, self.BOOKMARK_COMPARISON_FORMAT) + return_date = date_stripped + timedelta(days=days) + + return dt.strftime(return_date, self.BOOKMARK_COMPARISON_FORMAT) + + except ValueError: + return Exception("Datetime object is not of the format: {}".format(self.START_DATE_FORMAT)) + + ########################################################################## + ### Tap Specific Methods + ########################################################################## diff --git a/tests/test_recharge_automated_fields.py b/tests/test_recharge_automated_fields.py new file mode 100644 index 0000000..56dd778 --- /dev/null +++ b/tests/test_recharge_automated_fields.py @@ -0,0 +1,63 @@ +""" +Test that with no fields selected for a stream automatic fields are still replicated +""" +from tap_tester import runner, connections + +from base import RechargeBaseTest + + +class RechargeAutomaticFields(RechargeBaseTest): + """Test that with no fields selected for a stream automatic fields are still replicated""" + + @staticmethod + def name(): + return "tap_tester_recharge_automatic_fields" + + def test_run(self): + """ + Verify that for each stream you can get multiple pages of data + when no fields are selected and only the automatic fields are replicated. + + PREREQUISITE + For EACH stream add enough data that you surpass the limit of a single + fetch of data. For instance if you have a limit of 250 records ensure + that 251 (or more) records have been posted for that stream. 
+ """ + + expected_streams = self.expected_streams() + + # instantiate connection + conn_id = connections.ensure_connection(self) + + # run check mode + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # table and field selection + test_catalogs_automatic_fields = [catalog for catalog in found_catalogs + if catalog.get('stream_name') in expected_streams] + + self.perform_and_verify_table_and_field_selection( + conn_id, test_catalogs_automatic_fields, select_all_fields=False, + ) + + # run initial sync + record_count_by_stream = self.run_and_verify_sync(conn_id) + synced_records = runner.get_records_from_target_output() + + for stream in expected_streams: + with self.subTest(stream=stream): + # expected values + expected_keys = self.expected_automatic_fields().get(stream) + + # collect actual values + data = synced_records.get(stream, {}) + record_messages_keys = [set(row.get('data').keys()) for row in data.get('messages', {})] + + # Verify that you get some records for each stream + self.assertGreater( + record_count_by_stream.get(stream, -1), 0, + msg="The number of records is not over the stream max limit for the {} stream".format(stream)) + + # Verify that only the automatic fields are sent to the target + for actual_keys in record_messages_keys: + self.assertSetEqual(expected_keys, actual_keys) diff --git a/tests/test_recharge_discovery.py b/tests/test_recharge_discovery.py new file mode 100644 index 0000000..04ed4dd --- /dev/null +++ b/tests/test_recharge_discovery.py @@ -0,0 +1,121 @@ +"""Test tap discovery mode and metadata.""" +import re + +from tap_tester import menagerie, connections + +from base import RechargeBaseTest + + +class DiscoveryTest(RechargeBaseTest): + """Test tap discovery mode and metadata conforms to standards.""" + + @staticmethod + def name(): + return "tap_tester_recharge_discovery_test" + + def test_run(self): + """ + Testing that discovery creates the appropriate catalog with valid metadata. + + • Verify number of actual streams discovered match expected + • Verify the stream names discovered were what we expect + • Verify stream names follow naming convention + streams should only have lowercase alphas and underscores + • verify there is only 1 top level breadcrumb + • verify replication key(s) + • verify primary key(s) + • verify that primary, replication and foreign keys + are given the inclusion of automatic. + • verify that all other fields have inclusion of available metadata. 
+ """ + streams_to_test = self.expected_streams() + + conn_id = connections.ensure_connection(self) + + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # Verify stream names follow naming convention + # streams should only have lowercase alphas and underscores + found_catalog_names = {c['tap_stream_id'] for c in found_catalogs} + self.assertTrue(all([re.fullmatch(r"[a-z_]+", name) for name in found_catalog_names]), + msg="One or more streams don't follow standard naming") + + for stream in streams_to_test: + with self.subTest(stream=stream): + # Verify ensure the catalog is found for a given stream + catalog = next(iter([catalog for catalog in found_catalogs + if catalog["stream_name"] == stream])) + self.assertIsNotNone(catalog) + + # collecting expected values + expected_primary_keys = self.expected_primary_keys()[stream] + expected_replication_keys = self.expected_replication_keys()[stream] + expected_automatic_fields = expected_primary_keys | expected_replication_keys + expected_replication_method = self.expected_replication_method()[stream] + + # collecting actual values... + schema_and_metadata = menagerie.get_annotated_schema(conn_id, catalog['stream_id']) + metadata = schema_and_metadata["metadata"] + stream_properties = [item for item in metadata if item.get("breadcrumb") == []] + actual_primary_keys = set( + stream_properties[0].get( + "metadata", {self.PRIMARY_KEYS: []}).get(self.PRIMARY_KEYS, []) + ) + actual_replication_keys = set( + stream_properties[0].get( + "metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS, []) + ) + actual_replication_method = stream_properties[0].get( + "metadata", {self.REPLICATION_METHOD: None}).get(self.REPLICATION_METHOD) + actual_automatic_fields = set( + item.get("breadcrumb", ["properties", None])[1] for item in metadata + if item.get("metadata").get("inclusion") == "automatic" + ) + + ########################################################################## + ### metadata assertions + ########################################################################## + + # verify there is only 1 top level breadcrumb in metadata + self.assertTrue(len(stream_properties) == 1, + msg="There is NOT only one top level breadcrumb for {}".format(stream) + \ + "\nstream_properties | {}".format(stream_properties)) + + # verify that if there is a replication key we are doing INCREMENTAL otherwise FULL + if actual_replication_keys: + self.assertTrue(actual_replication_method == self.INCREMENTAL, + msg="Expected INCREMENTAL replication " + "since there is a replication key") + else: + self.assertTrue(actual_replication_method == self.FULL_TABLE, + msg="Expected FULL replication " + "since there is no replication key") + + # verify the actual replication matches our expected replication method + self.assertEqual(expected_replication_method, + actual_replication_method, + msg="The actual replication method {} doesn't match the expected {}".format( + actual_replication_method, expected_replication_method)) + + # verify primary key(s) match expectations + self.assertSetEqual(expected_primary_keys, actual_primary_keys) + + # verify replication key(s) match expectations + self.assertSetEqual(expected_replication_keys, + actual_replication_keys, + msg="The actual replications keys ({}) do not match the expected ({})".format( + actual_replication_keys, expected_replication_keys)) + + # verify that primary keys and replication keys + # are given the inclusion of automatic in metadata. 
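+                # (expected_automatic_fields above was built as primary keys | replication keys)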
+                self.assertSetEqual(expected_automatic_fields, actual_automatic_fields)
+
+                # verify that all other fields have inclusion of available
+                # This assumes there are no unsupported fields for SaaS sources
+                self.assertTrue(
+                    all({item.get("metadata").get("inclusion") == "available"
+                         for item in metadata
+                         if item.get("breadcrumb", []) != []
+                         and item.get("breadcrumb", ["properties", None])[1]
+                         not in actual_automatic_fields}),
+                    msg="Not all non key properties are set to available in metadata")
diff --git a/tests/test_recharge_pagination.py b/tests/test_recharge_pagination.py
new file mode 100644
index 0000000..68b8802
--- /dev/null
+++ b/tests/test_recharge_pagination.py
@@ -0,0 +1,62 @@
+"""
+Test tap pagination of streams
+"""
+
+from tap_tester import runner, connections
+
+from base import RechargeBaseTest
+
+class RechargePaginationTest(RechargeBaseTest):
+
+    @staticmethod
+    def name():
+        return "tap_tester_recharge_pagination_test"
+
+    def test_run(self):
+        page_size = 250
+        conn_id = connections.ensure_connection(self)
+
+        # Checking pagination for streams with enough data
+        expected_streams = [
+            "addresses",
+            "customers",
+            "discounts",
+            "metafields_subscription",
+            "onetimes",
+        ]
+        found_catalogs = self.run_and_verify_check_mode(conn_id)
+
+        # table and field selection
+        test_catalogs = [catalog for catalog in found_catalogs
+                         if catalog.get('stream_name') in expected_streams]
+
+        self.perform_and_verify_table_and_field_selection(conn_id, test_catalogs)
+
+        record_count_by_stream = self.run_and_verify_sync(conn_id)
+
+        synced_records = runner.get_records_from_target_output()
+
+        for stream in expected_streams:
+            with self.subTest(stream=stream):
+                # expected values
+                expected_primary_keys = self.expected_primary_keys()
+
+                # collect information for assertions from the sync based on expected values
+                record_count_sync = record_count_by_stream.get(stream, 0)
+                primary_keys_list = [tuple(message.get('data').get(expected_pk)
+                                           for expected_pk in expected_primary_keys[stream])
+                                     for message in synced_records.get(stream).get('messages')
+                                     if message.get('action') == 'upsert']
+
+                # verify the record count exceeds the page size, so the sync spans multiple pages
+                self.assertGreater(record_count_sync, page_size)
+
+                primary_keys_list_1 = primary_keys_list[:page_size]
+                primary_keys_list_2 = primary_keys_list[page_size:2*page_size]
+
+                primary_keys_page_1 = set(primary_keys_list_1)
+                primary_keys_page_2 = set(primary_keys_list_2)
+
+                # Verify by primary keys that data is unique per page
+                self.assertEqual(len(primary_keys_page_1), page_size) # verify there are no dupes on a page
+                self.assertTrue(primary_keys_page_1.isdisjoint(primary_keys_page_2)) # verify there are no dupes between pages
diff --git a/tests/test_recharge_start_date.py b/tests/test_recharge_start_date.py
new file mode 100644
index 0000000..9698ef5
--- /dev/null
+++ b/tests/test_recharge_start_date.py
@@ -0,0 +1,110 @@
+from tap_tester import connections, runner
+
+from base import RechargeBaseTest
+
+
+class RechargeStartDateTest(RechargeBaseTest):
+
+    start_date_1 = ""
+    start_date_2 = ""
+
+    @staticmethod
+    def name():
+        return "tap_tester_recharge_start_date_test"
+
+    def test_run(self):
+        """Instantiate start date according to the desired data set and run the test"""
+
+        self.start_date_1 = self.get_properties().get('start_date')
+        self.start_date_2 = self.timedelta_formatted(self.start_date_1, days=31)
+
+        self.start_date = self.start_date_1
+
+        expected_streams = self.expected_streams()
+        expected_replication_methods = self.expected_replication_method()
+
+        ##########################################################################
+        ### First Sync
+        ##########################################################################
+
+        # instantiate connection
+        conn_id_1 = connections.ensure_connection(self)
+
+        # run check mode
+        found_catalogs_1 = self.run_and_verify_check_mode(conn_id_1)
+
+        # table and field selection
+        test_catalogs_1_all_fields = [catalog for catalog in found_catalogs_1
+                                      if catalog.get('tap_stream_id') in expected_streams]
+        self.perform_and_verify_table_and_field_selection(conn_id_1, test_catalogs_1_all_fields, select_all_fields=True)
+
+        # run initial sync
+        record_count_by_stream_1 = self.run_and_verify_sync(conn_id_1)
+        synced_records_1 = runner.get_records_from_target_output()
+
+        ##########################################################################
+        ### Update START DATE Between Syncs
+        ##########################################################################
+
+        print("REPLICATION START DATE CHANGE: {} ===>>> {} ".format(self.start_date, self.start_date_2))
+        self.start_date = self.start_date_2
+
+        ##########################################################################
+        ### Second Sync
+        ##########################################################################
+
+        # create a new connection with the new start_date
+        conn_id_2 = connections.ensure_connection(self, original_properties=False)
+
+        # run check mode
+        found_catalogs_2 = self.run_and_verify_check_mode(conn_id_2)
+
+        # table and field selection
+        test_catalogs_2_all_fields = [catalog for catalog in found_catalogs_2
+                                      if catalog.get('tap_stream_id') in expected_streams]
+        self.perform_and_verify_table_and_field_selection(conn_id_2, test_catalogs_2_all_fields, select_all_fields=True)
+
+        # run sync
+        record_count_by_stream_2 = self.run_and_verify_sync(conn_id_2)
+        synced_records_2 = runner.get_records_from_target_output()
+
+        for stream in expected_streams:
+            expected_replication_method = expected_replication_methods[stream]
+
+            with self.subTest(stream=stream):
+                # expected values
+                expected_primary_keys = self.expected_primary_keys()[stream]
+
+                # collect information for assertions from syncs 1 & 2 based on expected values
+                record_count_sync_1 = record_count_by_stream_1.get(stream, 0)
+                record_count_sync_2 = record_count_by_stream_2.get(stream, 0)
+                primary_keys_list_1 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys)
+                                       for message in synced_records_1.get(stream, {}).get('messages', {})
+                                       if message.get('action') == 'upsert']
+                primary_keys_list_2 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys)
+                                       for message in synced_records_2.get(stream, {}).get('messages', {})
+                                       if message.get('action') == 'upsert']
+                primary_keys_sync_1 = set(primary_keys_list_1)
+                primary_keys_sync_2 = set(primary_keys_list_2)
+
+                if expected_replication_method == self.INCREMENTAL:
+                    # Verify that the 1st sync with an earlier start date replicates
+                    # a greater number of records than the 2nd sync.
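+                    # (Assumes the test data contains records updated between start_date_1 and
+                    #  start_date_2; if not, both syncs would replicate the same record counts.)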
+                    self.assertGreater(record_count_sync_1, record_count_sync_2,
+                                       msg="The 1st sync does not contain a greater number of records than the 2nd sync")
+
+                    # Verify by primary key the records replicated in the 2nd sync are part of the 1st sync
+                    self.assertTrue(primary_keys_sync_2.issubset(primary_keys_sync_1),
+                                    msg="Records in the 2nd sync are not a subset of the 1st sync")
+                elif expected_replication_method == self.FULL_TABLE:
+                    # Verify that the 1st sync with an earlier start date replicates
+                    # an equal number of records as the 2nd sync.
+                    self.assertEqual(record_count_sync_1, record_count_sync_2,
+                                     msg="The 1st sync does not contain an equal number of records as the 2nd sync")
+
+                    # Verify by primary key the same records are replicated in the 1st and 2nd syncs
+                    self.assertSetEqual(primary_keys_sync_1, primary_keys_sync_2)
+                else:
+                    raise NotImplementedError(
+                        "INVALID EXPECTATIONS\t\tSTREAM: {} REPLICATION_METHOD: {}".format(stream, expected_replication_method)
+                    )
diff --git a/tests/test_recharge_sync_canary.py b/tests/test_recharge_sync_canary.py
new file mode 100644
index 0000000..30ede77
--- /dev/null
+++ b/tests/test_recharge_sync_canary.py
@@ -0,0 +1,43 @@
+"""
+Smoke test: select all streams and all fields, run a sync, and verify records are replicated without errors
+"""
+from tap_tester import connections
+
+from base import RechargeBaseTest
+
+class SyncCanaryTest(RechargeBaseTest):
+    """
+    Smoke test
+    """
+
+    @staticmethod
+    def name():
+        return "tap_tester_recharge_sync_canary_test"
+
+    def test_run(self):
+        """
+        Run the tap in check mode, then select all streams and all fields within streams. Run a sync and
+        verify the exit codes do not indicate errors. This is meant to be a smoke test for the tap. If this
+        is failing do not expect any other tests to pass.
+        """
+        expected_streams = self.expected_streams()
+
+        conn_id = connections.ensure_connection(self)
+        found_catalogs = self.run_and_verify_check_mode(conn_id)
+
+        test_catalogs = [catalog for catalog in found_catalogs
+                         if catalog.get('tap_stream_id') in expected_streams]
+
+        self.perform_and_verify_table_and_field_selection(
+            conn_id,
+            test_catalogs,
+            select_all_fields=True)
+
+        record_count_by_stream = self.run_and_verify_sync(conn_id)
+
+        # Assert all expected streams synced at least one record
+        for stream in self.expected_streams():
+            with self.subTest(stream=stream):
+                self.assertGreater(record_count_by_stream.get(stream, 0),
+                                   0,
+                                   msg="{} did not sync any records".format(stream))