Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Graphql implementation of the repo StargazersStream #123

Merged
merged 25 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
7d90097
initialization graphql stargazers
ericboucher May 17, 2022
0440ec0
Fix stargazers query
ericboucher May 18, 2022
89bb766
simplify query
ericboucher May 18, 2022
348f303
Update repository_streams.py
ericboucher May 18, 2022
42310c8
Add early exit with fake "since"
ericboucher May 18, 2022
ed77f27
Add new stream and order alphabetically
ericboucher May 18, 2022
3977dee
Add missing user params
ericboucher May 18, 2022
eec8d3d
Test different path for api_calls_tests_cache
ericboucher May 18, 2022
3b8e9b3
Retry pytest once on error
ericboucher May 18, 2022
f46c2e1
Fix push target in github action
ericboucher May 18, 2022
6ee2765
Update test_tap.yml
ericboucher May 18, 2022
7d43ea6
Update test_tap.yml
ericboucher May 18, 2022
2b433c6
Update test_tap.yml
ericboucher May 18, 2022
8ad4f69
Update test_tap.yml
ericboucher May 18, 2022
5b5b64c
Use alternative_sync_chidren in tap_core
ericboucher May 18, 2022
9e0dd6d
Update test_tap.yml
ericboucher May 18, 2022
6c58936
Update test_tap.yml
ericboucher May 18, 2022
0bb3531
Update test_tap.yml
ericboucher May 18, 2022
b58691a
Update tap_github/tests/fixtures.py
ericboucher May 18, 2022
288dd81
Override REST stream
ericboucher May 19, 2022
dc52830
Update repository_streams.py
ericboucher May 19, 2022
79d1970
Update test_tap.yml
ericboucher May 19, 2022
443825b
Revert "Override REST stream"
ericboucher May 19, 2022
1c79c06
Keep both streams but add warning
ericboucher May 19, 2022
862e7ca
Update repository_streams.py
ericboucher May 19, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions .github/workflows/test_tap.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
name: Test tap-github

on:
# Run on all pull requests and on pushes to master.
# Run on all pull requests and on pushes to main.
pull_request:
push:
branches:
- master
- main

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
tests:
Expand All @@ -26,13 +30,11 @@ jobs:
uses: actions/cache@v2.1.7
with:
# must match the path in tests/__init__.py
path: '**/api_calls_tests_cache.sqlite'
path: 'api_calls_tests_cache.sqlite'
# github cache expires after 1wk, and we expire the content after 24h
# this key should not need to change unless we need to clear the cache
key: api-cache-v3
restore-keys: |
api-cache-v3
api-cache-
restore-keys: api-cache-v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
Expand Down Expand Up @@ -61,5 +63,13 @@ jobs:
run: |
poetry run mypy . --ignore-missing-imports
- name: Test with pytest
id: test_pytest
continue-on-error: true
run: |
poetry run pytest --capture=no
- name: Test with pytest (run 2)
id: retry_test_pytest
if: steps.test_pytest.outcome=='failure' # check the step outcome, wait and retry
run: |
sleep 60m
poetry run pytest --capture=no
5 changes: 5 additions & 0 deletions tap_github/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,4 +333,9 @@ def get_url_params(
params["per_page"] = self.MAX_PER_PAGE
if next_page_token:
params.update(next_page_token)

since = self.get_starting_timestamp(context)
if self.replication_key and since:
params["since"] = str(since)

return params
110 changes: 109 additions & 1 deletion tap_github/repository_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
from singer_sdk import typing as th # JSON Schema typing helpers
from singer_sdk.helpers.jsonpath import extract_jsonpath

from dateutil.parser import parse
from urllib.parse import parse_qs, urlparse

from tap_github.client import GitHubGraphqlStream, GitHubRestStream
from tap_github.schema_objects import (
user_object,
Expand Down Expand Up @@ -1476,7 +1479,7 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]:
class StargazersStream(GitHubRestStream):
"""Defines 'Stargazers' stream. Warning: this stream does NOT track star deletions."""

name = "stargazers"
name = "stargazers_rest"
path = "/repos/{org}/{repo}/stargazers"
primary_keys = ["user_id", "repo", "org"]
parent_stream_type = RepositoryStream
Expand All @@ -1485,6 +1488,13 @@ class StargazersStream(GitHubRestStream):
# GitHub is missing the "since" parameter on this endpoint.
missing_since_parameter = True

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# TODO - remove warning with next release.
self.logger.warning(
"The stream 'stargazers_rest' is deprecated. Please use the Graphql version instead: 'stargazers'."
)

@property
def http_headers(self) -> dict:
"""Return the http headers needed.
Expand Down Expand Up @@ -1517,6 +1527,104 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict:
).to_dict()


class StargazersGraphqlStream(GitHubGraphqlStream):
"""Defines 'UserContributedToStream' stream. Warning: this stream 'only' gets the first 100 projects (by stars)."""

name = "stargazers"
ericboucher marked this conversation as resolved.
Show resolved Hide resolved
query_jsonpath = "$.data.repository.stargazers.edges.[*]"
primary_keys = ["user_id", "repo_id"]
aaronsteers marked this conversation as resolved.
Show resolved Hide resolved
replication_key = "starred_at"
parent_stream_type = RepositoryStream
state_partitioning_keys = ["repo_id"]
# The parent repository object changes if the number of stargazers changes.
ignore_parent_replication_key = False

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# TODO - remove warning with next release.
self.logger.warning(
"This stream 'stargazers' might conflict with previous implementation. "
"Looking for the older version? Use 'stargazers_rest'."
)

def post_process(self, row: dict, context: Optional[Dict] = None) -> dict:
"""
Add a user_id top-level field to be used as state replication key.
"""
row["user_id"] = row["user"]["id"]
if context is not None:
row["repo_id"] = context["repo_id"]
return row

def get_next_page_token(
self, response: requests.Response, previous_token: Optional[Any]
) -> Optional[Any]:
"""
Exit early if a since parameter is provided.
"""
request_parameters = parse_qs(str(urlparse(response.request.url).query))

# parse_qs interprets "+" as a space, revert this to keep an aware datetime
try:
since = (
request_parameters["since"][0].replace(" ", "+")
if "since" in request_parameters
else ""
)
except IndexError:
since = ""

# If since parameter is present, try to exit early by looking at the last "starred_at".
# Noting that we are traversing in DESCENDING order by STARRED_AT.
if since:
results = extract_jsonpath(self.query_jsonpath, input=response.json())
*_, last = results
if parse(last["starred_at"]) < parse(since):
return None
return super().get_next_page_token(response, previous_token)

@property
def query(self) -> str:
"""Return dynamic GraphQL query."""
# Graphql id is equivalent to REST node_id. To keep the tap consistent, we rename "id" to "node_id".
return """
query repositoryStargazers($repo: String! $org: String! $nextPageCursor_0: String) {
repository(name: $repo owner: $org) {
stargazers(first: 100 orderBy: {field: STARRED_AT direction: DESC} after: $nextPageCursor_0) {
pageInfo {
hasNextPage_0: hasNextPage
startCursor_0: startCursor
endCursor_0: endCursor
}
edges {
user: node {
node_id: id
id: databaseId
login
avatar_url: avatarUrl
html_url: url
type: __typename
site_admin: isSiteAdmin
}
starred_at: starredAt
}
}
}
}
"""

schema = th.PropertiesList(
# Parent Keys
th.Property("repo", th.StringType),
th.Property("org", th.StringType),
th.Property("repo_id", th.IntegerType),
# Stargazer Info
th.Property("user_id", th.IntegerType),
th.Property("starred_at", th.DateTimeType),
th.Property("user", user_object),
).to_dict()


class StatsContributorsStream(GitHubRestStream):
"""
Defines 'StatsContributors' stream. Fetching contributors activity.
Expand Down
56 changes: 29 additions & 27 deletions tap_github/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

from tap_github.repository_streams import (
AnonymousContributorsStream,
AssigneesStream,
CollaboratorsStream,
CommitCommentsStream,
CommitsStream,
CommunityProfileStream,
ContributorsStream,
Expand All @@ -13,26 +16,24 @@
IssueEventsStream,
IssuesStream,
LanguagesStream,
MilestonesStream,
ProjectCardsStream,
ProjectColumnsStream,
ProjectsStream,
PullRequestCommits,
PullRequestsStream,
ReadmeHtmlStream,
ReadmeStream,
ReleasesStream,
RepositoryStream,
ReviewCommentsStream,
ReviewsStream,
StargazersGraphqlStream,
StargazersStream,
StatsContributorsStream,
AssigneesStream,
CollaboratorsStream,
ReviewsStream,
ReviewCommentsStream,
ProjectsStream,
ProjectColumnsStream,
ProjectCardsStream,
PullRequestCommits,
MilestonesStream,
CommitCommentsStream,
ReleasesStream,
WorkflowsStream,
WorkflowRunJobsStream,
WorkflowRunsStream,
WorkflowsStream,
)
from tap_github.user_streams import (
StarredStream,
Expand All @@ -41,9 +42,9 @@
)
from tap_github.organization_streams import (
OrganizationStream,
TeamsStream,
TeamMembersStream,
TeamRolesStream,
TeamsStream,
)


Expand All @@ -63,34 +64,35 @@ def __init__(self, valid_queries: Set[str], streams: List[Type[Stream]]):
{"repositories", "organizations", "searches"},
[
AnonymousContributorsStream,
CommitsStream,
AssigneesStream,
CollaboratorsStream,
CommitCommentsStream,
CommitsStream,
CommunityProfileStream,
ContributorsStream,
EventsStream,
MilestonesStream,
ReleasesStream,
CollaboratorsStream,
AssigneesStream,
IssuesStream,
IssueCommentsStream,
IssueEventsStream,
IssuesStream,
LanguagesStream,
PullRequestsStream,
MilestonesStream,
ProjectCardsStream,
ProjectColumnsStream,
ProjectsStream,
PullRequestCommits,
ReviewsStream,
ReviewCommentsStream,
PullRequestsStream,
ReadmeHtmlStream,
ReadmeStream,
ReleasesStream,
RepositoryStream,
ReviewCommentsStream,
ReviewsStream,
StargazersGraphqlStream,
StargazersStream,
StatsContributorsStream,
ProjectsStream,
ProjectColumnsStream,
ProjectCardsStream,
WorkflowsStream,
WorkflowRunJobsStream,
WorkflowRunsStream,
WorkflowsStream,
],
)
USERS = (
Expand All @@ -103,7 +105,7 @@ def __init__(self, valid_queries: Set[str], streams: List[Type[Stream]]):
)
ORGANIZATIONS = (
{"organizations"},
[OrganizationStream, TeamsStream, TeamMembersStream, TeamRolesStream],
[OrganizationStream, TeamMembersStream, TeamRolesStream, TeamsStream],
)

@classmethod
Expand Down
29 changes: 28 additions & 1 deletion tap_github/tests/fixtures.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os
import logging
import datetime

import pytest
Expand Down Expand Up @@ -25,7 +27,7 @@ def repo_list_config(request):
"""
marker = request.node.get_closest_marker("repo_list")
if marker is None:
repo_list = ["octocat/hello-world", "mapswipe/mapswipe"]
repo_list = ["MeltanoLabs/tap-github", "mapswipe/mapswipe"]
else:
repo_list = marker.args[0]

Expand Down Expand Up @@ -93,3 +95,28 @@ def organization_list_config(request):
"organizations": organization_list,
"rate_limit_buffer": 100,
}


def alternative_sync_chidren(self, child_context: dict) -> None:
"""
Override for Stream._sync_children.
Enabling us to use an ORG_LEVEL_TOKEN for the collaborators stream.
"""
for child_stream in self.child_streams:
# Use org:write access level credentials for collaborators stream
if child_stream.name in ["collaborators"]:
ORG_LEVEL_TOKEN = os.environ.get("ORG_LEVEL_TOKEN")
if not ORG_LEVEL_TOKEN:
logging.warning(
'No "ORG_LEVEL_TOKEN" found. Skipping collaborators stream sync.'
)
continue
SAVED_GTHUB_TOKEN = os.environ.get("GITHUB_TOKEN")
os.environ["GITHUB_TOKEN"] = ORG_LEVEL_TOKEN
child_stream.sync(context=child_context)
os.environ["GITHUB_TOKEN"] = SAVED_GTHUB_TOKEN or ""
continue

# default behavior:
if child_stream.selected or child_stream.has_selected_descendents:
child_stream.sync(context=child_context)
9 changes: 7 additions & 2 deletions tap_github/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import logging

from unittest import mock
from unittest.mock import patch

from .fixtures import alternative_sync_chidren
from singer_sdk.testing import get_standard_tap_tests

from tap_github.tap import TapGitHub
Expand All @@ -27,8 +29,11 @@ def test_standard_tap_tests_for_search_mode(search_config):
def test_standard_tap_tests_for_repo_list_mode(repo_list_config):
"""Run standard tap tests from the SDK."""
tests = get_standard_tap_tests(TapGitHub, config=repo_list_config)
for test in tests:
test()
with patch(
"singer_sdk.streams.core.Stream._sync_children", alternative_sync_chidren
):
for test in tests:
test()


def test_standard_tap_tests_for_username_list_mode(username_list_config):
Expand Down
Loading