diff --git a/pytest.ini b/pytest.ini
index 643ff660..39587f2c 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -1,3 +1,4 @@
 [pytest]
 markers =
-    repo_list: mark a test as a repo_list.
+    repo_list: mark a test as using a list of repos in config
+    username_list: mark a test as using a list of usernames in config
diff --git a/tap_github/client.py b/tap_github/client.py
index b686119e..222f4ba3 100644
--- a/tap_github/client.py
+++ b/tap_github/client.py
@@ -234,6 +234,12 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]:
 
         yield from results
 
+    def post_process(self, row: dict, context: Optional[Dict[str, str]] = None) -> dict:
+        """Add `repo_id` by default to all streams."""
+        if context is not None and "repo_id" in context:
+            row["repo_id"] = context["repo_id"]
+        return row
+
 
 class GitHubGraphqlStream(GraphQLStream, GitHubRestStream):
     """GitHub Graphql stream class."""
diff --git a/tap_github/repository_streams.py b/tap_github/repository_streams.py
index d0604c53..f12ade34 100644
--- a/tap_github/repository_streams.py
+++ b/tap_github/repository_streams.py
@@ -1,12 +1,12 @@
 """Repository Stream types classes for tap-github."""
 
-from typing import Any, Dict, Iterable, List, Optional
+from typing import Any, Dict, Iterable, List, Optional, Tuple
 
 import requests
 from singer_sdk import typing as th  # JSON Schema typing helpers
 from singer_sdk.helpers.jsonpath import extract_jsonpath
 
-from tap_github.client import GitHubRestStream
+from tap_github.client import GitHubGraphqlStream, GitHubRestStream
 from tap_github.schema_objects import (
     user_object,
     label_object,
@@ -57,9 +57,73 @@ def records_jsonpath(self) -> str:  # type: ignore
         else:
             return "$[*]"
 
+    def get_repo_ids(self, repo_list: List[Tuple[str]]) -> List[Dict[str, str]]:
+        """Enrich the list of repos with their numeric ID from github.
+
+        This helps maintain a stable id for context and bookmarks.
+        It uses the github graphql api to fetch the databaseId.
+        It also removes non-existent repos and corrects casing to ensure
+        data is correct downstream.
+        """
+        # use a temp handmade stream to reuse all the graphql setup of the tap
+        class TempStream(GitHubGraphqlStream):
+            name = "tempStream"
+            schema = th.PropertiesList(
+                th.Property("id", th.StringType),
+                th.Property("databaseId", th.IntegerType),
+            ).to_dict()
+
+            def __init__(self, tap, repo_list) -> None:
+                super().__init__(tap)
+                self.repo_list = repo_list
+
+            @property
+            def query(self) -> str:
+                chunks = list()
+                # there is probably some limit to how many items can be requested
+                # in a single query, but it's well above 1k.
+                for i, repo in enumerate(self.repo_list):
+                    chunks.append(
+                        f'repo{i}: repository(name: "{repo[1]}", owner: "{repo[0]}") '
+                        "{ nameWithOwner databaseId }"
+                    )
+                return "query {" + " ".join(chunks) + " }"
+
+        repos_with_ids: list = list()
+        temp_stream = TempStream(self._tap, list(repo_list))
+        # replace manually provided org/repo values by the ones obtained
+        # from github api. This guarantees that case is correct in the output data.
+        # See https://github.com/MeltanoLabs/tap-github/issues/110
+        # Also remove repos which do not exist to avoid crashing further down
+        # the line.
+        for record in temp_stream.request_records({}):
+            for item in record.keys():
+                try:
+                    org, repo = record[item]["nameWithOwner"].split("/")
+                except TypeError:
+                    # one of the repos returned `None`, which means it does
+                    # not exist, log some details, and move on to the next one
+                    repo_full_name = "/".join(repo_list[int(item[4:])])
+                    self.logger.warn(
+                        (
+                            f"Repository not found: {repo_full_name} \t"
+                            "Removing it from list"
+                        )
+                    )
+                    continue
+                repos_with_ids.append(
+                    {"org": org, "repo": repo, "repo_id": record[item]["databaseId"]}
+                )
+        self.logger.info(f"Running the tap on {len(repos_with_ids)} repositories")
+        return repos_with_ids
+
     @property
-    def partitions(self) -> Optional[List[Dict]]:
-        """Return a list of partitions."""
+    def partitions(self) -> Optional[List[Dict[str, str]]]:
+        """Return a list of partitions.
+
+        This is called before syncing records; we use it to fetch some additional
+        context.
+        """
         if "searches" in self.config:
             return [
                 {"search_name": s["name"], "search_query": s["query"]}
@@ -67,7 +131,7 @@ def partitions(self) -> Optional[List[Dict]]:
             ]
         if "repositories" in self.config:
             split_repo_names = map(lambda s: s.split("/"), self.config["repositories"])
-            return [{"org": r[0], "repo": r[1]} for r in split_repo_names]
+            return self.get_repo_ids(list(split_repo_names))
         if "organizations" in self.config:
             return [{"org": org} for org in self.config["organizations"]]
         return None
@@ -82,6 +146,7 @@ def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict:
         return {
             "org": record["owner"]["login"],
             "repo": record["name"],
+            "repo_id": record["id"],
         }
 
     def get_records(self, context: Optional[Dict]) -> Iterable[Dict[str, Any]]:
@@ -100,11 +165,13 @@ def get_records(self, context: Optional[Dict]) -> Iterable[Dict[str, Any]]:
         ):
             # build a minimal mock record so that self._sync_records
             # can proceed with child streams
+            # the id is fetched in `get_repo_ids` above
             yield {
                 "owner": {
                     "login": context["org"],
                 },
                 "name": context["repo"],
+                "id": context["repo_id"],
             }
         else:
             yield from super().get_records(context)
row["target_repo"] = row.pop("repo", None) row["target_org"] = row.pop("org", None) + if context is not None: + row["repo_id"] = context["repo_id"] return row schema = th.PropertiesList( @@ -379,6 +451,7 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: th.Property("type", th.StringType), th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), th.Property("public", th.BooleanType), th.Property("_sdc_repository", th.StringType), th.Property("created_at", th.DateTimeType), @@ -518,6 +591,7 @@ class MilestonesStream(GitHubRestStream): # Parent Keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # Rest th.Property("url", th.StringType), th.Property("html_url", th.StringType), @@ -551,6 +625,7 @@ class ReleasesStream(GitHubRestStream): # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # Rest th.Property("url", th.StringType), th.Property("html_url", th.StringType), @@ -613,6 +688,7 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: # Parent Keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # A list of languages parsed by GitHub is available here: # https://github.com/github/linguist/blob/master/lib/linguist/languages.yml th.Property("language_name", th.StringType), @@ -632,6 +708,7 @@ class CollaboratorsStream(GitHubRestStream): # Parent Keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # Rest th.Property("login", th.StringType), th.Property("id", th.IntegerType), @@ -670,6 +747,7 @@ class AssigneesStream(GitHubRestStream): # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # Rest th.Property("login", th.StringType), th.Property("id", th.IntegerType), @@ -736,6 +814,8 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: # replace +1/-1 emojis to avoid downstream column name errors. row["plus_one"] = row.pop("+1", None) row["minus_one"] = row.pop("-1", None) + if context is not None: + row["repo_id"] = context["repo_id"] return row schema = th.PropertiesList( @@ -745,6 +825,7 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: th.Property("html_url", th.StringType), th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), th.Property("number", th.IntegerType), th.Property("updated_at", th.DateTimeType), th.Property("created_at", th.DateTimeType), @@ -811,6 +892,8 @@ def get_records(self, context: Optional[Dict] = None) -> Iterable[Dict[str, Any] def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: row["issue_number"] = int(row["issue_url"].split("/")[-1]) + if context is not None: + row["repo_id"] = context["repo_id"] if row["body"] is not None: # some comment bodies include control characters such as \x00 # that some targets (such as postgresql) choke on. 
@@ -823,6 +906,7 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict:
         # Parent keys
         th.Property("repo", th.StringType),
         th.Property("org", th.StringType),
+        th.Property("repo_id", th.IntegerType),
         # Rest
         th.Property("id", th.IntegerType),
         th.Property("node_id", th.StringType),
@@ -869,6 +953,8 @@ def get_records(self, context: Optional[Dict] = None) -> Iterable[Dict[str, Any]]:
     def post_process(self, row: dict, context: Optional[Dict] = None) -> dict:
         row["issue_number"] = int(row["issue"].pop("number"))
         row["issue_url"] = row["issue"].pop("url")
+        if context is not None:
+            row["repo_id"] = context["repo_id"]
         return row
 
     schema = th.PropertiesList(
@@ -876,6 +962,7 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict:
         th.Property("node_id", th.StringType),
         th.Property("repo", th.StringType),
         th.Property("org", th.StringType),
+        th.Property("repo_id", th.IntegerType),
         th.Property("issue_number", th.IntegerType),
         th.Property("issue_url", th.StringType),
         th.Property("event", th.StringType),
@@ -911,11 +998,13 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict:
         assert context is not None, "CommitsStream was called without context"
         row["repo"] = context["repo"]
         row["org"] = context["org"]
+        row["repo_id"] = context["repo_id"]
         return row
 
     schema = th.PropertiesList(
         th.Property("org", th.StringType),
         th.Property("repo", th.StringType),
+        th.Property("repo_id", th.IntegerType),
         th.Property("node_id", th.StringType),
         th.Property("url", th.StringType),
         th.Property("sha", th.StringType),
@@ -977,6 +1066,7 @@ class CommitCommentsStream(GitHubRestStream):
         # Parent keys
         th.Property("repo", th.StringType),
         th.Property("org", th.StringType),
+        th.Property("repo_id", th.IntegerType),
         # Rest
         th.Property("html_url", th.StringType),
         th.Property("url", th.StringType),
row["plus_one"] = row.pop("+1", None) row["minus_one"] = row.pop("-1", None) + if context is not None: + row["repo_id"] = context["repo_id"] return row def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: @@ -1046,18 +1138,21 @@ def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: return { "org": context["org"], "repo": context["repo"], + "repo_id": context["repo_id"], "pull_number": record["number"], } return { "pull_number": record["number"], "org": record["base"]["user"]["login"], "repo": record["base"]["repo"]["name"], + "repo_id": record["base"]["repo"]["id"], } schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # PR keys th.Property("id", th.IntegerType), th.Property("node_id", th.StringType), @@ -1162,6 +1257,7 @@ class PullRequestCommits(GitHubRestStream): # Parent keys th.Property("org", th.StringType), th.Property("repo", th.StringType), + th.Property("repo_id", th.IntegerType), th.Property("pull_number", th.IntegerType), # Rest th.Property("url", th.StringType), @@ -1235,6 +1331,7 @@ class ReviewsStream(GitHubRestStream): th.Property("pull_number", th.IntegerType), th.Property("org", th.StringType), th.Property("repo", th.StringType), + th.Property("repo_id", th.IntegerType), # Rest th.Property("id", th.IntegerType), th.Property("node_id", th.StringType), @@ -1270,6 +1367,7 @@ class ReviewCommentsStream(GitHubRestStream): # Parent keys th.Property("org", th.StringType), th.Property("repo", th.StringType), + th.Property("repo_id", th.IntegerType), # Rest th.Property("url", th.StringType), th.Property("pull_request_review_id", th.IntegerType), @@ -1322,6 +1420,7 @@ class ContributorsStream(GitHubRestStream): # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # User/Bot contributor keys th.Property("login", th.StringType), th.Property("id", th.IntegerType), @@ -1365,6 +1464,7 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # Anonymous contributor keys th.Property("email", th.StringType), th.Property("name", th.StringType), @@ -1401,12 +1501,15 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: Add a user_id top-level field to be used as state replication key. 
""" row["user_id"] = row["user"]["id"] + if context is not None: + row["repo_id"] = context["repo_id"] return row schema = th.PropertiesList( # Parent Keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), th.Property("user_id", th.IntegerType), # Stargazer Info th.Property("starred_at", th.DateTimeType), @@ -1458,6 +1561,7 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # Activity keys th.Property("week_start", th.IntegerType), th.Property("additions", th.IntegerType), @@ -1484,12 +1588,18 @@ class ProjectsStream(GitHubRestStream): state_partitioning_keys = ["repo", "org"] def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: - return {"project_id": record["id"]} + return { + "project_id": record["id"], + "repo_id": context["repo_id"] if context else None, + "org": context["org"] if context else None, + "repo": context["repo"] if context else None, + } schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # Rest th.Property("owner_url", th.StringType), th.Property("url", th.StringType), @@ -1517,12 +1627,18 @@ class ProjectColumnsStream(GitHubRestStream): state_partitioning_keys = ["project_id", "repo", "org"] def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: - return {"column_id": record["id"]} + return { + "column_id": record["id"], + "repo_id": context["repo_id"] if context else None, + "org": context["org"] if context else None, + "repo": context["repo"] if context else None, + } schema = th.PropertiesList( # Parent Keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), th.Property("project_id", th.IntegerType), # Rest th.Property("url", th.StringType), @@ -1549,6 +1665,7 @@ class ProjectCardsStream(GitHubRestStream): # Parent Keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), th.Property("project_id", th.IntegerType), th.Property("column_id", th.IntegerType), # Properties @@ -1584,6 +1701,7 @@ class WorkflowsStream(GitHubRestStream): # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # PR keys th.Property("id", th.IntegerType), th.Property("node_id", th.StringType), @@ -1618,6 +1736,7 @@ class WorkflowRunsStream(GitHubRestStream): # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # PR keys th.Property("id", th.IntegerType), th.Property("name", th.StringType), @@ -1665,6 +1784,7 @@ def get_child_context(self, record: dict, context: Optional[dict]) -> dict: "org": context["org"] if context else None, "repo": context["repo"] if context else None, "run_id": record["id"], + "repo_id": context["repo_id"] if context else None, } @@ -1685,6 +1805,7 @@ class WorkflowRunJobsStream(GitHubRestStream): # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # PR keys th.Property("id", th.IntegerType), th.Property("run_id", th.IntegerType), diff --git a/tap_github/tests/__init__.py b/tap_github/tests/__init__.py index 8103d8ce..4052797d 100644 --- a/tap_github/tests/__init__.py +++ 
diff --git a/tap_github/tests/__init__.py b/tap_github/tests/__init__.py
index 8103d8ce..4052797d 100644
--- a/tap_github/tests/__init__.py
+++ b/tap_github/tests/__init__.py
@@ -11,9 +11,13 @@
     "api_calls_tests_cache",
     backend="sqlite",
     # make sure that API keys don't end up being cached
-    ignored_parameters=["Authorization"],
+    # Also ignore user-agent so that various versions of requests
+    # can share the cache
+    ignored_parameters=["Authorization", "user-agent"],
     # tell requests_cache to check headers for the above parameter
     match_headers=True,
     # expire the cache after 24h (86400 seconds)
     expire_after=24 * 60 * 60,
+    # make sure graphql calls get cached as well
+    allowable_methods=["GET", "POST"],
 )
diff --git a/tap_github/tests/test_tap.py b/tap_github/tests/test_tap.py
index 6ad8e852..bdf6af54 100644
--- a/tap_github/tests/test_tap.py
+++ b/tap_github/tests/test_tap.py
@@ -1,25 +1,48 @@
 import os
 import logging
 import pytest
+from typing import Optional
 from unittest.mock import patch
 
+from singer_sdk.helpers._singer import Catalog
+from singer_sdk.helpers import _catalog as cat_helpers
 from tap_github.tap import TapGitHub
 
-from .fixtures import repo_list_config
+from .fixtures import repo_list_config, username_list_config
 
 repo_list_2 = [
+    "MeltanoLabs/tap-github",
+    # mistype the repo name so we can check that the tap corrects it
+    "MeltanoLabs/Tap-GitLab",
+    # mistype the org
+    "meltanolabs/target-athena",
+]
+# the same list, but without typos, for validation
+repo_list_2_corrected = [
     "MeltanoLabs/tap-github",
     "MeltanoLabs/tap-gitlab",
     "MeltanoLabs/target-athena",
 ]
+# the github repo ids that match the repo names above
+# in the same order
+repo_list_2_ids = [
+    365087920,
+    416891176,
+    361619143,
+]
 
 
 @pytest.mark.repo_list(repo_list_2)
 def test_validate_repo_list_config(repo_list_config):
     """Verify that the repositories list is parsed correctly"""
     repo_list_context = [
-        {"org": repo.split("/")[0], "repo": repo.split("/")[1]} for repo in repo_list_2
+        {
+            "org": repo[0].split("/")[0],
+            "repo": repo[0].split("/")[1],
+            "repo_id": repo[1],
+        }
+        for repo in zip(repo_list_2_corrected, repo_list_2_ids)
     ]
     tap = TapGitHub(config=repo_list_config)
     partitions = tap.streams["repositories"].partitions
@@ -51,29 +74,78 @@ def alternative_sync_chidren(self, child_context: dict) -> None:
         child_stream.sync(context=child_context)
 
 
-@pytest.mark.repo_list(repo_list_2)
-def test_get_a_repository_in_repo_list_mode(capsys, repo_list_config):
+def run_tap_with_config(capsys, config_obj: dict, skip_stream: Optional[str]) -> str:
     """
-    Discover the catalog, and request 2 repository records
+    Run the tap with the given config and capture stdout, optionally
+    skipping a stream (this is meant to be the top level stream)
     """
-    tap1 = TapGitHub(config=repo_list_config)
+    tap1 = TapGitHub(config=config_obj)
     tap1.run_discovery()
-    catalog = tap1.catalog_dict
-    # disable child streams
-    # FIXME: this does not work, the child streams are still fetched
-    # deselect_all_streams(catalog)
-    # set_catalog_stream_selected(
-    #     catalog=catalog, stream_name="repositories", selected=True
-    # )
+    catalog = Catalog.from_dict(tap1.catalog_dict)
+    # Reset and re-initialize with an input catalog
+    if skip_stream is not None:
+        cat_helpers.set_catalog_stream_selected(
+            catalog=catalog,
+            stream_name=skip_stream,
+            selected=False,
+        )
     # discard previous output to stdout (potentially from other tests)
     capsys.readouterr()
     with patch(
         "singer_sdk.streams.core.Stream._sync_children", alternative_sync_chidren
     ):
-        tap2 = TapGitHub(config=repo_list_config, catalog=catalog)
+        tap2 = TapGitHub(config=config_obj, catalog=catalog.to_dict())
         tap2.sync_all()
     captured = capsys.readouterr()
-    # Verify we got the right number of records (one per repo in the list)
-    assert captured.out.count('{"type": "RECORD", "stream": "repositories"') == len(
-        repo_list_2
+    return captured.out
+
+
+@pytest.mark.parametrize("skip_parent_streams", [False, True])
+@pytest.mark.repo_list(repo_list_2)
+def test_get_a_repository_in_repo_list_mode(
+    capsys, repo_list_config, skip_parent_streams
+):
+    """
+    Discover the catalog, and request 2 repository records.
+    The test is parametrized to run twice, with and without
+    syncing the top level `repositories` stream.
+    """
+    repo_list_config["skip_parent_streams"] = skip_parent_streams
+    captured_out = run_tap_with_config(
+        capsys, repo_list_config, "repositories" if skip_parent_streams else None
+    )
+    # Verify we got the right number of records
+    # one per repo in the list only if we sync the "repositories" stream, 0 if not
+    assert captured_out.count('{"type": "RECORD", "stream": "repositories"') == len(
+        repo_list_2 * (not skip_parent_streams)
+    )
+    # check that the tap corrects invalid case in config input
+    assert '"repo": "Tap-GitLab"' not in captured_out
+    assert '"org": "meltanolabs"' not in captured_out
+
+
+# case is incorrect on purpose, so we can check that the tap corrects it
+# and run the test twice, with and without syncing the `users` stream
+@pytest.mark.parametrize("skip_parent_streams", [False, True])
+@pytest.mark.username_list(["EricBoucher", "aaRONsTeeRS"])
+def test_get_a_user_in_user_usernames_mode(
+    capsys, username_list_config, skip_parent_streams
+):
+    """
+    Discover the catalog, and request 2 user records
+    """
+    username_list_config["skip_parent_streams"] = skip_parent_streams
+    captured_out = run_tap_with_config(
+        capsys, username_list_config, "users" if skip_parent_streams else None
+    )
+    # Verify we got the right number of records:
+    # one per user in the list if we sync the root stream, 0 otherwise
+    assert captured_out.count('{"type": "RECORD", "stream": "users"') == len(
+        username_list_config["user_usernames"] * (not skip_parent_streams)
     )
+    # these 2 are inequalities as number will keep changing :)
+    assert captured_out.count('{"type": "RECORD", "stream": "starred"') > 150
+    assert captured_out.count('{"type": "RECORD", "stream": "user_contributed_to"') > 50
+    assert '{"username": "aaronsteers"' in captured_out
+    assert '{"username": "aaRONsTeeRS"' not in captured_out
+    assert '{"username": "EricBoucher"' not in captured_out
diff --git a/tap_github/user_streams.py b/tap_github/user_streams.py
index d227011b..5606d810 100644
--- a/tap_github/user_streams.py
+++ b/tap_github/user_streams.py
@@ -1,6 +1,6 @@
 """User Stream types classes for tap-github."""
 
-from typing import Dict, List, Optional
+from typing import Dict, List, Optional, Iterable, Any
 
 from singer_sdk import typing as th  # JSON Schema typing helpers
 
@@ -26,7 +26,7 @@ def path(self) -> str:  # type: ignore
     def partitions(self) -> Optional[List[Dict]]:
         """Return a list of partitions."""
         if "user_usernames" in self.config:
-            return [{"username": u} for u in self.config["user_usernames"]]
+            return self.get_user_ids(self.config["user_usernames"])
         elif "user_ids" in self.config:
             return [{"id": id} for id in self.config["user_ids"]]
         return None
@@ -34,8 +34,92 @@ def partitions(self) -> Optional[List[Dict]]:
     def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict:
         return {
             "username": record["login"],
+            "user_id": record["id"],
         }
 
+    def get_user_ids(self, user_list: List[str]) -> List[Dict[str, str]]:
+        """Enrich the list of users with their numeric ID from github.
+
+        This helps maintain a stable id for context and bookmarks.
+        It uses the github graphql api to fetch the databaseId.
+        It also removes non-existent users and corrects casing to ensure
+        data is correct downstream.
+        """
+        # use a temp handmade stream to reuse all the graphql setup of the tap
+        class TempStream(GitHubGraphqlStream):
+            name = "tempStream"
+            schema = th.PropertiesList(
+                th.Property("id", th.StringType),
+                th.Property("databaseId", th.IntegerType),
+            ).to_dict()
+
+            def __init__(self, tap, user_list) -> None:
+                super().__init__(tap)
+                self.user_list = user_list
+
+            @property
+            def query(self) -> str:
+                chunks = list()
+                # there is probably some limit to how many items can be requested
+                # in a single query, but it's well above 1k.
+                for i, user in enumerate(self.user_list):
+                    chunks.append(
+                        f'user{i}: user(login: "{user}") ' "{ login databaseId }"
+                    )
+                return "query {" + " ".join(chunks) + " }"
+
+        users_with_ids: list = list()
+        temp_stream = TempStream(self._tap, list(user_list))
+        # replace manually provided usernames by the ones obtained
+        # from github api. This guarantees that case is correct in the output data.
+        # See https://github.com/MeltanoLabs/tap-github/issues/110
+        # Also remove users which do not exist to avoid crashing further down
+        # the line.
+        for record in temp_stream.request_records({}):
+            for item in record.keys():
+                try:
+                    username = record[item]["login"]
+                except TypeError:
+                    # one of the usernames returned `None`, which means it does
+                    # not exist, log some details, and move on to the next one
+                    invalid_username = user_list[int(item[4:])]
+                    self.logger.warn(
+                        (
+                            f"User not found: {invalid_username} \t"
+                            "Removing it from list"
+                        )
+                    )
+                    continue
+                users_with_ids.append(
+                    {"username": username, "user_id": record[item]["databaseId"]}
+                )
+        self.logger.info(f"Running the tap on {len(users_with_ids)} users")
+        return users_with_ids
+
+    def get_records(self, context: Optional[Dict]) -> Iterable[Dict[str, Any]]:
+        """
+        Override the parent method to allow skipping API calls
+        if the stream is deselected and skip_parent_streams is True in config.
+        This allows running the tap with fewer API calls and preserving
+        quota when only syncing a child stream. Without this,
+        the API call is sent but data is discarded.
+        """
+        if (
+            not self.selected
+            and "skip_parent_streams" in self.config
+            and self.config["skip_parent_streams"]
+            and context is not None
+        ):
+            # build a minimal mock record so that self._sync_records
+            # can proceed with child streams
+            # the id is fetched in `get_user_ids` above
+            yield {
+                "login": context["username"],
+                "id": context["user_id"],
+            }
+        else:
+            yield from super().get_records(context)
+
     schema = th.PropertiesList(
         th.Property("login", th.StringType),
         th.Property("id", th.IntegerType),
@@ -102,12 +186,15 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict:
         Add a repo_id top-level field to be used as state replication key.
         """
         row["repo_id"] = row["repo"]["id"]
+        if context is not None:
+            row["user_id"] = context["user_id"]
         return row
 
     schema = th.PropertiesList(
         # Parent Keys
         th.Property("username", th.StringType),
         th.Property("repo_id", th.StringType),
+        th.Property("user_id", th.IntegerType),
         # Starred Repo Info
         th.Property("starred_at", th.DateTimeType),
         th.Property(