Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@
"""This analyzer checks if the package has a similar structure to other packages maintained by the same user."""

import hashlib
import io
import logging
import tarfile

from macaron.json_tools import JsonType
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
from macaron.util import send_get_http, send_get_http_raw
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIInspectorAsset, PyPIPackageJsonAsset

logger: logging.Logger = logging.getLogger(__name__)

Expand All @@ -24,20 +21,7 @@ def __init__(self) -> None:
super().__init__(
name="similar_project_analyzer",
heuristic=Heuristics.SIMILAR_PROJECTS,
# TODO: these dependencies are used as this heuristic currently downloads many package sourcecode
# tarballs. Refactoring this heuristic to run more efficiently means this should have depends_on=None.
depends_on=[
(Heuristics.EMPTY_PROJECT_LINK, HeuristicResult.FAIL),
(Heuristics.ONE_RELEASE, HeuristicResult.FAIL),
(Heuristics.HIGH_RELEASE_FREQUENCY, HeuristicResult.FAIL),
(Heuristics.UNCHANGED_RELEASE, HeuristicResult.FAIL),
(Heuristics.CLOSER_RELEASE_JOIN_DATE, HeuristicResult.FAIL),
(Heuristics.SUSPICIOUS_SETUP, HeuristicResult.FAIL),
(Heuristics.WHEEL_ABSENCE, HeuristicResult.FAIL),
(Heuristics.ANOMALOUS_VERSION, HeuristicResult.FAIL),
(Heuristics.TYPOSQUATTING_PRESENCE, HeuristicResult.FAIL),
(Heuristics.FAKE_EMAIL, HeuristicResult.FAIL),
],
depends_on=None,
)

def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
Expand All @@ -58,112 +42,127 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
HeuristicAnalyzerValueError
if the analysis fails.
"""
package_name = pypi_package_json.component_name
target_hash = self.get_structure_hash(package_name)
if not target_hash:
target_structure = self.get_normalized_structure(pypi_package_json)
if not target_structure:
return HeuristicResult.SKIP, {}
target_hash = hashlib.sha256("\n".join(target_structure).encode("utf-8")).hexdigest()
detail_info: dict = {}
similar_projects: list[str] = []
result: HeuristicResult = HeuristicResult.PASS

maintainers = pypi_package_json.pypi_registry.get_maintainers_of_package(pypi_package_json.component_name)
if not maintainers:
# NOTE: This would ideally raise an error, identifying malformed package information, but issues with
# obtaining maintainer information from the HTML page means this will remain as a SKIP for now.
return HeuristicResult.SKIP, {}

analyzed: set[str] = {pypi_package_json.component_name}

maintainers = pypi_package_json.pypi_registry.get_maintainers_of_package(package_name)
if maintainers:
for maintainer in maintainers:
maintainer_packages = pypi_package_json.pypi_registry.get_packages_by_username(maintainer)
if not maintainer_packages:
for maintainer in maintainers:
maintainer_packages = pypi_package_json.pypi_registry.get_packages_by_username(maintainer)
if not maintainer_packages:
continue
for package in maintainer_packages:
# skip if it is a package we have already analyzed
if package in analyzed:
continue
for package in maintainer_packages:
if package == package_name:
continue
analyzed.add(package)

hash_value = self.get_structure_hash(package)
if target_hash == hash_value:
return HeuristicResult.FAIL, {
"message": f"The package {package_name} has a similar structure to {package}.",
"similar_package": package,
}
adjacent_pypi_json = PyPIPackageJsonAsset(
package, None, False, pypi_package_json.pypi_registry, {}, "", PyPIInspectorAsset("", [], {})
)
if not adjacent_pypi_json.download(""):
continue
structure = self.get_normalized_structure(adjacent_pypi_json)
if not structure:
continue

return HeuristicResult.PASS, {}
hash_value = hashlib.sha256("\n".join(structure).encode("utf-8")).hexdigest()
if target_hash == hash_value:
similar_projects.append(package)

def get_url(self, package_name: str, package_type: str = "sdist") -> str | None:
"""Get the URL of the package's sdist.
detail_info["similar_projects"] = similar_projects
if similar_projects:
result = HeuristicResult.FAIL

Parameters
----------
package_name : str
The name of the package.
package_type: str
The package type to retrieve the URL of.
return result, detail_info

Returns
-------
str | None:
The URL of the package's sdist or None if not found.
"""
json_url = f"https://pypi.org/pypi/{package_name}/json"
data = send_get_http(json_url, headers={})
if not data:
logger.debug("Failed to fetch package data for %s.", package_name)
return None

sdist = next((url for url in data["urls"] if url["packagetype"] == package_type and url.get("url")), None)
return sdist["url"] if sdist else None
def get_normalized_structure(self, pypi_package_json: PyPIPackageJsonAsset) -> set[str] | None:
"""Extract a normalized structure for a package.

def get_structure(self, package_name: str) -> list[str]:
"""Get the file structure of the package's sdist.
The normalized structure is the file tree structure of all Python files in the package, with the package's
name removed, so it is comparable.

Parameters
----------
package_name : str
The name of the package.
pypi_package_json: PyPIPackageJsonAsset
The PyPI package JSON asset object.

Returns
-------
list[str]:
The list of files in the package's sdist.
set[str] | None:
The normalized structure of file paths in a set, or None if a problem was encountered.
"""
# TODO: We should not download the source distributions for every package.
# This is very inefficient. We should find a different way to extract the package
# structure, e.g., the inspector service?
sdist_url = self.get_url(package_name)
if not sdist_url:
logger.debug("Package %s does not have a sdist.", package_name)
return []

response = send_get_http_raw(sdist_url)
if not response:
logger.debug("Failed to download sdist for package %s.", package_name)
return []

buffer = io.BytesIO(response.content)
try:
with tarfile.open(fileobj=buffer, mode="r:gz") as tf:
members = [
member.name
for member in tf.getmembers()
if member.name and not member.name.startswith("PAXHeaders/")
]
except (tarfile.TarError, OSError) as error:
logger.debug("Error reading source code tar file: %s", error)
return []

return members

def get_structure_hash(self, package_name: str) -> str:
"""Get the hash of the package's file structure.
if not pypi_package_json.get_inspector_links():
return None

Parameters
----------
package_name : str
The name of the package.
# for normalizing the structure
version = pypi_package_json.component_version
if version is None:
version = pypi_package_json.get_latest_version()
if version is None:
return None

Returns
-------
str:
The hash of the package's file structure.
"""
structure = self.get_structure(package_name)
if not structure:
return ""
prefix = "./" + pypi_package_json.component_name + "-" + version
normalized_structure = set()

# try using the tarball first
tarball_link = pypi_package_json.inspector_asset.package_sdist_link
if tarball_link and pypi_package_json.inspector_asset.package_link_reachability[tarball_link]:
# all files are always prefixed with ./<package_name>-<version>/<...> in tarballs
# non-metadata files then have <package_name>/
# prefix += "/" + pypi_package_json.component_name + "/"
structure = PyPIInspectorAsset.get_structure(tarball_link)
if structure:
for file_path in structure:
# we only consider python files. This avoids considering files that are always package-specific, like PKG_INFO,
# licenses, build metadata, etc.
if file_path[-3:] != ".py":
continue

# remove the "/package_name" from the prefix as well, that way the structure between two packages with different
# names will be the same
normalized_structure.add(
file_path.removeprefix(prefix).removeprefix("/" + pypi_package_json.component_name)
)

# We can't compare against wheel structures if we keep setup.py in there
normalized_structure.discard("/setup.py")
return normalized_structure

wheel_links = pypi_package_json.inspector_asset.package_whl_links
if len(wheel_links) > 0:
# wheels have this extra field for package metadata
prefix += ".dist-info/"
# structure is generally going to be the same, platform-specific details may vary for packages
# which have platform-specific wheels
structure = PyPIInspectorAsset.get_structure(wheel_links[0])
if structure:
for file_path in structure:
# the .dist-info stuff is usually metadata
if file_path.startswith(prefix) or file_path[-3:] != ".py":
continue

# remove the "./package_name" from the prefix as well, that way the structure between
# two packages with different names will be the same
normalized_structure.add(
file_path.removeprefix(pypi_package_json.component_name + "/").removeprefix(
"./" + pypi_package_json.component_name
)
)

normalized = sorted([p.replace(package_name, "<ROOT>") for p in structure])
return normalized_structure

joined = "\n".join(normalized).encode("utf-8")
return hashlib.sha256(joined).hexdigest()
# neither a tarball link nor a wheel link was generated, so this should be unreachable if the inspector links
# check at the start of this function succeeded.
return None
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
import logging

from macaron.errors import HeuristicAnalyzerValueError
from macaron.json_tools import JsonType, json_extract
from macaron.json_tools import JsonType
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset
from macaron.util import send_head_http_raw

logger: logging.Logger = logging.getLogger(__name__)

Expand All @@ -23,13 +22,6 @@ class WheelAbsenceAnalyzer(BaseHeuristicAnalyzer):
heuristic fails.
"""

WHEEL: str = "bdist_wheel"
# as per https://github.com/pypi/inspector/blob/main/inspector/main.py line 125
INSPECTOR_TEMPLATE = (
"{inspector_url_scheme}://{inspector_url_netloc}/project/"
"{name}/{version}/packages/{first}/{second}/{rest}/{filename}"
)

def __init__(self) -> None:
super().__init__(
name="wheel_absence_analyzer",
Expand All @@ -53,83 +45,17 @@ def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicRes
Raises
------
HeuristicAnalyzerValueError
If there is no release information, or has other missing package information.
If there is missing package information.
"""
releases = pypi_package_json.get_releases()
if releases is None: # no release information
error_msg = "There is no information for any release of this package."
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

version = pypi_package_json.component_version
if version is None: # check latest release version
version = pypi_package_json.get_latest_version()

if version is None:
error_msg = "There is no latest version of this package."
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

# Contains a boolean field identifying if the link is reachable by this Macaron instance or not.
inspector_links: dict[str, JsonType] = {}
wheel_present: bool = False

release_distributions = json_extract(releases, [version], list)
if release_distributions is None:
error_msg = f"The version {version} is not available as a release."
if not pypi_package_json.get_inspector_links():
error_msg = "Unable to retrieve PyPI inspector information about package"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

for distribution in release_distributions:
# validate data
package_type = json_extract(distribution, ["packagetype"], str)
if package_type is None:
error_msg = f"The version {version} has no 'package type' field in a distribution"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

name = json_extract(pypi_package_json.package_json, ["info", "name"], str)
if name is None:
error_msg = f"The version {version} has no 'name' field in a distribution"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

blake2b_256 = json_extract(distribution, ["digests", "blake2b_256"], str)
if blake2b_256 is None:
error_msg = f"The version {version} has no 'blake2b_256' field in a distribution"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

filename = json_extract(distribution, ["filename"], str)
if filename is None:
error_msg = f"The version {version} has no 'filename' field in a distribution"
logger.debug(error_msg)
raise HeuristicAnalyzerValueError(error_msg)

if package_type == self.WHEEL:
wheel_present = True

inspector_link = self.INSPECTOR_TEMPLATE.format(
inspector_url_scheme=pypi_package_json.pypi_registry.inspector_url_scheme,
inspector_url_netloc=pypi_package_json.pypi_registry.inspector_url_netloc,
name=name,
version=version,
first=blake2b_256[0:2],
second=blake2b_256[2:4],
rest=blake2b_256[4:],
filename=filename,
)

# use a head request because we don't care about the response contents
inspector_links[inspector_link] = False
if send_head_http_raw(inspector_link):
inspector_links[inspector_link] = True # link was reachable

detail_info: dict[str, JsonType] = {
"inspector_links": inspector_links,
}
detail_info: dict = {"inspector_links": pypi_package_json.inspector_asset.package_link_reachability}

if wheel_present:
# At least one wheel file exists
if len(pypi_package_json.inspector_asset.package_whl_links) > 0:
return HeuristicResult.PASS, detail_info

return HeuristicResult.FAIL, detail_info
10 changes: 8 additions & 2 deletions src/macaron/repo_finder/repo_finder_pypi.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
from macaron.repo_finder.repo_finder_enums import RepoFinderInfo
from macaron.repo_finder.repo_validator import find_valid_repository_url
from macaron.slsa_analyzer.package_registry import PACKAGE_REGISTRIES, PyPIRegistry
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset, find_or_create_pypi_asset
from macaron.slsa_analyzer.package_registry.pypi_registry import (
PyPIInspectorAsset,
PyPIPackageJsonAsset,
find_or_create_pypi_asset,
)
from macaron.slsa_analyzer.specs.package_registry_spec import PackageRegistryInfo

logger: logging.Logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -58,7 +62,9 @@ def find_repo(
pypi_registry = next((registry for registry in PACKAGE_REGISTRIES if isinstance(registry, PyPIRegistry)), None)
if not pypi_registry:
return "", RepoFinderInfo.PYPI_NO_REGISTRY
pypi_asset = PyPIPackageJsonAsset(purl.name, purl.version, False, pypi_registry, {}, "")
pypi_asset = PyPIPackageJsonAsset(
purl.name, purl.version, False, pypi_registry, {}, "", PyPIInspectorAsset("", [], {})
)

if not pypi_asset:
# This should be unreachable, as the pypi_registry has already been confirmed to be of type PyPIRegistry.
Expand Down
Loading
Loading