From f8c99ed62a91d8df8e764ea1bcbd6d5c217da639 Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Tue, 26 Apr 2022 13:34:15 +0300 Subject: [PATCH 01/15] Add repo_id to context of all child streams of Repositories --- tap_github/repository_streams.py | 196 ++++++++++++++++++++++++++++++- 1 file changed, 191 insertions(+), 5 deletions(-) diff --git a/tap_github/repository_streams.py b/tap_github/repository_streams.py index d0604c53..3e3f795b 100644 --- a/tap_github/repository_streams.py +++ b/tap_github/repository_streams.py @@ -1,12 +1,12 @@ """Repository Stream types classes for tap-github.""" -from typing import Any, Dict, Iterable, List, Optional +from typing import Any, Dict, Iterable, List, Optional, Tuple import requests from singer_sdk import typing as th # JSON Schema typing helpers from singer_sdk.helpers.jsonpath import extract_jsonpath -from tap_github.client import GitHubRestStream +from tap_github.client import GitHubGraphqlStream, GitHubRestStream from tap_github.schema_objects import ( user_object, label_object, @@ -57,9 +57,73 @@ def records_jsonpath(self) -> str: # type: ignore else: return "$[*]" + def get_repo_ids(self, repo_list: List[Tuple[str]]) -> List[Dict[str, str]]: + """Enrich the list of repos with their numeric ID from github. + + This helps maintain a stable id for context and bookmarks. + It uses the github graphql api to fetch the databaseId. + It also removes non-existant repos and corrects casing to ensure + data is correct downstream. + """ + # use a temp handmade stream to reuse all the graphql setup of the tap + class TempStream(GitHubGraphqlStream): + name = "tempStream" + schema = th.PropertiesList( + th.Property("id", th.StringType), + th.Property("databaseId", th.IntegerType), + ).to_dict() + + def __init__(self, tap, repo_list) -> None: + super().__init__(tap) + self.repo_list = repo_list + + @property + def query(self) -> str: + chunks = list() + # there is probably some limit to how many items can be requested + # in a single query, but it's well above 1k. + for i, repo in enumerate(self.repo_list): + chunks.append( + f'repo{i}: repository(name: "{repo[1]}", owner: "{repo[0]}") ' + "{ nameWithOwner databaseId }" + ) + return "query {" + " ".join(chunks) + " }" + + repos_with_ids: list = list() + temp_stream = TempStream(self._tap, list(repo_list)) + # replace manually provided org/repo values by the ones obtained + # from github api. This guarantees that case is correct in the output data. + # See https://github.com/MeltanoLabs/tap-github/issues/110 + # Also remove repos which do not exist to avoid crashing further down + # the line. + for record in temp_stream.request_records({}): + for item in record.keys(): + try: + org, repo = record[item]["nameWithOwner"].split("/") + except TypeError: + # one of the repos returned `None`, which means it does + # not exist, log some details, and move on to the next one + repo_full_name = "/".join(repo_list[int(item[4:])]) + self.logger.warn( + ( + f"Repository not found: {repo_full_name} \t" + "Removing it from list" + ) + ) + continue + repos_with_ids.append( + {"org": org, "repo": repo, "id": record[item]["databaseId"]} + ) + self.logger.info(f"Running the tap on {len(repos_with_ids)} repositories") + return repos_with_ids + @property - def partitions(self) -> Optional[List[Dict]]: - """Return a list of partitions.""" + def partitions(self) -> Optional[List[Dict[str, str]]]: + """Return a list of partitions. 
+ + This is called before syncing records, we use it to fetch some additional + context + """ if "searches" in self.config: return [ {"search_name": s["name"], "search_query": s["query"]} @@ -67,7 +131,7 @@ def partitions(self) -> Optional[List[Dict]]: ] if "repositories" in self.config: split_repo_names = map(lambda s: s.split("/"), self.config["repositories"]) - return [{"org": r[0], "repo": r[1]} for r in split_repo_names] + return self.get_repo_ids(list(split_repo_names)) if "organizations" in self.config: return [{"org": org} for org in self.config["organizations"]] return None @@ -82,6 +146,7 @@ def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: return { "org": record["owner"]["login"], "repo": record["name"], + "repo_id": record["id"], } def get_records(self, context: Optional[Dict]) -> Iterable[Dict[str, Any]]: @@ -100,11 +165,13 @@ def get_records(self, context: Optional[Dict]) -> Iterable[Dict[str, Any]]: ): # build a minimal mock record so that self._sync_records # can proceed with child streams + # the id is fetched in `get_repo_ids` above yield { "owner": { "login": context["org"], }, "name": context["repo"], + "id": context["id"], } else: yield from super().get_records(context) @@ -186,10 +253,15 @@ class ReadmeStream(GitHubRestStream): state_partitioning_keys = ["repo", "org"] tolerated_http_errors = [404] + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + row["repo_id"] = context["repo_id"] + return row + schema = th.PropertiesList( # Parent Keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # README Keys th.Property("type", th.StringType), th.Property("encoding", th.StringType), @@ -245,10 +317,15 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: yield {"raw_html": response.text} + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + row["repo_id"] = context["repo_id"] + return row + schema = th.PropertiesList( # Parent Keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # Readme HTML th.Property("raw_html", th.StringType), ).to_dict() @@ -265,10 +342,15 @@ class CommunityProfileStream(GitHubRestStream): state_partitioning_keys = ["repo", "org"] tolerated_http_errors = [404] + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + row["repo_id"] = context["repo_id"] + return row + schema = th.PropertiesList( # Parent Keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # Community Profile th.Property("health_percentage", th.IntegerType), th.Property("description", th.StringType), @@ -372,6 +454,7 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: # the parent stream, e.g. for fork/parent PR events. 
row["target_repo"] = row.pop("repo", None) row["target_org"] = row.pop("org", None) + row["repo_id"] = context["repo_id"] return row schema = th.PropertiesList( @@ -379,6 +462,7 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: th.Property("type", th.StringType), th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), th.Property("public", th.BooleanType), th.Property("_sdc_repository", th.StringType), th.Property("created_at", th.DateTimeType), @@ -514,10 +598,15 @@ class MilestonesStream(GitHubRestStream): state_partitioning_keys = ["repo", "org"] ignore_parent_replication_key = True + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + row["repo_id"] = context["repo_id"] + return row + schema = th.PropertiesList( # Parent Keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # Rest th.Property("url", th.StringType), th.Property("html_url", th.StringType), @@ -547,10 +636,15 @@ class ReleasesStream(GitHubRestStream): state_partitioning_keys = ["repo", "org"] replication_key = "published_at" + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + row["repo_id"] = context["repo_id"] + return row + schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # Rest th.Property("url", th.StringType), th.Property("html_url", th.StringType), @@ -609,10 +703,15 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: for key, value in languages_json.items(): yield {"language_name": key, "bytes": value} + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + row["repo_id"] = context["repo_id"] + return row + schema = th.PropertiesList( # Parent Keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # A list of languages parsed by GitHub is available here: # https://github.com/github/linguist/blob/master/lib/linguist/languages.yml th.Property("language_name", th.StringType), @@ -628,10 +727,15 @@ class CollaboratorsStream(GitHubRestStream): ignore_parent_replication_key = True state_partitioning_keys = ["repo", "org"] + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + row["repo_id"] = context["repo_id"] + return row + schema = th.PropertiesList( # Parent Keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # Rest th.Property("login", th.StringType), th.Property("id", th.IntegerType), @@ -666,10 +770,15 @@ class AssigneesStream(GitHubRestStream): ignore_parent_replication_key = True state_partitioning_keys = ["repo", "org"] + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + row["repo_id"] = context["repo_id"] + return row + schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # Rest th.Property("login", th.StringType), th.Property("id", th.IntegerType), @@ -736,6 +845,7 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: # replace +1/-1 emojis to avoid downstream column name errors. 
row["plus_one"] = row.pop("+1", None) row["minus_one"] = row.pop("-1", None) + row["repo_id"] = context["repo_id"] return row schema = th.PropertiesList( @@ -745,6 +855,7 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: th.Property("html_url", th.StringType), th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), th.Property("number", th.IntegerType), th.Property("updated_at", th.DateTimeType), th.Property("created_at", th.DateTimeType), @@ -811,6 +922,7 @@ def get_records(self, context: Optional[Dict] = None) -> Iterable[Dict[str, Any] def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: row["issue_number"] = int(row["issue_url"].split("/")[-1]) + row["repo_id"] = context["repo_id"] if row["body"] is not None: # some comment bodies include control characters such as \x00 # that some targets (such as postgresql) choke on. This ensures @@ -823,6 +935,7 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # Rest th.Property("id", th.IntegerType), th.Property("node_id", th.StringType), @@ -869,6 +982,7 @@ def get_records(self, context: Optional[Dict] = None) -> Iterable[Dict[str, Any] def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: row["issue_number"] = int(row["issue"].pop("number")) row["issue_url"] = row["issue"].pop("url") + row["repo_id"] = context["repo_id"] return row schema = th.PropertiesList( @@ -876,6 +990,7 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: th.Property("node_id", th.StringType), th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), th.Property("issue_number", th.IntegerType), th.Property("issue_url", th.StringType), th.Property("event", th.StringType), @@ -911,11 +1026,13 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: assert context is not None, "CommitsStream was called without context" row["repo"] = context["repo"] row["org"] = context["org"] + row["repo_id"] = context["repo_id"] return row schema = th.PropertiesList( th.Property("org", th.StringType), th.Property("repo", th.StringType), + th.Property("repo_id", th.IntegerType), th.Property("node_id", th.StringType), th.Property("url", th.StringType), th.Property("sha", th.StringType), @@ -973,10 +1090,15 @@ class CommitCommentsStream(GitHubRestStream): state_partitioning_keys = ["repo", "org"] ignore_parent_replication_key = True + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + row["repo_id"] = context["repo_id"] + return row + schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # Rest th.Property("html_url", th.StringType), th.Property("url", th.StringType), @@ -1039,6 +1161,7 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: # replace +1/-1 emojis to avoid downstream column name errors. 
row["plus_one"] = row.pop("+1", None) row["minus_one"] = row.pop("-1", None) + row["repo_id"] = context["repo_id"] return row def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: @@ -1058,6 +1181,7 @@ def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # PR keys th.Property("id", th.IntegerType), th.Property("node_id", th.StringType), @@ -1158,10 +1282,15 @@ class PullRequestCommits(GitHubRestStream): parent_stream_type = PullRequestsStream state_partitioning_keys = ["repo", "org"] + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + row["repo_id"] = context["repo_id"] + return row + schema = th.PropertiesList( # Parent keys th.Property("org", th.StringType), th.Property("repo", th.StringType), + th.Property("repo_id", th.IntegerType), th.Property("pull_number", th.IntegerType), # Rest th.Property("url", th.StringType), @@ -1230,11 +1359,16 @@ class ReviewsStream(GitHubRestStream): ignore_parent_replication_key = False state_partitioning_keys = ["repo", "org"] + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + row["repo_id"] = context["repo_id"] + return row + schema = th.PropertiesList( # Parent keys th.Property("pull_number", th.IntegerType), th.Property("org", th.StringType), th.Property("repo", th.StringType), + th.Property("repo_id", th.IntegerType), # Rest th.Property("id", th.IntegerType), th.Property("node_id", th.StringType), @@ -1266,10 +1400,15 @@ class ReviewCommentsStream(GitHubRestStream): ignore_parent_replication_key = True state_partitioning_keys = ["repo", "org"] + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + row["repo_id"] = context["repo_id"] + return row + schema = th.PropertiesList( # Parent keys th.Property("org", th.StringType), th.Property("repo", th.StringType), + th.Property("repo_id", th.IntegerType), # Rest th.Property("url", th.StringType), th.Property("pull_request_review_id", th.IntegerType), @@ -1318,10 +1457,15 @@ class ContributorsStream(GitHubRestStream): ignore_parent_replication_key = True state_partitioning_keys = ["repo", "org"] + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + row["repo_id"] = context["repo_id"] + return row + schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # User/Bot contributor keys th.Property("login", th.StringType), th.Property("id", th.IntegerType), @@ -1361,10 +1505,15 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: parsed_response = super().parse_response(response) return filter(lambda x: x["type"] == "Anonymous", parsed_response) + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + row["repo_id"] = context["repo_id"] + return row + schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # Anonymous contributor keys th.Property("email", th.StringType), th.Property("name", th.StringType), @@ -1401,12 +1550,14 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: Add a user_id top-level field to be used as state replication key. 
""" row["user_id"] = row["user"]["id"] + row["repo_id"] = context["repo_id"] return row schema = th.PropertiesList( # Parent Keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), th.Property("user_id", th.IntegerType), # Stargazer Info th.Property("starred_at", th.DateTimeType), @@ -1454,10 +1605,15 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: week_with_author["user_id"] = week_with_author.pop("id") yield week_with_author + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + row["repo_id"] = context["repo_id"] + return row + schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # Activity keys th.Property("week_start", th.IntegerType), th.Property("additions", th.IntegerType), @@ -1486,10 +1642,15 @@ class ProjectsStream(GitHubRestStream): def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: return {"project_id": record["id"]} + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + row["repo_id"] = context["repo_id"] + return row + schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # Rest th.Property("owner_url", th.StringType), th.Property("url", th.StringType), @@ -1519,10 +1680,15 @@ class ProjectColumnsStream(GitHubRestStream): def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: return {"column_id": record["id"]} + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + row["repo_id"] = context["repo_id"] + return row + schema = th.PropertiesList( # Parent Keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), th.Property("project_id", th.IntegerType), # Rest th.Property("url", th.StringType), @@ -1545,10 +1711,15 @@ class ProjectCardsStream(GitHubRestStream): parent_stream_type = ProjectColumnsStream state_partitioning_keys = ["project_id", "repo", "org"] + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + row["repo_id"] = context["repo_id"] + return row + schema = th.PropertiesList( # Parent Keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), th.Property("project_id", th.IntegerType), th.Property("column_id", th.IntegerType), # Properties @@ -1580,10 +1751,15 @@ class WorkflowsStream(GitHubRestStream): state_partitioning_keys = ["repo", "org"] records_jsonpath = "$.workflows[*]" + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + row["repo_id"] = context["repo_id"] + return row + schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # PR keys th.Property("id", th.IntegerType), th.Property("node_id", th.StringType), @@ -1614,10 +1790,15 @@ class WorkflowRunsStream(GitHubRestStream): state_partitioning_keys = ["repo", "org"] records_jsonpath = "$.workflow_runs[*]" + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + row["repo_id"] = context["repo_id"] + return row + schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # PR keys th.Property("id", th.IntegerType), 
th.Property("name", th.StringType), @@ -1681,10 +1862,15 @@ class WorkflowRunJobsStream(GitHubRestStream): state_partitioning_keys = ["repo", "org", "run_id"] records_jsonpath = "$.jobs[*]" + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + row["repo_id"] = context["repo_id"] + return row + schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), th.Property("org", th.StringType), + th.Property("repo_id", th.IntegerType), # PR keys th.Property("id", th.IntegerType), th.Property("run_id", th.IntegerType), From 8c13ccb99935642a13ce5d4a20db09ff4209a86d Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Tue, 26 Apr 2022 14:53:32 +0300 Subject: [PATCH 02/15] Add repo_id to PR stream context --- tap_github/repository_streams.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tap_github/repository_streams.py b/tap_github/repository_streams.py index 3e3f795b..f3608c18 100644 --- a/tap_github/repository_streams.py +++ b/tap_github/repository_streams.py @@ -1169,12 +1169,14 @@ def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: return { "org": context["org"], "repo": context["repo"], + "repo_id": context["repo_id"], "pull_number": record["number"], } return { "pull_number": record["number"], "org": record["base"]["user"]["login"], "repo": record["base"]["repo"]["name"], + "repo_id": record["base"]["repo"]["id"], } schema = th.PropertiesList( From d4c55e824a9c352a885919a316d8021e79d25ffb Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Wed, 27 Apr 2022 12:53:27 +0300 Subject: [PATCH 03/15] Refactor post_process onto parent class --- tap_github/client.py | 5 ++ tap_github/repository_streams.py | 84 -------------------------------- 2 files changed, 5 insertions(+), 84 deletions(-) diff --git a/tap_github/client.py b/tap_github/client.py index b686119e..8c2a506d 100644 --- a/tap_github/client.py +++ b/tap_github/client.py @@ -234,6 +234,11 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: yield from results + def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + """Add `repo_id` by default to all streams.""" + row["repo_id"] = context["repo_id"] + return row + class GitHubGraphqlStream(GraphQLStream, GitHubRestStream): """GitHub Graphql stream class.""" diff --git a/tap_github/repository_streams.py b/tap_github/repository_streams.py index f3608c18..c5c3b166 100644 --- a/tap_github/repository_streams.py +++ b/tap_github/repository_streams.py @@ -253,10 +253,6 @@ class ReadmeStream(GitHubRestStream): state_partitioning_keys = ["repo", "org"] tolerated_http_errors = [404] - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: - row["repo_id"] = context["repo_id"] - return row - schema = th.PropertiesList( # Parent Keys th.Property("repo", th.StringType), @@ -317,10 +313,6 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: yield {"raw_html": response.text} - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: - row["repo_id"] = context["repo_id"] - return row - schema = th.PropertiesList( # Parent Keys th.Property("repo", th.StringType), @@ -342,10 +334,6 @@ class CommunityProfileStream(GitHubRestStream): state_partitioning_keys = ["repo", "org"] tolerated_http_errors = [404] - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: - row["repo_id"] = context["repo_id"] - return row - schema = th.PropertiesList( # Parent Keys th.Property("repo", th.StringType), @@ -598,10 +586,6 @@ class 
MilestonesStream(GitHubRestStream): state_partitioning_keys = ["repo", "org"] ignore_parent_replication_key = True - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: - row["repo_id"] = context["repo_id"] - return row - schema = th.PropertiesList( # Parent Keys th.Property("repo", th.StringType), @@ -636,10 +620,6 @@ class ReleasesStream(GitHubRestStream): state_partitioning_keys = ["repo", "org"] replication_key = "published_at" - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: - row["repo_id"] = context["repo_id"] - return row - schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), @@ -703,10 +683,6 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: for key, value in languages_json.items(): yield {"language_name": key, "bytes": value} - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: - row["repo_id"] = context["repo_id"] - return row - schema = th.PropertiesList( # Parent Keys th.Property("repo", th.StringType), @@ -727,10 +703,6 @@ class CollaboratorsStream(GitHubRestStream): ignore_parent_replication_key = True state_partitioning_keys = ["repo", "org"] - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: - row["repo_id"] = context["repo_id"] - return row - schema = th.PropertiesList( # Parent Keys th.Property("repo", th.StringType), @@ -770,10 +742,6 @@ class AssigneesStream(GitHubRestStream): ignore_parent_replication_key = True state_partitioning_keys = ["repo", "org"] - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: - row["repo_id"] = context["repo_id"] - return row - schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), @@ -1090,10 +1058,6 @@ class CommitCommentsStream(GitHubRestStream): state_partitioning_keys = ["repo", "org"] ignore_parent_replication_key = True - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: - row["repo_id"] = context["repo_id"] - return row - schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), @@ -1284,10 +1248,6 @@ class PullRequestCommits(GitHubRestStream): parent_stream_type = PullRequestsStream state_partitioning_keys = ["repo", "org"] - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: - row["repo_id"] = context["repo_id"] - return row - schema = th.PropertiesList( # Parent keys th.Property("org", th.StringType), @@ -1361,10 +1321,6 @@ class ReviewsStream(GitHubRestStream): ignore_parent_replication_key = False state_partitioning_keys = ["repo", "org"] - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: - row["repo_id"] = context["repo_id"] - return row - schema = th.PropertiesList( # Parent keys th.Property("pull_number", th.IntegerType), @@ -1402,10 +1358,6 @@ class ReviewCommentsStream(GitHubRestStream): ignore_parent_replication_key = True state_partitioning_keys = ["repo", "org"] - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: - row["repo_id"] = context["repo_id"] - return row - schema = th.PropertiesList( # Parent keys th.Property("org", th.StringType), @@ -1459,10 +1411,6 @@ class ContributorsStream(GitHubRestStream): ignore_parent_replication_key = True state_partitioning_keys = ["repo", "org"] - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: - row["repo_id"] = context["repo_id"] - return row - schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), @@ -1507,10 
+1455,6 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: parsed_response = super().parse_response(response) return filter(lambda x: x["type"] == "Anonymous", parsed_response) - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: - row["repo_id"] = context["repo_id"] - return row - schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), @@ -1607,10 +1551,6 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: week_with_author["user_id"] = week_with_author.pop("id") yield week_with_author - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: - row["repo_id"] = context["repo_id"] - return row - schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), @@ -1644,10 +1584,6 @@ class ProjectsStream(GitHubRestStream): def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: return {"project_id": record["id"]} - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: - row["repo_id"] = context["repo_id"] - return row - schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), @@ -1682,10 +1618,6 @@ class ProjectColumnsStream(GitHubRestStream): def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: return {"column_id": record["id"]} - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: - row["repo_id"] = context["repo_id"] - return row - schema = th.PropertiesList( # Parent Keys th.Property("repo", th.StringType), @@ -1713,10 +1645,6 @@ class ProjectCardsStream(GitHubRestStream): parent_stream_type = ProjectColumnsStream state_partitioning_keys = ["project_id", "repo", "org"] - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: - row["repo_id"] = context["repo_id"] - return row - schema = th.PropertiesList( # Parent Keys th.Property("repo", th.StringType), @@ -1753,10 +1681,6 @@ class WorkflowsStream(GitHubRestStream): state_partitioning_keys = ["repo", "org"] records_jsonpath = "$.workflows[*]" - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: - row["repo_id"] = context["repo_id"] - return row - schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), @@ -1792,10 +1716,6 @@ class WorkflowRunsStream(GitHubRestStream): state_partitioning_keys = ["repo", "org"] records_jsonpath = "$.workflow_runs[*]" - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: - row["repo_id"] = context["repo_id"] - return row - schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), @@ -1864,10 +1784,6 @@ class WorkflowRunJobsStream(GitHubRestStream): state_partitioning_keys = ["repo", "org", "run_id"] records_jsonpath = "$.jobs[*]" - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: - row["repo_id"] = context["repo_id"] - return row - schema = th.PropertiesList( # Parent keys th.Property("repo", th.StringType), From 23c39085fce228056e183037220fcc3827397569 Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Wed, 27 Apr 2022 13:28:33 +0300 Subject: [PATCH 04/15] Fix mypy errors --- tap_github/client.py | 4 +++- tap_github/repository_streams.py | 18 ++++++++++++------ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/tap_github/client.py b/tap_github/client.py index 8c2a506d..8fe8ee7b 100644 --- a/tap_github/client.py +++ b/tap_github/client.py @@ -234,8 +234,10 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: 
yield from results - def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: + def post_process(self, row: dict, context: Optional[Dict[str, str]] = None) -> dict: """Add `repo_id` by default to all streams.""" + if context is None: + return row row["repo_id"] = context["repo_id"] return row diff --git a/tap_github/repository_streams.py b/tap_github/repository_streams.py index c5c3b166..a101b366 100644 --- a/tap_github/repository_streams.py +++ b/tap_github/repository_streams.py @@ -442,7 +442,8 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: # the parent stream, e.g. for fork/parent PR events. row["target_repo"] = row.pop("repo", None) row["target_org"] = row.pop("org", None) - row["repo_id"] = context["repo_id"] + if context is not None: + row["repo_id"] = context["repo_id"] return row schema = th.PropertiesList( @@ -813,7 +814,8 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: # replace +1/-1 emojis to avoid downstream column name errors. row["plus_one"] = row.pop("+1", None) row["minus_one"] = row.pop("-1", None) - row["repo_id"] = context["repo_id"] + if context is not None: + row["repo_id"] = context["repo_id"] return row schema = th.PropertiesList( @@ -890,7 +892,8 @@ def get_records(self, context: Optional[Dict] = None) -> Iterable[Dict[str, Any] def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: row["issue_number"] = int(row["issue_url"].split("/")[-1]) - row["repo_id"] = context["repo_id"] + if context is not None: + row["repo_id"] = context["repo_id"] if row["body"] is not None: # some comment bodies include control characters such as \x00 # that some targets (such as postgresql) choke on. This ensures @@ -950,7 +953,8 @@ def get_records(self, context: Optional[Dict] = None) -> Iterable[Dict[str, Any] def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: row["issue_number"] = int(row["issue"].pop("number")) row["issue_url"] = row["issue"].pop("url") - row["repo_id"] = context["repo_id"] + if context is not None: + row["repo_id"] = context["repo_id"] return row schema = th.PropertiesList( @@ -1125,7 +1129,8 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: # replace +1/-1 emojis to avoid downstream column name errors. row["plus_one"] = row.pop("+1", None) row["minus_one"] = row.pop("-1", None) - row["repo_id"] = context["repo_id"] + if context is not None: + row["repo_id"] = context["repo_id"] return row def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: @@ -1496,7 +1501,8 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: Add a user_id top-level field to be used as state replication key. 
""" row["user_id"] = row["user"]["id"] - row["repo_id"] = context["repo_id"] + if context is not None: + row["repo_id"] = context["repo_id"] return row schema = th.PropertiesList( From b25230786992da73cd4cfde9c9d0785f52e6a22a Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Wed, 27 Apr 2022 13:42:00 +0300 Subject: [PATCH 05/15] Fix incorrect key in context --- tap_github/repository_streams.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_github/repository_streams.py b/tap_github/repository_streams.py index a101b366..14131768 100644 --- a/tap_github/repository_streams.py +++ b/tap_github/repository_streams.py @@ -112,7 +112,7 @@ def query(self) -> str: ) continue repos_with_ids.append( - {"org": org, "repo": repo, "id": record[item]["databaseId"]} + {"org": org, "repo": repo, "repo_id": record[item]["databaseId"]} ) self.logger.info(f"Running the tap on {len(repos_with_ids)} repositories") return repos_with_ids From 17631898cc1558f4ccb2e688a9d1522cff9aa084 Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Wed, 27 Apr 2022 15:33:32 +0300 Subject: [PATCH 06/15] Add missing context for projects --- tap_github/repository_streams.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/tap_github/repository_streams.py b/tap_github/repository_streams.py index 14131768..c9a5f039 100644 --- a/tap_github/repository_streams.py +++ b/tap_github/repository_streams.py @@ -1588,7 +1588,12 @@ class ProjectsStream(GitHubRestStream): state_partitioning_keys = ["repo", "org"] def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: - return {"project_id": record["id"]} + return { + "project_id": record["id"], + "repo_id": context["repo_id"] if context else None, + "org": context["org"] if context else None, + "repo": context["repo"] if context else None, + } schema = th.PropertiesList( # Parent keys @@ -1622,7 +1627,12 @@ class ProjectColumnsStream(GitHubRestStream): state_partitioning_keys = ["project_id", "repo", "org"] def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: - return {"column_id": record["id"]} + return { + "column_id": record["id"], + "repo_id": context["repo_id"] if context else None, + "org": context["org"] if context else None, + "repo": context["repo"] if context else None, + } schema = th.PropertiesList( # Parent Keys @@ -1774,6 +1784,7 @@ def get_child_context(self, record: dict, context: Optional[dict]) -> dict: "org": context["org"] if context else None, "repo": context["repo"] if context else None, "run_id": record["id"], + "repo_id": context["repo_id"] if context else None, } From 3fb6e75db665880e5459be3d984514d5dcb0e59e Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Wed, 27 Apr 2022 16:42:16 +0300 Subject: [PATCH 07/15] Fix config test --- tap_github/tests/test_tap.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/tap_github/tests/test_tap.py b/tap_github/tests/test_tap.py index 6ad8e852..7b6e534f 100644 --- a/tap_github/tests/test_tap.py +++ b/tap_github/tests/test_tap.py @@ -13,13 +13,25 @@ "MeltanoLabs/tap-gitlab", "MeltanoLabs/target-athena", ] +# the github repo ids that match the repo names above +# in the same order +repo_ids = [ + 365087920, + 416891176, + 361619143, +] @pytest.mark.repo_list(repo_list_2) def test_validate_repo_list_config(repo_list_config): """Verify that the repositories list is parsed correctly""" repo_list_context = [ - {"org": repo.split("/")[0], "repo": repo.split("/")[1]} for repo in repo_list_2 + { + "org": 
repo[0].split("/")[0], + "repo": repo[0].split("/")[1], + "repo_id": repo[1], + } + for repo in zip(repo_list_2, repo_ids) ] tap = TapGitHub(config=repo_list_config) partitions = tap.streams["repositories"].partitions From c897cf8f9a4acea2df6fd9c3727cd515a8604ee6 Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Wed, 27 Apr 2022 16:42:37 +0300 Subject: [PATCH 08/15] Correct context fetching --- tap_github/client.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tap_github/client.py b/tap_github/client.py index 8fe8ee7b..222f4ba3 100644 --- a/tap_github/client.py +++ b/tap_github/client.py @@ -236,9 +236,8 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: def post_process(self, row: dict, context: Optional[Dict[str, str]] = None) -> dict: """Add `repo_id` by default to all streams.""" - if context is None: - return row - row["repo_id"] = context["repo_id"] + if context is not None and "repo_id" in context: + row["repo_id"] = context["repo_id"] return row From 106d1ccfbe964c650402fa426864b5a88bdd8de1 Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Wed, 27 Apr 2022 17:37:55 +0300 Subject: [PATCH 09/15] Rename test fixture --- tap_github/tests/test_tap.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_github/tests/test_tap.py b/tap_github/tests/test_tap.py index 7b6e534f..8e1d3516 100644 --- a/tap_github/tests/test_tap.py +++ b/tap_github/tests/test_tap.py @@ -15,7 +15,7 @@ ] # the github repo ids that match the repo names above # in the same order -repo_ids = [ +repo_list_2_ids = [ 365087920, 416891176, 361619143, From d10256b6e0eed8cd868cdddc1a5e32596234ebdf Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Thu, 28 Apr 2022 11:36:06 +0300 Subject: [PATCH 10/15] Add user_id to user based streams --- tap_github/user_streams.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tap_github/user_streams.py b/tap_github/user_streams.py index d227011b..793e860a 100644 --- a/tap_github/user_streams.py +++ b/tap_github/user_streams.py @@ -34,6 +34,7 @@ def partitions(self) -> Optional[List[Dict]]: def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: return { "username": record["login"], + "user_id": record["id"], } schema = th.PropertiesList( @@ -102,12 +103,15 @@ def post_process(self, row: dict, context: Optional[Dict] = None) -> dict: Add a repo_id top-level field to be used as state replication key. 
""" row["repo_id"] = row["repo"]["id"] + if context is not None: + row["user_id"] = context["user_id"] return row schema = th.PropertiesList( # Parent Keys th.Property("username", th.StringType), th.Property("repo_id", th.StringType), + th.Property("user_id", th.IntegerType), # Starred Repo Info th.Property("starred_at", th.DateTimeType), th.Property( From e88548c70098956863b69c7ec1da28ce64694b42 Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Thu, 28 Apr 2022 11:37:17 +0300 Subject: [PATCH 11/15] Expand tests to check case correction and user streams --- tap_github/tests/test_tap.py | 58 ++++++++++++++++++++++++++---------- 1 file changed, 42 insertions(+), 16 deletions(-) diff --git a/tap_github/tests/test_tap.py b/tap_github/tests/test_tap.py index 8e1d3516..1326a7d4 100644 --- a/tap_github/tests/test_tap.py +++ b/tap_github/tests/test_tap.py @@ -6,12 +6,14 @@ from tap_github.tap import TapGitHub -from .fixtures import repo_list_config +from .fixtures import repo_list_config, username_list_config repo_list_2 = [ "MeltanoLabs/tap-github", - "MeltanoLabs/tap-gitlab", - "MeltanoLabs/target-athena", + # mistype the repo name so we can check that the tap corrects it + "MeltanoLabs/Tap-GitLab", + # mistype the org + "meltanolabs/target-athena", ] # the github repo ids that match the repo names above # in the same order @@ -31,7 +33,7 @@ def test_validate_repo_list_config(repo_list_config): "repo": repo[0].split("/")[1], "repo_id": repo[1], } - for repo in zip(repo_list_2, repo_ids) + for repo in zip(repo_list_2, repo_list_2_ids) ] tap = TapGitHub(config=repo_list_config) partitions = tap.streams["repositories"].partitions @@ -63,29 +65,53 @@ def alternative_sync_chidren(self, child_context: dict) -> None: child_stream.sync(context=child_context) -@pytest.mark.repo_list(repo_list_2) -def test_get_a_repository_in_repo_list_mode(capsys, repo_list_config): +def run_tap_with_config(capsys, config_obj) -> str: """ - Discover the catalog, and request 2 repository records + Run the tap with the given config and capture stdout """ - tap1 = TapGitHub(config=repo_list_config) + tap1 = TapGitHub(config=config_obj) tap1.run_discovery() catalog = tap1.catalog_dict - # disable child streams - # FIXME: this does not work, the child streams are still fetched - # deselect_all_streams(catalog) - # set_catalog_stream_selected( - # catalog=catalog, stream_name="repositories", selected=True - # ) # discard previous output to stdout (potentially from other tests) capsys.readouterr() with patch( "singer_sdk.streams.core.Stream._sync_children", alternative_sync_chidren ): - tap2 = TapGitHub(config=repo_list_config, catalog=catalog) + tap2 = TapGitHub(config=config_obj, catalog=catalog) tap2.sync_all() captured = capsys.readouterr() + return captured.out + + +@pytest.mark.repo_list(repo_list_2) +def test_get_a_repository_in_repo_list_mode(capsys, repo_list_config): + """ + Discover the catalog, and request 2 repository records + """ + captured_out = run_tap_with_config(capsys, repo_list_config) # Verify we got the right number of records (one per repo in the list) - assert captured.out.count('{"type": "RECORD", "stream": "repositories"') == len( + assert captured_out.count('{"type": "RECORD", "stream": "repositories"') == len( repo_list_2 ) + # check that the tap corrects invalid case in config input + assert '"repo": "Tap-GitLab"' not in captured_out + assert '"org": "meltanolabs"' not in captured_out + + +# case is incorrect on purpose, so we can check that the tap corrects it 
+@pytest.mark.repo_list(["EricBoucher", "aaRONsTeeRS"]) +def test_get_a_user_in_user_usernames_mode(capsys, username_list_config): + """ + Discover the catalog, and request 2 repository records + """ + captured_out = run_tap_with_config(capsys, username_list_config) + # Verify we got the right number of records (one per user in the list) + assert captured_out.count('{"type": "RECORD", "stream": "users"') == len( + username_list_config["user_usernames"] + ) + # these 2 are inequalities as number will keep changing :) + assert captured_out.count('{"type": "RECORD", "stream": "starred"') > 150 + assert captured_out.count('{"type": "RECORD", "stream": "user_contributed_to"') > 50 + assert '{"username": "aaronsteers"' in captured_out + assert '{"username": "aaRONsTeeRS"' not in captured_out + assert '{"username": "EricBoucher"' not in captured_out From a36d67cab680ec447dee627f708d6519e7cc1813 Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Thu, 28 Apr 2022 11:47:42 +0300 Subject: [PATCH 12/15] Fix config validation test --- tap_github/tests/test_tap.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tap_github/tests/test_tap.py b/tap_github/tests/test_tap.py index 1326a7d4..e2b9b4f4 100644 --- a/tap_github/tests/test_tap.py +++ b/tap_github/tests/test_tap.py @@ -15,6 +15,12 @@ # mistype the org "meltanolabs/target-athena", ] +# the same list, but without typos, for validation +repo_list_2_corrected = [ + "MeltanoLabs/tap-github", + "MeltanoLabs/tap-gitlab", + "MeltanoLabs/target-athena", +] # the github repo ids that match the repo names above # in the same order repo_list_2_ids = [ @@ -33,7 +39,7 @@ def test_validate_repo_list_config(repo_list_config): "repo": repo[0].split("/")[1], "repo_id": repo[1], } - for repo in zip(repo_list_2, repo_list_2_ids) + for repo in zip(repo_list_2_corrected, repo_list_2_ids) ] tap = TapGitHub(config=repo_list_config) partitions = tap.streams["repositories"].partitions From 1a5c2ef26554bc4bcc291cf5300a2e5cf1d23b32 Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Thu, 28 Apr 2022 13:22:03 +0300 Subject: [PATCH 13/15] Add option to skip top level user stream with tests --- pytest.ini | 1 + tap_github/repository_streams.py | 2 +- tap_github/tests/test_tap.py | 56 +++++++++++++++----- tap_github/user_streams.py | 90 +++++++++++++++++++++++++++++++- 4 files changed, 132 insertions(+), 17 deletions(-) diff --git a/pytest.ini b/pytest.ini index 643ff660..42af51b9 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,3 +1,4 @@ [pytest] markers = repo_list: mark a test as a repo_list. 
+ username_list: make a test as using a list of usernames in config diff --git a/tap_github/repository_streams.py b/tap_github/repository_streams.py index c9a5f039..f12ade34 100644 --- a/tap_github/repository_streams.py +++ b/tap_github/repository_streams.py @@ -171,7 +171,7 @@ def get_records(self, context: Optional[Dict]) -> Iterable[Dict[str, Any]]: "login": context["org"], }, "name": context["repo"], - "id": context["id"], + "id": context["repo_id"], } else: yield from super().get_records(context) diff --git a/tap_github/tests/test_tap.py b/tap_github/tests/test_tap.py index e2b9b4f4..bdf6af54 100644 --- a/tap_github/tests/test_tap.py +++ b/tap_github/tests/test_tap.py @@ -1,9 +1,12 @@ import os import logging import pytest +from typing import Optional from unittest.mock import patch +from singer_sdk.helpers._singer import Catalog +from singer_sdk.helpers import _catalog as cat_helpers from tap_github.tap import TapGitHub from .fixtures import repo_list_config, username_list_config @@ -71,33 +74,50 @@ def alternative_sync_chidren(self, child_context: dict) -> None: child_stream.sync(context=child_context) -def run_tap_with_config(capsys, config_obj) -> str: +def run_tap_with_config(capsys, config_obj: dict, skip_stream: Optional[str]) -> str: """ - Run the tap with the given config and capture stdout + Run the tap with the given config and capture stdout, optionally + skipping a stream (this is meant to be the top level stream) """ tap1 = TapGitHub(config=config_obj) tap1.run_discovery() - catalog = tap1.catalog_dict + catalog = Catalog.from_dict(tap1.catalog_dict) + # Reset and re-initialize with an input catalog + if skip_stream is not None: + cat_helpers.set_catalog_stream_selected( + catalog=catalog, + stream_name=skip_stream, + selected=False, + ) # discard previous output to stdout (potentially from other tests) capsys.readouterr() with patch( "singer_sdk.streams.core.Stream._sync_children", alternative_sync_chidren ): - tap2 = TapGitHub(config=config_obj, catalog=catalog) + tap2 = TapGitHub(config=config_obj, catalog=catalog.to_dict()) tap2.sync_all() captured = capsys.readouterr() return captured.out +@pytest.mark.parametrize("skip_parent_streams", [False, True]) @pytest.mark.repo_list(repo_list_2) -def test_get_a_repository_in_repo_list_mode(capsys, repo_list_config): +def test_get_a_repository_in_repo_list_mode( + capsys, repo_list_config, skip_parent_streams +): """ - Discover the catalog, and request 2 repository records + Discover the catalog, and request 2 repository records. + The test is parametrized to run twice, with and without + syncing the top level `repositories` stream. 
""" - captured_out = run_tap_with_config(capsys, repo_list_config) - # Verify we got the right number of records (one per repo in the list) + repo_list_config["skip_parent_streams"] = skip_parent_streams + captured_out = run_tap_with_config( + capsys, repo_list_config, "repositories" if skip_parent_streams else None + ) + # Verify we got the right number of records + # one per repo in the list only if we sync the "repositories" stream, 0 if not assert captured_out.count('{"type": "RECORD", "stream": "repositories"') == len( - repo_list_2 + repo_list_2 * (not skip_parent_streams) ) # check that the tap corrects invalid case in config input assert '"repo": "Tap-GitLab"' not in captured_out @@ -105,15 +125,23 @@ def test_get_a_repository_in_repo_list_mode(capsys, repo_list_config): # case is incorrect on purpose, so we can check that the tap corrects it -@pytest.mark.repo_list(["EricBoucher", "aaRONsTeeRS"]) -def test_get_a_user_in_user_usernames_mode(capsys, username_list_config): +# and run the test twice, with and without syncing the `users` stream +@pytest.mark.parametrize("skip_parent_streams", [False, True]) +@pytest.mark.username_list(["EricBoucher", "aaRONsTeeRS"]) +def test_get_a_user_in_user_usernames_mode( + capsys, username_list_config, skip_parent_streams +): """ Discover the catalog, and request 2 repository records """ - captured_out = run_tap_with_config(capsys, username_list_config) - # Verify we got the right number of records (one per user in the list) + username_list_config["skip_parent_streams"] = skip_parent_streams + captured_out = run_tap_with_config( + capsys, username_list_config, "users" if skip_parent_streams else None + ) + # Verify we got the right number of records: + # one per user in the list if we sync the root stream, 0 otherwise assert captured_out.count('{"type": "RECORD", "stream": "users"') == len( - username_list_config["user_usernames"] + username_list_config["user_usernames"] * (not skip_parent_streams) ) # these 2 are inequalities as number will keep changing :) assert captured_out.count('{"type": "RECORD", "stream": "starred"') > 150 diff --git a/tap_github/user_streams.py b/tap_github/user_streams.py index 793e860a..5606d810 100644 --- a/tap_github/user_streams.py +++ b/tap_github/user_streams.py @@ -1,6 +1,6 @@ """User Stream types classes for tap-github.""" -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Iterable, Any from singer_sdk import typing as th # JSON Schema typing helpers @@ -26,7 +26,8 @@ def path(self) -> str: # type: ignore def partitions(self) -> Optional[List[Dict]]: """Return a list of partitions.""" if "user_usernames" in self.config: - return [{"username": u} for u in self.config["user_usernames"]] + # return [{"username": u} for u in self.config["user_usernames"]] + return self.get_user_ids(self.config["user_usernames"]) elif "user_ids" in self.config: return [{"id": id} for id in self.config["user_ids"]] return None @@ -37,6 +38,91 @@ def get_child_context(self, record: Dict, context: Optional[Dict]) -> dict: "user_id": record["id"], } + def get_user_ids(self, user_list: List[str]) -> List[Dict[str, str]]: + """Enrich the list of userse with their numeric ID from github. + + This helps maintain a stable id for context and bookmarks. + It uses the github graphql api to fetch the databaseId. + It also removes non-existant repos and corrects casing to ensure + data is correct downstream. 
+ """ + # use a temp handmade stream to reuse all the graphql setup of the tap + class TempStream(GitHubGraphqlStream): + name = "tempStream" + schema = th.PropertiesList( + th.Property("id", th.StringType), + th.Property("databaseId", th.IntegerType), + ).to_dict() + + def __init__(self, tap, user_list) -> None: + super().__init__(tap) + self.user_list = user_list + + @property + def query(self) -> str: + chunks = list() + # there is probably some limit to how many items can be requested + # in a single query, but it's well above 1k. + for i, user in enumerate(self.user_list): + chunks.append( + f'user{i}: user(login: "{user}") ' "{ login databaseId }" + ) + return "query {" + " ".join(chunks) + " }" + + users_with_ids: list = list() + temp_stream = TempStream(self._tap, list(user_list)) + # replace manually provided org/repo values by the ones obtained + # from github api. This guarantees that case is correct in the output data. + # See https://github.com/MeltanoLabs/tap-github/issues/110 + # Also remove repos which do not exist to avoid crashing further down + # the line. + for record in temp_stream.request_records({}): + self.logger.error(record) + for item in record.keys(): + try: + username = record[item]["login"] + except TypeError: + # one of the usernames returned `None`, which means it does + # not exist, log some details, and move on to the next one + invalid_username = user_list[int(item[4:])] + self.logger.warn( + ( + f"Repository not found: {invalid_username} \t" + "Removing it from list" + ) + ) + continue + users_with_ids.append( + {"username": username, "user_id": record[item]["databaseId"]} + ) + self.logger.info(f"Running the tap on {len(users_with_ids)} users") + self.logger.info(users_with_ids) + return users_with_ids + + def get_records(self, context: Optional[Dict]) -> Iterable[Dict[str, Any]]: + """ + Override the parent method to allow skipping API calls + if the stream is deselected and skip_parent_streams is True in config. + This allows running the tap with fewer API calls and preserving + quota when only syncing a child stream. Without this, + the API call is sent but data is discarded. 
+ """ + if ( + not self.selected + and "skip_parent_streams" in self.config + and self.config["skip_parent_streams"] + and context is not None + ): + # build a minimal mock record so that self._sync_records + # can proceed with child streams + # the id is fetched in `get_user_ids` above + yield { + "login": context["username"], + "id": context["user_id"], + } + else: + yield from super().get_records(context) + schema = th.PropertiesList( th.Property("login", th.StringType), th.Property("id", th.IntegerType), From 40a9d11a48694c673ff8327eeff635ad619a011d Mon Sep 17 00:00:00 2001 From: Laurent Savaete Date: Thu, 28 Apr 2022 16:41:07 +0300 Subject: [PATCH 14/15] Try to cache more requests --- tap_github/tests/__init__.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tap_github/tests/__init__.py b/tap_github/tests/__init__.py index 8103d8ce..4052797d 100644 --- a/tap_github/tests/__init__.py +++ b/tap_github/tests/__init__.py @@ -11,9 +11,13 @@ "api_calls_tests_cache", backend="sqlite", # make sure that API keys don't end up being cached - ignored_parameters=["Authorization"], + # Also ignore user-agent so that various versions of request + # can share the cache + ignored_parameters=["Authorization", "user-agent"], # tell requests_cache to check headers for the above parameter match_headers=True, # expire the cache after 24h (86400 seconds) expire_after=24 * 60 * 60, + # make sure graphql calls get cached as well + allowable_methods=["GET", "POST"], ) From cbe697192b5c05b0b1fc854182546eebc6abeaec Mon Sep 17 00:00:00 2001 From: Eric Boucher Date: Fri, 29 Apr 2022 00:27:46 -0400 Subject: [PATCH 15/15] Update marker --- pytest.ini | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pytest.ini b/pytest.ini index 42af51b9..39587f2c 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,4 +1,4 @@ [pytest] markers = - repo_list: mark a test as a repo_list. - username_list: make a test as using a list of usernames in config + repo_list: mark a test as using a list of repos in config + username_list: mark a test as using a list of usernames in config