def _transform_requirements_txt(
    req_file_contents: Optional[Dict],
    repo_url: str,
    out_requirements_files: List[Dict],
) -> None:
    """
    Transform the requirements.txt file of a GitHub repo, if the repo has one.
    :param req_file_contents: Dict: The requirements.txt blob; its file contents are read from the 'text' key.
        May be None, or have an empty/missing 'text', in which case nothing is done.
    :param repo_url: str: The URL of the GitHub repo.
    :param out_requirements_files: Output array to append transformed results to.
    :return: Nothing.
    """
    file_text = req_file_contents.get('text') if req_file_contents else None
    if not file_text:
        return
    # One requirement per line; comment stripping and parsing are done by the shared helper.
    _transform_python_requirements(file_text.split("\n"), repo_url, out_requirements_files)
def _transform_setup_cfg_requirements(
    setup_cfg_contents: Optional[Dict],
    repo_url: str,
    out_requirements_files: List[Dict],
) -> None:
    """
    Performs data transformations for the setup.cfg file in a GitHub repo, if available.
    :param setup_cfg_contents: Dict: Contains contents of a repo's setup.cfg file under the 'text' key.
    :param repo_url: str: The URL of the GitHub repo.
    :param out_requirements_files: Output array to append transformed results to.
    :return: Nothing.
    """
    if not setup_cfg_contents or not setup_cfg_contents.get('text'):
        return
    text_contents = setup_cfg_contents['text']
    setup_cfg = configparser.ConfigParser()
    try:
        setup_cfg.read_string(text_contents)
        # ConfigParser interpolates values lazily on get()/items(), so a stray '%' in a requirement only
        # raises while the options are being read. Keep the extraction inside the same try so that a bad
        # setup.cfg is skipped instead of aborting the sync.
        requirements_list = parse_setup_cfg(setup_cfg)
    except configparser.Error:
        logger.info(
            f"Failed to parse {repo_url}'s setup.cfg; skipping.",
            exc_info=True,
        )
        return
    _transform_python_requirements(requirements_list, repo_url, out_requirements_files, source_file="setup.cfg")


def _transform_python_requirements(
    requirements_list: List[str],
    repo_url: str,
    out_requirements_files: List[Dict],
    source_file: str = "requirements.txt",
) -> None:
    """
    Helper function to perform data transformations on an arbitrary list of requirements.
    :param requirements_list: List[str]: List of requirements, one requirement string per item.
    :param repo_url: str: The URL of the GitHub repo.
    :param out_requirements_files: Output array to append transformed results to.
    :param source_file: str: Name of the file the requirements came from. Only used in log messages.
    :return: Nothing.
    """
    for line in requirements_list:
        # Remove trailing comments and extra whitespace
        stripped_line = line.partition('#')[0].strip()
        if stripped_line == '':
            continue
        try:
            req = Requirement(stripped_line)
        except InvalidRequirement:
            # INFO and not WARN/ERROR as we intentionally don't support all ways to specify Python requirements
            logger.info(
                f"Failed to parse line \"{line}\" in repo {repo_url}'s {source_file}; skipping line.",
                exc_info=True,
            )
            continue

        # A requirement counts as pinned only when its single specifier is an exact `==` match.
        pinned_version = None
        if len(req.specifier) == 1:
            specifier = next(iter(req.specifier))
            if specifier.operator == '==':
                pinned_version = specifier.version

        # Example values for str(req.specifier): "<4.0,>=3.0" or "==1.0.0".
        # Use `None` instead of empty string so that the Neo4j driver will leave the library.specifier field
        # undefined. As convention, we prefer undefined values over empty strings in the graph.
        spec: Optional[str] = str(req.specifier) or None

        canon_name = canonicalize_name(req.name)
        requirement_id = f"{canon_name}|{pinned_version}" if pinned_version else canon_name

        out_requirements_files.append({
            "id": requirement_id,
            "name": canon_name,
            "specifier": spec,
            "version": pinned_version,
            "repo_url": repo_url,
        })


def parse_setup_cfg(config: configparser.ConfigParser) -> List[str]:
    """
    Collect the raw requirement strings declared in a parsed setup.cfg.
    Reads `install_requires` and `setup_requires` from the [options] section and every value in the
    [options.extras_require] section. Missing sections/options contribute nothing.
    :param config: A ConfigParser that has already been loaded with the setup.cfg contents.
    :return: List of requirement strings, in the order install_requires, setup_requires, extras_require.
    :raises configparser.Error: If a requested value cannot be interpolated (e.g. it contains a bare '%').
    """
    reqs: List[str] = []
    for option in ("install_requires", "setup_requires"):
        reqs.extend(_parse_setup_cfg_requirements(config.get("options", option, fallback="")))
    if config.has_section("options.extras_require"):
        for _, extra_reqs in config.items("options.extras_require"):
            reqs.extend(_parse_setup_cfg_requirements(extra_reqs))
    return reqs


# logic taken from setuptools:
# https://github.com/pypa/setuptools/blob/f359b8a7608c7f118710af02cb5edab4e6abb942/setuptools/config.py#L241-L258
def _parse_setup_cfg_requirements(reqs: str, separator: str = ";") -> List[str]:
    """
    Split a setup.cfg list value into its items.
    A value containing newlines is treated as one item per line; otherwise it is split on `separator`.
    :param reqs: str: The raw option value.
    :param separator: str: Item separator used for single-line values.
    :return: The non-empty items with surrounding whitespace removed.
    """
    reqs_list = reqs.splitlines() if "\n" in reqs else reqs.split(separator)
    return [req.strip() for req in reqs_list if req.strip()]
| Field | Description | |-------|-------------| diff --git a/tests/data/github/repos.py b/tests/data/github/repos.py index 2ce2dbd1b..a362538c5 100644 --- a/tests/data/github/repos.py +++ b/tests/data/github/repos.py @@ -1,3 +1,5 @@ +import textwrap + GET_REPOS = [ { 'name': 'sample_repo', @@ -32,6 +34,14 @@ }, 'collaborators': {'edges': [], 'nodes': []}, 'requirements': {'text': 'cartography\nhttplib2<0.7.0\njinja2\nlxml\n-e git+https://example.com#egg=foobar\nhttps://example.com/foobar.tar.gz\npip @ https://github.com/pypa/pip/archive/1.3.1.zip#sha1=da9234ee9982d4bbb3c72346a6de940a148ea686\n'}, # noqa + 'setupCfg': { + 'text': textwrap.dedent(''' + [options] + install_requires = + neo4j + scipy!=1.20.0 # comment + '''), + }, }, { 'name': 'SampleRepo2', 'nameWithOwner': 'example_org/SampleRepo2', @@ -64,6 +74,7 @@ }, 'collaborators': None, 'requirements': None, + 'setupCfg': None, }, { 'name': 'cartography', @@ -139,7 +150,16 @@ ], }, 'requirements': { - 'text': 'cartography==0.1.0\nhttplib2>=0.7.0\njinja2\nlxml\n# This is a comment line to be ignored\n', + 'text': 'cartography==0.1.0\nhttplib2>=0.7.0\njinja2\nlxml\n# This is a comment line to be ignored\nokta==0.9.0', # noqa + }, + 'setupCfg': { + 'text': textwrap.dedent(''' + [options] + install_requires = + neo4j>=1.0.0 + numpy!=1.20.0 # comment + okta + '''), }, }, ] diff --git a/tests/integration/cartography/intel/github/test_repos.py b/tests/integration/cartography/intel/github/test_repos.py index 5044f6c7d..a23980d7e 100644 --- a/tests/integration/cartography/intel/github/test_repos.py +++ b/tests/integration/cartography/intel/github/test_repos.py @@ -190,7 +190,7 @@ def test_repository_to_collaborators(neo4j_session): def test_pinned_python_library_to_repo(neo4j_session): """ - Ensure that repositories are connected to pinned Python libraries. + Ensure that repositories are connected to pinned Python libraries stated as dependencies in requirements.txt. 
def test_setup_cfg_library_to_repo(neo4j_session):
    """
    Ensure that repositories are connected to Python libraries stated as dependencies in setup.cfg,
    and verify that exactly 2 repos are connected to the `neo4j` PythonLibrary.
    """
    _ensure_local_neo4j_has_test_data(neo4j_session)

    # Note: don't query for relationship attributes in code that needs to be fast.
    result = neo4j_session.run(
        """
        MATCH (repo:GitHubRepository)-[r:REQUIRES]->(lib:PythonLibrary{id:'neo4j'})
        RETURN count(repo) as repo_count
        """,
    )
    repo_counts = {record['repo_count'] for record in result}
    assert repo_counts == {2}


def test_python_library_in_multiple_requirements_files(neo4j_session):
    """
    Ensure that repositories are connected to Python libraries stated as dependencies in
    both setup.cfg and requirements.txt. When the same dependency carries a different
    specifier in each file, a separate PythonLibrary node is expected for each.
    """
    _ensure_local_neo4j_has_test_data(neo4j_session)

    result = neo4j_session.run(
        """
        MATCH (repo:GitHubRepository)-[r:REQUIRES]->(lib:PythonLibrary{name:'okta'})
        RETURN lib.id as lib_ids
        """,
    )
    okta_ids = {record['lib_ids'] for record in result}
    # One unpinned node (from setup.cfg) and one pinned node (from requirements.txt).
    assert len(okta_ids) == 2
    assert okta_ids == {'okta', 'okta|0.9.0'}