diff --git a/.circleci/config.yml b/.circleci/config.yml new file mode 100644 index 0000000..7572fb8 --- /dev/null +++ b/.circleci/config.yml @@ -0,0 +1,45 @@ +version: 2.1 +orbs: + slack: circleci/slack@3.4.2 + +jobs: + build: + docker: + - image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester + steps: + - checkout + - run: + name: 'Setup virtual env' + command: | + python3 -mvenv /usr/local/share/virtualenvs/tap-trustpilot + source /usr/local/share/virtualenvs/tap-trustpilot/bin/activate + pip install -U pip setuptools + pip install .[dev] + - run: + name: 'JSON Validator' + command: | + source /usr/local/share/virtualenvs/tap-tester/bin/activate + stitch-validate-json tap_trustpilot/schemas/*.json + - run: + name: 'pylint' + command: | + source /usr/local/share/virtualenvs/tap-trustpilot/bin/activate + pylint tap_trustpilot --disable C,W,R + +workflows: + version: 2 + commit: + jobs: + - build: + context: circleci-user + build_daily: + triggers: + - schedule: + cron: "0 13 * * *" + filters: + branches: + only: + - master + jobs: + - build: + context: circleci-user \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..e69de29 diff --git a/setup.py b/setup.py index f1e9bf2..b09e17a 100755 --- a/setup.py +++ b/setup.py @@ -10,9 +10,17 @@ classifiers=["Programming Language :: Python :: 3 :: Only"], py_modules=["tap_trustpilot"], install_requires=[ - "singer-python>=5.0.12", + "singer-python", "requests", ], + # requires following addition packages for code check quality + extras_require={ + 'dev': [ + 'pylint', + 'ipdb', + 'nose' + ] + }, entry_points=""" [console_scripts] tap-trustpilot=tap_trustpilot:main diff --git a/tap_trustpilot/__init__.py b/tap_trustpilot/__init__.py index cb8e6eb..ce23d7b 100644 --- a/tap_trustpilot/__init__.py +++ b/tap_trustpilot/__init__.py @@ -1,12 +1,11 @@ #!/usr/bin/env python3 -import os -import json import singer from singer import utils -from singer.catalog import Catalog, CatalogEntry, Schema -from . import streams as streams_ -from .context import Context -from . import schemas +from singer.catalog import Catalog +from tap_trustpilot import streams as streams_ +from tap_trustpilot.context import Context +from tap_trustpilot import schemas +from tap_trustpilot.discover import discover REQUIRED_CONFIG_KEYS = [ "access_key", @@ -18,26 +17,6 @@ LOGGER = singer.get_logger() - -def check_credentials_are_authorized(ctx): - ctx.client.auth(ctx.config) - - -def discover(ctx): - check_credentials_are_authorized(ctx) - catalog = Catalog([]) - for tap_stream_id in schemas.stream_ids: - schema = Schema.from_dict(schemas.load_schema(tap_stream_id), - inclusion="automatic") - catalog.streams.append(CatalogEntry( - stream=tap_stream_id, - tap_stream_id=tap_stream_id, - key_properties=schemas.PK_FIELDS[tap_stream_id], - schema=schema, - )) - return catalog - - def output_schema(stream): schema = schemas.load_schema(stream.tap_stream_id) pk_fields = schemas.PK_FIELDS[stream.tap_stream_id] @@ -71,8 +50,8 @@ def main(): discover(ctx).dump() print() else: - ctx.catalog = Catalog.from_dict(args.properties) \ - if args.properties else discover(ctx) + ctx.catalog = Catalog.from_dict(args.catalog.to_dict()) \ + if args.catalog else discover(ctx) sync(ctx) if __name__ == "__main__": diff --git a/tap_trustpilot/http.py b/tap_trustpilot/client.py similarity index 72% rename from tap_trustpilot/http.py rename to tap_trustpilot/client.py index 3683c7d..1c60d85 100644 --- a/tap_trustpilot/http.py +++ b/tap_trustpilot/client.py @@ -1,8 +1,11 @@ import requests from singer import metrics +import singer import backoff import base64 +LOGGER = singer.get_logger() + BASE_URL = "https://api.trustpilot.com/v1" AUTH_URL = "{}/oauth/oauth-business-users-for-applications/accesstoken".format(BASE_URL) @@ -59,6 +62,7 @@ def prepare_and_send(self, request): request.headers['Authorization'] = 'Bearer {}'.format(self._token) request.headers['apikey'] = self.access_key + request.headers['Content-Type'] = 'application/json' return self.session.send(request.prepare()) @@ -69,19 +73,29 @@ def url(self, path): def create_get_request(self, path, **kwargs): return requests.Request(method="GET", url=self.url(path), **kwargs) - @backoff.on_exception(backoff.expo, - RateLimitException, - max_tries=10, - factor=2) + def create_post_request(self, path, payload, **kwargs): + return requests.Request(method="POST", url=self.url(path), data=payload, **kwargs) + + @backoff.on_exception(backoff.expo, RateLimitException, max_tries=10, factor=2) + @backoff.on_exception(backoff.expo, requests.Timeout, max_tries=10, factor=2) def request_with_handling(self, request, tap_stream_id): with metrics.http_request_timer(tap_stream_id) as timer: response = self.prepare_and_send(request) timer.tags[metrics.Tag.http_status_code] = response.status_code if response.status_code in [429, 503]: raise RateLimitException() + # below exception should handle Pagination limit exceeded error if page value is more than 1000 + # depends on access level of access_token being used in config.json file + if response.status_code == 400 and response.json().get('details') == "Pagination limit exceeded.": + LOGGER.warning("400 Bad Request, Pagination limit exceeded.") + return [] response.raise_for_status() return response.json() def GET(self, request_kwargs, *args, **kwargs): req = self.create_get_request(**request_kwargs) return self.request_with_handling(req, *args, **kwargs) + + def POST(self, request_kwargs, *args, **kwargs): + req = self.create_post_request(**request_kwargs) + return self.request_with_handling(req, *args, **kwargs) diff --git a/tap_trustpilot/context.py b/tap_trustpilot/context.py index 472abbf..9d14bca 100644 --- a/tap_trustpilot/context.py +++ b/tap_trustpilot/context.py @@ -1,8 +1,8 @@ from datetime import datetime, date -import pendulum +# import pendulum import singer from singer import bookmarks as bks_ -from .http import Client +from .client import Client class Context(object): @@ -55,12 +55,12 @@ def set_offset(self, path, val): def clear_offsets(self, tap_stream_id): bks_.clear_offset(self.state, tap_stream_id) - def update_start_date_bookmark(self, path): - val = self.get_bookmark(path) - if not val: - val = self.config["start_date"] - self.set_bookmark(path, val) - return pendulum.parse(val) + # def update_start_date_bookmark(self, path): + # val = self.get_bookmark(path) + # if not val: + # val = self.config["start_date"] + # self.set_bookmark(path, val) + # return pendulum.parse(val) def write_state(self): singer.write_state(self.state) diff --git a/tap_trustpilot/discover.py b/tap_trustpilot/discover.py new file mode 100644 index 0000000..6159094 --- /dev/null +++ b/tap_trustpilot/discover.py @@ -0,0 +1,23 @@ +from singer.catalog import Catalog, CatalogEntry, Schema +from tap_trustpilot import schemas + +def check_credentials_are_authorized(ctx): + ctx.client.auth(ctx.config) + +def discover(ctx): + check_credentials_are_authorized(ctx) + discover_schemas, field_metadata = schemas.get_schemas() + streams = [] + for stream_name, raw_schema in discover_schemas.items(): + schema = Schema.from_dict(raw_schema) + mdata = field_metadata[stream_name] + streams.append( + CatalogEntry( + tap_stream_id=stream_name, + stream=stream_name, + schema=schema, + key_properties=schemas.PK_FIELDS[stream_name], + metadata=mdata + ) + ) + return Catalog(streams) \ No newline at end of file diff --git a/tap_trustpilot/schemas.py b/tap_trustpilot/schemas.py index aa8c95b..ca9818f 100644 --- a/tap_trustpilot/schemas.py +++ b/tap_trustpilot/schemas.py @@ -1,22 +1,9 @@ #!/usr/bin/env python3 import os +import json import singer -from singer import utils - - -class IDS(object): - BUSINESS_UNITS = "business_units" - REVIEWS = "reviews" - CONSUMERS = "consumers" - -stream_ids = [getattr(IDS, x) for x in dir(IDS) - if not x.startswith("__")] - -PK_FIELDS = { - IDS.BUSINESS_UNITS: ["id"], - IDS.REVIEWS: ["business_unit_id", "id"], - IDS.CONSUMERS: ["id"], -} +from singer import utils,metadata +from tap_trustpilot.streams import STREAMS, PK_FIELDS, IDS def get_abs_path(path): @@ -31,3 +18,25 @@ def load_schema(tap_stream_id): def load_and_write_schema(tap_stream_id): schema = load_schema(tap_stream_id) singer.write_schema(tap_stream_id, schema, PK_FIELDS[tap_stream_id]) + +def get_schemas(): + """ Load schemas from schemas folder """ + schemas = {} + field_metadata = {} + + for stream_name, stream_metadata in STREAMS.items(): + path = get_abs_path(f'schemas/{stream_name}.json') + with open(path, encoding='utf-8') as file: + schema = json.load(file) + schemas[stream_name] = schema + + mdata = metadata.get_standard_metadata( + schema=schema, + key_properties=stream_metadata.key_properties, + replication_method=stream_metadata.replication_method, + valid_replication_keys=stream_metadata.replication_keys + ) + field_metadata[stream_name] = mdata + field_metadata["stream"] = STREAMS + + return schemas, field_metadata diff --git a/tap_trustpilot/streams.py b/tap_trustpilot/streams.py index ab5a7f0..22f686a 100644 --- a/tap_trustpilot/streams.py +++ b/tap_trustpilot/streams.py @@ -1,10 +1,26 @@ +import json + import singer -from .schemas import IDS -from . import transform +# from tap_trustpilot.schemas import IDS +from tap_trustpilot import transform LOGGER = singer.get_logger() PAGE_SIZE = 100 +CONSUMER_CHUNK_SIZE = 1000 + +class IDS(object): + BUSINESS_UNITS = "business_units" + REVIEWS = "reviews" + CONSUMERS = "consumers" +stream_ids = [getattr(IDS, x) for x in dir(IDS) + if not x.startswith("__")] + +PK_FIELDS = { + IDS.BUSINESS_UNITS: ["id"], + IDS.REVIEWS: ["business_unit_id", "id"], + IDS.CONSUMERS: ["id"], +} class Stream(object): def __init__(self, tap_stream_id, path, @@ -43,6 +59,12 @@ def transform(self, ctx, records): class BusinessUnits(Stream): + + # tap_stream_id = "business_units" + key_properties = ['id'] + replication_keys = None + replication_method = "FULL_TABLE" + params = {} def raw_fetch(self, ctx): return ctx.client.GET({"path": self.path}, self.tap_stream_id) @@ -63,9 +85,9 @@ def sync(self, ctx): self.write_records([ctx.cache["business_unit"]]) - class Paginated(Stream): - def get_params(self, page): + @staticmethod + def get_params(page): return { "page": page, "perPage": PAGE_SIZE, @@ -96,7 +118,14 @@ def _sync(self, ctx): class Reviews(Paginated): - def add_consumers_to_cache(self, ctx, batch): + + key_properties = ["business_unit_id", "id"] + replication_keys = None + replication_method = "FULL_TABLE" + params = {} + + @staticmethod + def add_consumers_to_cache(ctx, batch): for record in batch: consumer_id = record.get('consumer', {}).get('id') if consumer_id is not None: @@ -109,17 +138,29 @@ def sync(self, ctx): class Consumers(Stream): + + key_properties = ['id'] + replication_keys = None + replication_method = "FULL_TABLE" + params = {} + def sync(self, ctx): business_unit_id = ctx.cache['business_unit']['id'] - total = len(ctx.cache['consumer_ids']) - for i, consumer_id in enumerate(ctx.cache['consumer_ids']): - LOGGER.info("Fetching consumer {} of {} ({})".format(i + 1, total, consumer_id)) - path = self.path.format(consumerId=consumer_id) - resp = ctx.client.GET({"path": path}, self.tap_stream_id) + total = len(ctx.cache.get('consumer_ids',[])) - raw_records = self.format_response([resp]) + # chunk list of consumer IDs to smaller lists of size 1000 + consumer_ids = list(ctx.cache.get('consumer_ids',[])) + chunked_consumer_ids = [consumer_ids[i: i+CONSUMER_CHUNK_SIZE] for i in range(0, len(consumer_ids), + CONSUMER_CHUNK_SIZE)] + + for i, consumer_id_list in enumerate(chunked_consumer_ids): + LOGGER.info("Fetching consumer page {} of {}".format(i + 1, len(chunked_consumer_ids))) + resp = ctx.client.POST({"path": self.path, "payload": json.dumps({"consumerIds": consumer_id_list})}, + self.tap_stream_id) + raw_records = self.format_response([resp]) + raw_records = list(raw_records[0].get('consumers', {}).values()) for raw_record in raw_records: raw_record['business_unit_id'] = business_unit_id @@ -131,6 +172,14 @@ def sync(self, ctx): all_streams = [ business_units, Reviews(IDS.REVIEWS, '/business-units/:business_unit_id/reviews', collection_key='reviews'), - Consumers(IDS.CONSUMERS, '/consumers/{consumerId}/profile') + Consumers(IDS.CONSUMERS, '/consumers/profile/bulk') ] all_stream_ids = [s.tap_stream_id for s in all_streams] + +STREAMS = { + 'business_units': BusinessUnits(IDS.BUSINESS_UNITS, "/business-units/:business_unit_id/profileinfo"), + 'reviews': Reviews(IDS.REVIEWS, '/business-units/:business_unit_id/reviews', collection_key='reviews'), + 'consumers': Consumers(IDS.CONSUMERS, '/consumers/{consumerId}/profile') +} + +