Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tap trustpilot initial changes - Discover mode, Sync mode, bulk request. #1

Merged
merged 14 commits into from
Jun 8, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
version: 2.1
orbs:
slack: circleci/slack@3.4.2

jobs:
build:
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester
steps:
- checkout
- run:
    # Create the tap's own virtualenv and install the tap with its dev extras.
    name: 'Setup virtual env'
    command: |
      python3 -mvenv /usr/local/share/virtualenvs/tap-trustpilot
      source /usr/local/share/virtualenvs/tap-trustpilot/bin/activate
      pip install -U pip setuptools
      pip install .[dev]
- run:
name: 'JSON Validator'
command: |
source /usr/local/share/virtualenvs/tap-tester/bin/activate
stitch-validate-json tap_trustpilot/schemas/*.json
- run:
name: 'pylint'
command: |
source /usr/local/share/virtualenvs/tap-trustpilot/bin/activate
pylint tap_trustpilot --disable C,W,R
- run:
name: 'Unit Tests'
# command: |
# source /usr/local/share/virtualenvs/tap-trustpilot/bin/activate
# pip install coverage
# nosetests --with-coverage --cover-erase --cover-package=tap_trustpilot --cover-html-dir=htmlcov tests/unittests
# coverage html
# - store_test_results:
# path: test_output/report.xml
# - store_artifacts:
# path: htmlcov
- run:
name: 'Integration Tests'
# command: |
# aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh
# source dev_env.sh
# source /usr/local/share/virtualenvs/tap-tester/bin/activate
# pip install .[dev,test] # Install tap for the test client subclass usage
# run-test --tap=tap-trustpilot tests

workflows:
version: 2
commit:
jobs:
- build:
context: circleci-user
build_daily:
triggers:
- schedule:
cron: "0 13 * * *"
filters:
branches:
only:
- master
Empty file added CHANGELOG.md
Empty file.
10 changes: 9 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,17 @@
classifiers=["Programming Language :: Python :: 3 :: Only"],
py_modules=["tap_trustpilot"],
install_requires=[
"singer-python>=5.0.12",
"singer-python",
"requests",
],
# The following additional packages are required for code-quality checks
extras_require={
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comments to the code

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

'dev': [
'pylint',
'ipdb',
'nose'
]
},
entry_points="""
[console_scripts]
tap-trustpilot=tap_trustpilot:main
Expand Down
35 changes: 7 additions & 28 deletions tap_trustpilot/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
#!/usr/bin/env python3
import os
import json
import singer
from singer import utils
from singer.catalog import Catalog, CatalogEntry, Schema
from . import streams as streams_
from .context import Context
from . import schemas
from singer.catalog import Catalog
from tap_trustpilot import streams as streams_
from tap_trustpilot.context import Context
from tap_trustpilot import schemas
from tap_trustpilot.discover import discover

REQUIRED_CONFIG_KEYS = [
"access_key",
Expand All @@ -18,26 +17,6 @@

LOGGER = singer.get_logger()


def check_credentials_are_authorized(ctx):
ctx.client.auth(ctx.config)


def discover(ctx):
check_credentials_are_authorized(ctx)
catalog = Catalog([])
for tap_stream_id in schemas.stream_ids:
schema = Schema.from_dict(schemas.load_schema(tap_stream_id),
inclusion="automatic")
catalog.streams.append(CatalogEntry(
stream=tap_stream_id,
tap_stream_id=tap_stream_id,
key_properties=schemas.PK_FIELDS[tap_stream_id],
schema=schema,
))
return catalog


def output_schema(stream):
schema = schemas.load_schema(stream.tap_stream_id)
pk_fields = schemas.PK_FIELDS[stream.tap_stream_id]
Expand Down Expand Up @@ -71,8 +50,8 @@ def main():
discover(ctx).dump()
print()
else:
ctx.catalog = Catalog.from_dict(args.properties) \
if args.properties else discover(ctx)
ctx.catalog = Catalog.from_dict(args.catalog.to_dict()) \
if args.catalog else discover(ctx)
sync(ctx)

if __name__ == "__main__":
Expand Down
22 changes: 18 additions & 4 deletions tap_trustpilot/http.py → tap_trustpilot/client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import requests
from singer import metrics
import singer
import backoff
import base64

LOGGER = singer.get_logger()

BASE_URL = "https://api.trustpilot.com/v1"
AUTH_URL = "{}/oauth/oauth-business-users-for-applications/accesstoken".format(BASE_URL)

Expand Down Expand Up @@ -59,6 +62,7 @@ def prepare_and_send(self, request):

request.headers['Authorization'] = 'Bearer {}'.format(self._token)
request.headers['apikey'] = self.access_key
request.headers['Content-Type'] = 'application/json'

return self.session.send(request.prepare())

Expand All @@ -69,19 +73,29 @@ def url(self, path):
def create_get_request(self, path, **kwargs):
    """Build an unsent GET request for ``path``.

    ``path`` is resolved through ``self.url``; any extra keyword arguments
    are forwarded unchanged to ``requests.Request``.
    """
    target_url = self.url(path)
    return requests.Request(method="GET", url=target_url, **kwargs)

def create_post_request(self, path, payload, **kwargs):
    """Build an unsent POST request for ``path``.

    Args:
        path: Resolved through ``self.url`` to produce the request URL.
        payload: Request body, handed to ``requests.Request`` as ``data``.
        **kwargs: Forwarded unchanged to ``requests.Request``.

    Returns:
        An unprepared ``requests.Request``.
    """
    # No retry decorator here: this method only constructs a request object.
    # RateLimitException is raised in request_with_handling, which carries
    # its own backoff decorators, so retrying the builder could never trigger.
    return requests.Request(method="POST", url=self.url(path), data=payload, **kwargs)

@backoff.on_exception(backoff.expo, RateLimitException, max_tries=10, factor=2)
@backoff.on_exception(backoff.expo, requests.Timeout, max_tries=10, factor=2)
def request_with_handling(self, request, tap_stream_id):
    """Send ``request`` and return the decoded JSON response body.

    The call is timed with ``metrics.http_request_timer`` and tagged with the
    HTTP status code. It is retried with exponential backoff on
    ``RateLimitException`` and ``requests.Timeout``.

    Args:
        request: The unprepared request passed to ``self.prepare_and_send``.
        tap_stream_id: Stream identifier used to label the request timer.

    Returns:
        The parsed JSON body, or an empty list when the API answers
        400 "Pagination limit exceeded.".

    Raises:
        RateLimitException: On HTTP 429 or 503.
        requests.HTTPError: On any other error status (via ``raise_for_status``).
    """
    with metrics.http_request_timer(tap_stream_id) as timer:
        response = self.prepare_and_send(request)
        timer.tags[metrics.Tag.http_status_code] = response.status_code

    if response.status_code in [429, 503]:
        raise RateLimitException()

    # A 400 with "Pagination limit exceeded." is returned when the page value
    # is more than 1000; this depends on the access level of the access_token
    # used in config.json. It is treated as "no more records", not an error.
    if response.status_code == 400:
        try:
            details = response.json().get('details')
        except (ValueError, AttributeError):
            # Body is not JSON, or not a JSON object: fall through so that
            # raise_for_status() reports the real HTTP error.
            details = None
        if details == "Pagination limit exceeded.":
            LOGGER.warning("400 Bad Request, Pagination limit exceeded.")
            return []

    response.raise_for_status()
    return response.json()

def GET(self, request_kwargs, *args, **kwargs):
    """Build a GET request from ``request_kwargs`` and send it.

    ``request_kwargs`` is unpacked into ``create_get_request``; the remaining
    positional and keyword arguments are passed to ``request_with_handling``,
    whose result is returned.
    """
    get_request = self.create_get_request(**request_kwargs)
    response_body = self.request_with_handling(get_request, *args, **kwargs)
    return response_body

def POST(self, request_kwargs, *args, **kwargs):
    """Build a POST request from ``request_kwargs`` and send it.

    ``request_kwargs`` is unpacked into ``create_post_request``; the remaining
    positional and keyword arguments are passed to ``request_with_handling``,
    whose result is returned.
    """
    post_request = self.create_post_request(**request_kwargs)
    response_body = self.request_with_handling(post_request, *args, **kwargs)
    return response_body
16 changes: 8 additions & 8 deletions tap_trustpilot/context.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from datetime import datetime, date
import pendulum
# import pendulum
import singer
from singer import bookmarks as bks_
from .http import Client
from .client import Client


class Context(object):
Expand Down Expand Up @@ -55,12 +55,12 @@ def set_offset(self, path, val):
def clear_offsets(self, tap_stream_id):
bks_.clear_offset(self.state, tap_stream_id)

def update_start_date_bookmark(self, path):
val = self.get_bookmark(path)
if not val:
val = self.config["start_date"]
self.set_bookmark(path, val)
return pendulum.parse(val)
# def update_start_date_bookmark(self, path):
# val = self.get_bookmark(path)
# if not val:
# val = self.config["start_date"]
# self.set_bookmark(path, val)
# return pendulum.parse(val)

def write_state(self):
singer.write_state(self.state)
23 changes: 23 additions & 0 deletions tap_trustpilot/discover.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from singer.catalog import Catalog, CatalogEntry, Schema
from tap_trustpilot import schemas

def check_credentials_are_authorized(ctx):
    """Authenticate the context's client with the context's config.

    Any exception raised by ``ctx.client.auth`` propagates to the caller.
    """
    api_client = ctx.client
    api_client.auth(ctx.config)

def discover(ctx):
    """Build the catalog of all streams after verifying credentials.

    Credentials are checked first, so an authentication failure surfaces
    before any schema is loaded. One ``CatalogEntry`` is created for each
    schema returned by ``schemas.get_schemas()``.

    Returns:
        A ``Catalog`` containing one entry per stream.
    """
    check_credentials_are_authorized(ctx)
    raw_schemas, metadata_by_stream = schemas.get_schemas()

    entries = []
    for name, raw in raw_schemas.items():
        parsed_schema = Schema.from_dict(raw)
        stream_mdata = metadata_by_stream[name]
        entry = CatalogEntry(
            tap_stream_id=name,
            stream=name,
            schema=parsed_schema,
            key_properties=schemas.PK_FIELDS[name],
            metadata=stream_mdata,
        )
        entries.append(entry)

    return Catalog(entries)
41 changes: 25 additions & 16 deletions tap_trustpilot/schemas.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,9 @@
#!/usr/bin/env python3
import os
import json
import singer
from singer import utils


class IDS(object):
BUSINESS_UNITS = "business_units"
REVIEWS = "reviews"
CONSUMERS = "consumers"

stream_ids = [getattr(IDS, x) for x in dir(IDS)
if not x.startswith("__")]

PK_FIELDS = {
IDS.BUSINESS_UNITS: ["id"],
IDS.REVIEWS: ["business_unit_id", "id"],
IDS.CONSUMERS: ["id"],
}
from singer import utils,metadata
from tap_trustpilot.streams import STREAMS, PK_FIELDS, IDS


def get_abs_path(path):
Expand All @@ -31,3 +18,25 @@ def load_schema(tap_stream_id):
def load_and_write_schema(tap_stream_id):
    """Load the schema for ``tap_stream_id`` and emit it via ``singer.write_schema``.

    The key properties written with the schema come from ``PK_FIELDS``.
    """
    stream_schema = load_schema(tap_stream_id)
    key_properties = PK_FIELDS[tap_stream_id]
    singer.write_schema(tap_stream_id, stream_schema, key_properties)

def get_schemas():
    """Load each stream's JSON schema file and build its standard metadata.

    For every entry in ``STREAMS`` the file ``schemas/<stream_name>.json`` is
    read and ``metadata.get_standard_metadata`` is called with the stream's
    key properties, replication method and replication keys.

    Returns:
        A ``(schemas, field_metadata)`` tuple of dicts keyed by stream name.
    """
    loaded_schemas = {}
    metadata_by_stream = {}

    for name, stream_def in STREAMS.items():
        schema_path = get_abs_path(f'schemas/{name}.json')
        with open(schema_path, encoding='utf-8') as schema_file:
            stream_schema = json.load(schema_file)
        loaded_schemas[name] = stream_schema

        metadata_by_stream[name] = metadata.get_standard_metadata(
            schema=stream_schema,
            key_properties=stream_def.key_properties,
            replication_method=stream_def.replication_method,
            valid_replication_keys=stream_def.replication_keys,
        )
        # NOTE(review): this extra "stream" key is not a stream's metadata;
        # confirm it is intended and whether it belongs inside the loop.
        metadata_by_stream["stream"] = STREAMS

    return loaded_schemas, metadata_by_stream
Loading