From 38215c7e0a219f5c08258922b5909653d2d8e5ef Mon Sep 17 00:00:00 2001 From: Eric Boucher Date: Tue, 17 May 2022 22:29:13 -0400 Subject: [PATCH] Add pagination for GraphQl streams (#118) --- poetry.lock | 23 +++++++++++++- pyproject.toml | 1 + tap_github/authenticator.py | 4 +-- tap_github/client.py | 63 ++++++++++++++++++++++++++++++++++++- tap_github/tap.py | 10 ++++-- tap_github/user_streams.py | 12 +++++-- 6 files changed, 105 insertions(+), 8 deletions(-) diff --git a/poetry.lock b/poetry.lock index a1061a79..01113508 100644 --- a/poetry.lock +++ b/poetry.lock @@ -304,6 +304,17 @@ category = "dev" optional = false python-versions = "*" +[[package]] +name = "nested-lookup" +version = "0.2.23" +description = "Python functions for working with deeply nested documents (lists and dicts)" +category = "main" +optional = false +python-versions = "*" + +[package.dependencies] +six = "*" + [[package]] name = "packaging" version = "21.3" @@ -729,7 +740,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest- [metadata] lock-version = "1.1" python-versions = "<3.11,>=3.7.2" -content-hash = "29b3347c8bab03057e91c8ca2d715d4f8ba44e3eb792b48951ae86b90a133401" +content-hash = "c231d8de569fccd6a99881dee8b3cbaed86de21d2211a9e69e0df8154d95be3a" [metadata.files] appdirs = [ @@ -855,6 +866,8 @@ cryptography = [ {file = "cryptography-3.4.8-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:34dae04a0dce5730d8eb7894eab617d8a70d0c97da76b905de9efb7128ad7085"}, {file = "cryptography-3.4.8-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1eb7bb0df6f6f583dd8e054689def236255161ebbcf62b226454ab9ec663746b"}, {file = "cryptography-3.4.8-cp36-abi3-manylinux_2_24_x86_64.whl", hash = "sha256:9965c46c674ba8cc572bc09a03f4c649292ee73e1b683adb1ce81e82e9a6a0fb"}, + {file = "cryptography-3.4.8-cp36-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:3c4129fc3fdc0fa8e40861b5ac0c673315b3c902bbdc05fc176764815b43dd1d"}, + {file = 
"cryptography-3.4.8-cp36-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:695104a9223a7239d155d7627ad912953b540929ef97ae0c34c7b8bf30857e89"}, {file = "cryptography-3.4.8-cp36-abi3-win32.whl", hash = "sha256:21ca464b3a4b8d8e86ba0ee5045e103a1fcfac3b39319727bc0fc58c09c6aff7"}, {file = "cryptography-3.4.8-cp36-abi3-win_amd64.whl", hash = "sha256:3520667fda779eb788ea00080124875be18f2d8f0848ec00733c0ec3bb8219fc"}, {file = "cryptography-3.4.8-pp36-pypy36_pp73-manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:d2a6e5ef66503da51d2110edf6c403dc6b494cc0082f85db12f54e9c5d4c3ec5"}, @@ -887,6 +900,7 @@ greenlet = [ {file = "greenlet-1.1.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:97e5306482182170ade15c4b0d8386ded995a07d7cc2ca8f27958d34d6736497"}, {file = "greenlet-1.1.2-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:e6a36bb9474218c7a5b27ae476035497a6990e21d04c279884eb10d9b290f1b1"}, {file = "greenlet-1.1.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:abb7a75ed8b968f3061327c433a0fbd17b729947b400747c334a9c29a9af6c58"}, + {file = "greenlet-1.1.2-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:b336501a05e13b616ef81ce329c0e09ac5ed8c732d9ba7e3e983fcc1a9e86965"}, {file = "greenlet-1.1.2-cp310-cp310-win_amd64.whl", hash = "sha256:14d4f3cd4e8b524ae9b8aa567858beed70c392fdec26dbdb0a8a418392e71708"}, {file = "greenlet-1.1.2-cp35-cp35m-macosx_10_14_x86_64.whl", hash = "sha256:17ff94e7a83aa8671a25bf5b59326ec26da379ace2ebc4411d690d80a7fbcf23"}, {file = "greenlet-1.1.2-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:9f3cba480d3deb69f6ee2c1825060177a22c7826431458c697df88e6aeb3caee"}, @@ -899,6 +913,7 @@ greenlet = [ {file = "greenlet-1.1.2-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f9d29ca8a77117315101425ec7ec2a47a22ccf59f5593378fc4077ac5b754fce"}, {file = "greenlet-1.1.2-cp36-cp36m-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = 
"sha256:21915eb821a6b3d9d8eefdaf57d6c345b970ad722f856cd71739493ce003ad08"}, {file = "greenlet-1.1.2-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eff9d20417ff9dcb0d25e2defc2574d10b491bf2e693b4e491914738b7908168"}, + {file = "greenlet-1.1.2-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:b8c008de9d0daba7b6666aa5bbfdc23dcd78cafc33997c9b7741ff6353bafb7f"}, {file = "greenlet-1.1.2-cp36-cp36m-win32.whl", hash = "sha256:32ca72bbc673adbcfecb935bb3fb1b74e663d10a4b241aaa2f5a75fe1d1f90aa"}, {file = "greenlet-1.1.2-cp36-cp36m-win_amd64.whl", hash = "sha256:f0214eb2a23b85528310dad848ad2ac58e735612929c8072f6093f3585fd342d"}, {file = "greenlet-1.1.2-cp37-cp37m-macosx_10_14_x86_64.whl", hash = "sha256:b92e29e58bef6d9cfd340c72b04d74c4b4e9f70c9fa7c78b674d1fec18896dc4"}, @@ -907,6 +922,7 @@ greenlet = [ {file = "greenlet-1.1.2-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1e12bdc622676ce47ae9abbf455c189e442afdde8818d9da983085df6312e7a1"}, {file = "greenlet-1.1.2-cp37-cp37m-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:8c790abda465726cfb8bb08bd4ca9a5d0a7bd77c7ac1ca1b839ad823b948ea28"}, {file = "greenlet-1.1.2-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f276df9830dba7a333544bd41070e8175762a7ac20350786b322b714b0e654f5"}, + {file = "greenlet-1.1.2-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:8c5d5b35f789a030ebb95bff352f1d27a93d81069f2adb3182d99882e095cefe"}, {file = "greenlet-1.1.2-cp37-cp37m-win32.whl", hash = "sha256:64e6175c2e53195278d7388c454e0b30997573f3f4bd63697f88d855f7a6a1fc"}, {file = "greenlet-1.1.2-cp37-cp37m-win_amd64.whl", hash = "sha256:b11548073a2213d950c3f671aa88e6f83cda6e2fb97a8b6317b1b5b33d850e06"}, {file = "greenlet-1.1.2-cp38-cp38-macosx_10_14_x86_64.whl", hash = "sha256:9633b3034d3d901f0a46b7939f8c4d64427dfba6bbc5a36b1a67364cf148a1b0"}, @@ -915,6 +931,7 @@ greenlet = [ {file = 
"greenlet-1.1.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e859fcb4cbe93504ea18008d1df98dee4f7766db66c435e4882ab35cf70cac43"}, {file = "greenlet-1.1.2-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:00e44c8afdbe5467e4f7b5851be223be68adb4272f44696ee71fe46b7036a711"}, {file = "greenlet-1.1.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ec8c433b3ab0419100bd45b47c9c8551248a5aee30ca5e9d399a0b57ac04651b"}, + {file = "greenlet-1.1.2-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:2bde6792f313f4e918caabc46532aa64aa27a0db05d75b20edfc5c6f46479de2"}, {file = "greenlet-1.1.2-cp38-cp38-win32.whl", hash = "sha256:288c6a76705dc54fba69fbcb59904ae4ad768b4c768839b8ca5fdadec6dd8cfd"}, {file = "greenlet-1.1.2-cp38-cp38-win_amd64.whl", hash = "sha256:8d2f1fb53a421b410751887eb4ff21386d119ef9cde3797bf5e7ed49fb51a3b3"}, {file = "greenlet-1.1.2-cp39-cp39-macosx_10_14_x86_64.whl", hash = "sha256:166eac03e48784a6a6e0e5f041cfebb1ab400b394db188c48b3a84737f505b67"}, @@ -923,6 +940,7 @@ greenlet = [ {file = "greenlet-1.1.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b1692f7d6bc45e3200844be0dba153612103db241691088626a33ff1f24a0d88"}, {file = "greenlet-1.1.2-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:7227b47e73dedaa513cdebb98469705ef0d66eb5a1250144468e9c3097d6b59b"}, {file = "greenlet-1.1.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7ff61ff178250f9bb3cd89752df0f1dd0e27316a8bd1465351652b1b4a4cdfd3"}, + {file = "greenlet-1.1.2-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:0051c6f1f27cb756ffc0ffbac7d2cd48cb0362ac1736871399a739b2885134d3"}, {file = "greenlet-1.1.2-cp39-cp39-win32.whl", hash = "sha256:f70a9e237bb792c7cc7e44c531fd48f5897961701cdaa06cf22fc14965c496cf"}, {file = "greenlet-1.1.2-cp39-cp39-win_amd64.whl", hash = "sha256:013d61294b6cd8fe3242932c1c5e36e5d1db2c8afb58606c5a67efce62c1f5fd"}, {file = 
"greenlet-1.1.2.tar.gz", hash = "sha256:e30f5ea4ae2346e62cedde8794a56858a67b878dd79f7df76a0767e356b1744a"}, @@ -993,6 +1011,9 @@ mypy-extensions = [ {file = "mypy_extensions-0.4.3-py2.py3-none-any.whl", hash = "sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d"}, {file = "mypy_extensions-0.4.3.tar.gz", hash = "sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8"}, ] +nested-lookup = [ + {file = "nested-lookup-0.2.23.tar.gz", hash = "sha256:7b8900c8e706e4f8c3309ebde229b109f487b15b9a8cae27b99026ad11e4b64c"}, +] packaging = [ {file = "packaging-21.3-py3-none-any.whl", hash = "sha256:ef103e05f519cdc783ae24ea4e2e0f508a9c99b2d4969652eed6a2e1ea5bd522"}, {file = "packaging-21.3.tar.gz", hash = "sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb"}, diff --git a/pyproject.toml b/pyproject.toml index 1d0d97fd..424d8a84 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,7 @@ singer-sdk = "^0.4.7" # singer-sdk = {git = "https://gitlab.com/meltano/singer-sdk.git", rev = "97-hierarchical-streams"} types-simplejson = "^3.17.2" types-python-dateutil = "^2.8.6" +nested-lookup = "^0.2.23" [tool.poetry.dev-dependencies] pytest = "^6.1.2" diff --git a/tap_github/authenticator.py b/tap_github/authenticator.py index 0e8149df..65c06373 100644 --- a/tap_github/authenticator.py +++ b/tap_github/authenticator.py @@ -156,8 +156,6 @@ def prepare_tokens(self) -> Dict[str, TokenRateLimit]: ) available_tokens = available_tokens + [app_token] - self.logger.info(f"Tap will run with {len(available_tokens)} auth tokens") - # Get rate_limit_buffer rate_limit_buffer = self._config.get("rate_limit_buffer", None) @@ -181,6 +179,8 @@ def prepare_tokens(self) -> Dict[str, TokenRateLimit]: ) self.logger.warning(msg) + self.logger.info(f"Tap will run with {len(filtered_tokens)} auth tokens") + # Create a dict of TokenRateLimit # TODO - separate app_token and add logic to refresh the token # using generate_app_access_token. 
diff --git a/tap_github/client.py b/tap_github/client.py
index 222f4ba3..0ae8b878 100644
--- a/tap_github/client.py
+++ b/tap_github/client.py
@@ -2,12 +2,15 @@
 
 from typing import Any, Dict, Iterable, List, Optional, cast
 
+import collections
+import re
 import requests
-import simplejson
 
 from dateutil.parser import parse
 from urllib.parse import parse_qs, urlparse
 
+from nested_lookup import nested_lookup
+
 from singer_sdk.exceptions import FatalAPIError, RetriableAPIError
 from singer_sdk.helpers.jsonpath import extract_jsonpath
 from singer_sdk.streams import GraphQLStream, RESTStream
@@ -266,10 +269,68 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]:
         resp_json = response.json()
         yield from extract_jsonpath(self.query_jsonpath, input=resp_json)
 
+    def get_next_page_token(
+        self, response: requests.Response, previous_token: Optional[Any]
+    ) -> Optional[Any]:
+        """
+        Return a dict of cursors for identifying next page or None if no more pages.
+
+        Note - pagination requires the Graphql query to have nextPageCursor_X parameters
+        with the associated hasNextPage_X, startCursor_X and endCursor_X.
+
+        X should be an integer between 0 and 9, increasing with query depth.
+
+        Warning - we recommend to avoid using deep (nested) pagination.
+        """
+
+        resp_json = response.json()
+
+        # Find if results contains "hasNextPage_X" flags and if any are True.
+        # If so, set nextPageCursor_X to endCursor_X for X max.
+
+        next_page_results = nested_lookup(
+            key="hasNextPage_",
+            document=resp_json,
+            wild=True,
+            with_keys=True,
+        )
+
+        has_next_page_indices: List[int] = []
+        # Iterate over all the items and filter items with hasNextPage = True.
+        for (key, value) in next_page_results.items():
+            # Record the index X of every "hasNextPage_X" key with a True flag.
+            if any(value):
+                pagination_index = int(str(key).split("_")[1])
+                has_next_page_indices.append(pagination_index)
+
+        # Check if any "hasNextPage" is True. Otherwise, exit early.
+        if not has_next_page_indices:
+            return None
+
+        # Get deepest pagination item
+        max_pagination_index = max(has_next_page_indices)
+
+        # We leverage previous_token to remember the pagination cursors for other indices.
+        next_page_cursors: Dict[str, str] = dict()
+        next_page_cursors.update(previous_token or {})
+
+        # Advance the deepest cursor only: endCursor_<max> feeds nextPageCursor_<max>.
+        next_page_end_cursor_results = nested_lookup(
+            key=f"endCursor_{max_pagination_index}",
+            document=resp_json,
+        )
+
+        next_page_key = f"nextPageCursor_{max_pagination_index}"
+        next_page_cursors[next_page_key] = next_page_end_cursor_results[0]
+
+        return next_page_cursors
+
     def get_url_params(
         self, context: Optional[Dict], next_page_token: Optional[Any]
     ) -> Dict[str, Any]:
         """Return a dictionary of values to be used in URL parameterization."""
         params = context or dict()
         params["per_page"] = self.MAX_PER_PAGE
+        if next_page_token:
+            params.update(next_page_token)
         return params
diff --git a/tap_github/tap.py b/tap_github/tap.py
index ed8ab64a..bfb6d191 100644
--- a/tap_github/tap.py
+++ b/tap_github/tap.py
@@ -61,14 +61,20 @@ class TapGitHub(Tap):
     def discover_streams(self) -> List[Stream]:
         """Return a list of discovered streams for each query."""
 
-        if len(Streams.all_valid_queries().intersection(self.config)) != 1:
+        # If the config is empty, assume we are running --help or --capabilities.
+        if (
+            self.config
+            and len(Streams.all_valid_queries().intersection(self.config)) != 1
+        ):
             raise ValueError(
                 "This tap requires one and only one of the following path options: "
                 f"{Streams.all_valid_queries()}."
) streams = [] for stream_type in Streams: - if len(stream_type.valid_queries.intersection(self.config)) > 0: + if (not self.config) or len( + stream_type.valid_queries.intersection(self.config) + ) > 0: streams += [ StreamClass(tap=self) for StreamClass in stream_type.streams ] diff --git a/tap_github/user_streams.py b/tap_github/user_streams.py index 3b1af078..971bca19 100644 --- a/tap_github/user_streams.py +++ b/tap_github/user_streams.py @@ -184,6 +184,7 @@ class StarredStream(GitHubRestStream): # "repo_id" is the starred repo's id. primary_keys = ["repo_id", "username"] parent_stream_type = UserStream + # TODO - change partitioning key to user_id? state_partitioning_keys = ["username"] replication_key = "starred_at" ignore_parent_replication_key = True @@ -262,6 +263,8 @@ class UserContributedToStream(GitHubGraphqlStream): primary_keys = ["username", "name_with_owner"] replication_key = None parent_stream_type = UserStream + # TODO - add user_id to schema + # TODO - change partitioning key to user_id? state_partitioning_keys = ["username"] ignore_parent_replication_key = True @@ -270,9 +273,14 @@ def query(self) -> str: """Return dynamic GraphQL query.""" # Graphql id is equivalent to REST node_id. To keep the tap consistent, we rename "id" to "node_id". return """ - query userContributedTo($username: String!) { + query userContributedTo($username: String! $nextPageCursor_0: String) { user (login: $username) { - repositoriesContributedTo (first: 100 includeUserRepositories: true orderBy: {field: STARGAZERS, direction: DESC}) { + repositoriesContributedTo (first: 100 after: $nextPageCursor_0 includeUserRepositories: true orderBy: {field: STARGAZERS, direction: DESC}) { + pageInfo { + hasNextPage_0: hasNextPage + startCursor_0: startCursor + endCursor_0: endCursor + } nodes { node_id: id name_with_owner: nameWithOwner