Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pagination for GraphQl streams #118

Merged
merged 8 commits into from
May 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ singer-sdk = "^0.4.7"
# singer-sdk = {git = "https://gitlab.com/meltano/singer-sdk.git", rev = "97-hierarchical-streams"}
types-simplejson = "^3.17.2"
types-python-dateutil = "^2.8.6"
nested-lookup = "^0.2.23"

[tool.poetry.dev-dependencies]
pytest = "^6.1.2"
Expand Down
4 changes: 2 additions & 2 deletions tap_github/authenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,6 @@ def prepare_tokens(self) -> Dict[str, TokenRateLimit]:
)
available_tokens = available_tokens + [app_token]

self.logger.info(f"Tap will run with {len(available_tokens)} auth tokens")

# Get rate_limit_buffer
rate_limit_buffer = self._config.get("rate_limit_buffer", None)

Expand All @@ -181,6 +179,8 @@ def prepare_tokens(self) -> Dict[str, TokenRateLimit]:
)
self.logger.warning(msg)

self.logger.info(f"Tap will run with {len(filtered_tokens)} auth tokens")

# Create a dict of TokenRateLimit
# TODO - separate app_token and add logic to refresh the token
# using generate_app_access_token.
Expand Down
63 changes: 62 additions & 1 deletion tap_github/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

from typing import Any, Dict, Iterable, List, Optional, cast

import collections
import re
import requests
import simplejson

from dateutil.parser import parse
from urllib.parse import parse_qs, urlparse

from nested_lookup import nested_lookup

from singer_sdk.exceptions import FatalAPIError, RetriableAPIError
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.streams import GraphQLStream, RESTStream
Expand Down Expand Up @@ -266,10 +269,68 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]:
resp_json = response.json()
yield from extract_jsonpath(self.query_jsonpath, input=resp_json)

def get_next_page_token(
    self, response: requests.Response, previous_token: Optional[Any]
) -> Optional[Any]:
    """
    Return a dict of cursors for identifying next page or None if no more pages.

    Note - pagination requires the Graphql query to have nextPageCursor_X parameters
    with the associated hasNextPage_X, startCursor_X and endCursor_X.

    X should be an integer between 0 and 9, increasing with query depth.

    Warning - we recommend to avoid using deep (nested) pagination.

    Args:
        response: The HTTP response of the GraphQL query that was just run.
        previous_token: The cursor dict returned for the previous page, or
            None on the first page.

    Returns:
        A dict mapping "nextPageCursor_X" to a cursor value, or None when no
        "hasNextPage_X" flag in the response is true.

    Raises:
        IndexError: If a "hasNextPage_X" flag is true but the response holds
            no non-null "endCursor_X" value for that same X.
    """
    resp_json = response.json()

    # Collect every "hasNextPage_X" flag of the response, grouped by key.
    next_page_results = nested_lookup(
        key="hasNextPage_",
        document=resp_json,
        wild=True,
        with_keys=True,
    )

    # Keep the pagination indices X for which at least one flag is true.
    has_next_page_indices: List[int] = [
        int(str(key).split("_")[1])
        for key, values in next_page_results.items()
        if any(values)
    ]

    # No "hasNextPage" is true: there is nothing left to fetch.
    if not has_next_page_indices:
        return None

    # Advance the deepest pagination level first.
    max_pagination_index = max(has_next_page_indices)

    # A nested connection may appear several times in the response, some of
    # them empty (null cursor). Only a non-null cursor can move us forward.
    end_cursors = [
        cursor
        for cursor in nested_lookup(
            key=f"endCursor_{max_pagination_index}",
            document=resp_json,
        )
        if cursor is not None
    ]

    # We leverage previous_token to remember the pagination cursors for the
    # shallower indices. Cursors of deeper indices are dropped: they pointed
    # into the page we are now leaving and would be stale on the next one.
    next_page_cursors: Dict[str, str] = {
        key: cursor
        for key, cursor in (previous_token or {}).items()
        if int(str(key).split("_")[1]) <= max_pagination_index
    }

    # The cursor key must use the selected (max) index, not whichever index
    # happened to be visited last while scanning the flags.
    next_page_cursors[f"nextPageCursor_{max_pagination_index}"] = end_cursors[0]

    return next_page_cursors

def get_url_params(
    self, context: Optional[Dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
    """Return a dictionary of values to be used in URL parameterization.

    Args:
        context: The stream context, if any. Its entries are used as the base
            parameters. It is copied, never modified.
        next_page_token: The pagination cursors for the page to request, if
            any. Its entries are merged on top of the other parameters.

    Returns:
        A new dict holding the context entries, "per_page" set to
        ``self.MAX_PER_PAGE``, and the pagination cursors.
    """
    # Work on a copy: writing "per_page" and cursors into the caller's
    # context would leak request details into an object we do not own.
    params: Dict[str, Any] = dict(context) if context else {}
    params["per_page"] = self.MAX_PER_PAGE
    if next_page_token:
        params.update(next_page_token)
    return params
10 changes: 8 additions & 2 deletions tap_github/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,20 @@ class TapGitHub(Tap):
def discover_streams(self) -> List[Stream]:
"""Return a list of discovered streams for each query."""

if len(Streams.all_valid_queries().intersection(self.config)) != 1:
# If the config is empty, assume we are running --help or --capabilities.
if (
self.config
and len(Streams.all_valid_queries().intersection(self.config)) != 1
):
raise ValueError(
"This tap requires one and only one of the following path options: "
f"{Streams.all_valid_queries()}."
)
streams = []
for stream_type in Streams:
if len(stream_type.valid_queries.intersection(self.config)) > 0:
if (not self.config) or len(
stream_type.valid_queries.intersection(self.config)
) > 0:
streams += [
StreamClass(tap=self) for StreamClass in stream_type.streams
]
Expand Down
12 changes: 10 additions & 2 deletions tap_github/user_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ class StarredStream(GitHubRestStream):
# "repo_id" is the starred repo's id.
primary_keys = ["repo_id", "username"]
parent_stream_type = UserStream
# TODO - change partitioning key to user_id?
state_partitioning_keys = ["username"]
replication_key = "starred_at"
ignore_parent_replication_key = True
Expand Down Expand Up @@ -262,6 +263,8 @@ class UserContributedToStream(GitHubGraphqlStream):
primary_keys = ["username", "name_with_owner"]
replication_key = None
parent_stream_type = UserStream
# TODO - add user_id to schema
# TODO - change partitioning key to user_id?
state_partitioning_keys = ["username"]
ignore_parent_replication_key = True

Expand All @@ -270,9 +273,14 @@ def query(self) -> str:
"""Return dynamic GraphQL query."""
# Graphql id is equivalent to REST node_id. To keep the tap consistent, we rename "id" to "node_id".
return """
query userContributedTo($username: String!) {
query userContributedTo($username: String! $nextPageCursor_0: String) {
user (login: $username) {
repositoriesContributedTo (first: 100 includeUserRepositories: true orderBy: {field: STARGAZERS, direction: DESC}) {
repositoriesContributedTo (first: 100 after: $nextPageCursor_0 includeUserRepositories: true orderBy: {field: STARGAZERS, direction: DESC}) {
pageInfo {
hasNextPage_0: hasNextPage
startCursor_0: startCursor
endCursor_0: endCursor
}
nodes {
node_id: id
name_with_owner: nameWithOwner
Expand Down