Skip to content

Commit

Permalink
Tap trustpilot intial changes - Discover mode, Sync mode, bulk reques…
Browse files Browse the repository at this point in the history
…t. (#1)

* inital changes: import statements, singer-python version sync change

* Initial refactor for the discover mode changes

* Moved discover logic code from __init__ file to discover.py file

* alpha:0.2.0 release changes

* added change logs

* format changelog.md

* Handled consumer sync logic and Removed unused code

* removed version number and change logs

* circleci yml syntax error

* circleci yml syntax error

* tap name error

Co-authored-by: Kethan Cherukuri <kcherukuri@LT-LR9JJWXD40.local>
Co-authored-by: kethan1122 <kcherukuri@talend.com>
Co-authored-by: kethan1122 <105211331+kethan1122@users.noreply.github.com>
  • Loading branch information
4 people authored Jun 8, 2022
1 parent cf3cb98 commit 6fa1dfd
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 69 deletions.
45 changes: 45 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
version: 2.1
orbs:
slack: circleci/slack@3.4.2

jobs:
build:
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester
steps:
- checkout
- run:
name: 'Setup virtual env'
command: |
python3 -mvenv /usr/local/share/virtualenvs/tap-trustpilot
source /usr/local/share/virtualenvs/tap-trustpilot/bin/activate
pip install -U pip setuptools
pip install .[dev]
- run:
name: 'JSON Validator'
command: |
source /usr/local/share/virtualenvs/tap-tester/bin/activate
stitch-validate-json tap_trustpilot/schemas/*.json
- run:
name: 'pylint'
command: |
source /usr/local/share/virtualenvs/tap-trustpilot/bin/activate
pylint tap_trustpilot --disable C,W,R
workflows:
version: 2
commit:
jobs:
- build:
context: circleci-user
build_daily:
triggers:
- schedule:
cron: "0 13 * * *"
filters:
branches:
only:
- master
jobs:
- build:
context: circleci-user
Empty file added CHANGELOG.md
Empty file.
10 changes: 9 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,17 @@
classifiers=["Programming Language :: Python :: 3 :: Only"],
py_modules=["tap_trustpilot"],
install_requires=[
"singer-python>=5.0.12",
"singer-python",
"requests",
],
# requires following addition packages for code check quality
extras_require={
'dev': [
'pylint',
'ipdb',
'nose'
]
},
entry_points="""
[console_scripts]
tap-trustpilot=tap_trustpilot:main
Expand Down
35 changes: 7 additions & 28 deletions tap_trustpilot/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
#!/usr/bin/env python3
import os
import json
import singer
from singer import utils
from singer.catalog import Catalog, CatalogEntry, Schema
from . import streams as streams_
from .context import Context
from . import schemas
from singer.catalog import Catalog
from tap_trustpilot import streams as streams_
from tap_trustpilot.context import Context
from tap_trustpilot import schemas
from tap_trustpilot.discover import discover

REQUIRED_CONFIG_KEYS = [
"access_key",
Expand All @@ -18,26 +17,6 @@

LOGGER = singer.get_logger()


def check_credentials_are_authorized(ctx):
ctx.client.auth(ctx.config)


def discover(ctx):
check_credentials_are_authorized(ctx)
catalog = Catalog([])
for tap_stream_id in schemas.stream_ids:
schema = Schema.from_dict(schemas.load_schema(tap_stream_id),
inclusion="automatic")
catalog.streams.append(CatalogEntry(
stream=tap_stream_id,
tap_stream_id=tap_stream_id,
key_properties=schemas.PK_FIELDS[tap_stream_id],
schema=schema,
))
return catalog


def output_schema(stream):
schema = schemas.load_schema(stream.tap_stream_id)
pk_fields = schemas.PK_FIELDS[stream.tap_stream_id]
Expand Down Expand Up @@ -71,8 +50,8 @@ def main():
discover(ctx).dump()
print()
else:
ctx.catalog = Catalog.from_dict(args.properties) \
if args.properties else discover(ctx)
ctx.catalog = Catalog.from_dict(args.catalog.to_dict()) \
if args.catalog else discover(ctx)
sync(ctx)

if __name__ == "__main__":
Expand Down
22 changes: 18 additions & 4 deletions tap_trustpilot/http.py → tap_trustpilot/client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import requests
from singer import metrics
import singer
import backoff
import base64

LOGGER = singer.get_logger()

BASE_URL = "https://api.trustpilot.com/v1"
AUTH_URL = "{}/oauth/oauth-business-users-for-applications/accesstoken".format(BASE_URL)

Expand Down Expand Up @@ -59,6 +62,7 @@ def prepare_and_send(self, request):

request.headers['Authorization'] = 'Bearer {}'.format(self._token)
request.headers['apikey'] = self.access_key
request.headers['Content-Type'] = 'application/json'

return self.session.send(request.prepare())

Expand All @@ -69,19 +73,29 @@ def url(self, path):
def create_get_request(self, path, **kwargs):
return requests.Request(method="GET", url=self.url(path), **kwargs)

@backoff.on_exception(backoff.expo,
RateLimitException,
max_tries=10,
factor=2)
def create_post_request(self, path, payload, **kwargs):
return requests.Request(method="POST", url=self.url(path), data=payload, **kwargs)

@backoff.on_exception(backoff.expo, RateLimitException, max_tries=10, factor=2)
@backoff.on_exception(backoff.expo, requests.Timeout, max_tries=10, factor=2)
def request_with_handling(self, request, tap_stream_id):
with metrics.http_request_timer(tap_stream_id) as timer:
response = self.prepare_and_send(request)
timer.tags[metrics.Tag.http_status_code] = response.status_code
if response.status_code in [429, 503]:
raise RateLimitException()
# below exception should handle Pagination limit exceeded error if page value is more than 1000
# depends on access level of access_token being used in config.json file
if response.status_code == 400 and response.json().get('details') == "Pagination limit exceeded.":
LOGGER.warning("400 Bad Request, Pagination limit exceeded.")
return []
response.raise_for_status()
return response.json()

def GET(self, request_kwargs, *args, **kwargs):
req = self.create_get_request(**request_kwargs)
return self.request_with_handling(req, *args, **kwargs)

def POST(self, request_kwargs, *args, **kwargs):
req = self.create_post_request(**request_kwargs)
return self.request_with_handling(req, *args, **kwargs)
16 changes: 8 additions & 8 deletions tap_trustpilot/context.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from datetime import datetime, date
import pendulum
# import pendulum
import singer
from singer import bookmarks as bks_
from .http import Client
from .client import Client


class Context(object):
Expand Down Expand Up @@ -55,12 +55,12 @@ def set_offset(self, path, val):
def clear_offsets(self, tap_stream_id):
bks_.clear_offset(self.state, tap_stream_id)

def update_start_date_bookmark(self, path):
val = self.get_bookmark(path)
if not val:
val = self.config["start_date"]
self.set_bookmark(path, val)
return pendulum.parse(val)
# def update_start_date_bookmark(self, path):
# val = self.get_bookmark(path)
# if not val:
# val = self.config["start_date"]
# self.set_bookmark(path, val)
# return pendulum.parse(val)

def write_state(self):
singer.write_state(self.state)
23 changes: 23 additions & 0 deletions tap_trustpilot/discover.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from singer.catalog import Catalog, CatalogEntry, Schema
from tap_trustpilot import schemas

def check_credentials_are_authorized(ctx):
ctx.client.auth(ctx.config)

def discover(ctx):
check_credentials_are_authorized(ctx)
discover_schemas, field_metadata = schemas.get_schemas()
streams = []
for stream_name, raw_schema in discover_schemas.items():
schema = Schema.from_dict(raw_schema)
mdata = field_metadata[stream_name]
streams.append(
CatalogEntry(
tap_stream_id=stream_name,
stream=stream_name,
schema=schema,
key_properties=schemas.PK_FIELDS[stream_name],
metadata=mdata
)
)
return Catalog(streams)
41 changes: 25 additions & 16 deletions tap_trustpilot/schemas.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,9 @@
#!/usr/bin/env python3
import os
import json
import singer
from singer import utils


class IDS(object):
BUSINESS_UNITS = "business_units"
REVIEWS = "reviews"
CONSUMERS = "consumers"

stream_ids = [getattr(IDS, x) for x in dir(IDS)
if not x.startswith("__")]

PK_FIELDS = {
IDS.BUSINESS_UNITS: ["id"],
IDS.REVIEWS: ["business_unit_id", "id"],
IDS.CONSUMERS: ["id"],
}
from singer import utils,metadata
from tap_trustpilot.streams import STREAMS, PK_FIELDS, IDS


def get_abs_path(path):
Expand All @@ -31,3 +18,25 @@ def load_schema(tap_stream_id):
def load_and_write_schema(tap_stream_id):
schema = load_schema(tap_stream_id)
singer.write_schema(tap_stream_id, schema, PK_FIELDS[tap_stream_id])

def get_schemas():
""" Load schemas from schemas folder """
schemas = {}
field_metadata = {}

for stream_name, stream_metadata in STREAMS.items():
path = get_abs_path(f'schemas/{stream_name}.json')
with open(path, encoding='utf-8') as file:
schema = json.load(file)
schemas[stream_name] = schema

mdata = metadata.get_standard_metadata(
schema=schema,
key_properties=stream_metadata.key_properties,
replication_method=stream_metadata.replication_method,
valid_replication_keys=stream_metadata.replication_keys
)
field_metadata[stream_name] = mdata
field_metadata["stream"] = STREAMS

return schemas, field_metadata
Loading

0 comments on commit 6fa1dfd

Please sign in to comment.