Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow configuration via command-line options #41

Merged
merged 10 commits into from
Dec 11, 2024
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ To check whether the installation succeeded, run the following command and verif

```bash
$ scfw --version
1.0.2
1.1.0
```

### Post-installation steps
Expand Down
2 changes: 1 addition & 1 deletion scfw/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
A supply-chain "firewall" for preventing the installation of vulnerable or malicious `pip` and `npm` packages.
"""

__version__ = "1.0.2"
__version__ = "1.1.0"
58 changes: 47 additions & 11 deletions scfw/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@
Defines the supply-chain firewall's command-line interface and performs argument parsing.
"""

from argparse import Namespace
from argparse import ArgumentError, Namespace
from enum import Enum
import logging
import sys
from typing import Callable, Optional

import scfw
from scfw.ecosystem import ECOSYSTEM
from scfw.parser import ArgumentError, ArgumentParser
from scfw.logger import FirewallAction
from scfw.parser import ArgumentParser

_LOG_LEVELS = list(
map(
Expand All @@ -28,7 +29,34 @@ def _add_configure_cli(parser: ArgumentParser) -> None:
Args:
parser: The `ArgumentParser` to which the `configure` command line will be added.
"""
return
parser.add_argument(
"--alias-pip",
action="store_true",
help="Add shell aliases to always run pip commands through Supply-Chain Firewall"
)

parser.add_argument(
"--alias-npm",
action="store_true",
help="Add shell aliases to always run npm commands through Supply-Chain Firewall"
)

parser.add_argument(
"--dd-api-key",
type=str,
default=None,
metavar="KEY",
help="API key to use when forwarding logs to Datadog"
)

parser.add_argument(
"--dd-log-level",
type=str,
default=None,
choices=[str(action) for action in FirewallAction],
metavar="LEVEL",
help="Desired logging level for Datadog log forwarding (options: %(choices)s)"
)


def _add_run_cli(parser: ArgumentParser) -> None:
Expand Down Expand Up @@ -60,6 +88,15 @@ class Subcommand(Enum):
Configure = "configure"
Run = "run"

def __str__(self) -> str:
"""
Format a `Subcommand` for printing.

Returns:
A `str` representing the given `Subcommand` suitable for printing.
"""
return self.value

def _parser_spec(self) -> dict:
"""
Return the `ArgumentParser` configuration for the given subcommand's parser.
Expand All @@ -72,13 +109,13 @@ def _parser_spec(self) -> dict:
case Subcommand.Configure:
return {
"exit_on_error": False,
"description": "Configure the environment for using the supply-chain firewall."
"description": "Configure the environment for using Supply-Chain Firewall."
}
case Subcommand.Run:
return {
"usage": "%(prog)s [options] COMMAND",
"exit_on_error": False,
"description": "Run a package manager command through the supply-chain firewall."
"description": "Run a package manager command through Supply-Chain Firewall."
}

def _cli_spec(self) -> Callable[[ArgumentParser], None]:
Expand Down Expand Up @@ -133,7 +170,7 @@ def _cli() -> ArgumentParser:
subparsers = parser.add_subparsers(dest="subcommand", required=True)

for subcommand in Subcommand:
subparser = subparsers.add_parser(subcommand.value, **subcommand._parser_spec())
subparser = subparsers.add_parser(str(subcommand), **subcommand._parser_spec())
subcommand._cli_spec()(subparser)

return parser
Expand All @@ -158,7 +195,7 @@ def _parse_command_line(argv: list[str]) -> tuple[Optional[Namespace], str]:
hinge = len(argv)
for ecosystem in ECOSYSTEM:
try:
hinge = min(hinge, argv.index(ecosystem.value))
hinge = min(hinge, argv.index(str(ecosystem)))
except ValueError:
pass

Expand All @@ -172,14 +209,13 @@ def _parse_command_line(argv: list[str]) -> tuple[Optional[Namespace], str]:
# the user selected the `run` subcommand
match Subcommand(args.subcommand), argv[hinge:]:
case Subcommand.Run, []:
raise ArgumentError
raise ArgumentError(None, "Missing required package manager command")
case Subcommand.Run, _:
args_dict = vars(args)
args_dict["command"] = argv[hinge:]
args.command = argv[hinge:]
case _, []:
pass
case _:
raise ArgumentError
raise ArgumentError(None, "Received unexpected package manager command")

return args, help_msg

Expand Down
17 changes: 10 additions & 7 deletions scfw/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ def get_package_manager_command(
"""
if not command:
raise ValueError("Missing package manager command")
match command[0]:
case ECOSYSTEM.PIP.value:
return ECOSYSTEM.PIP, PipCommand(command, executable)
case ECOSYSTEM.NPM.value:
return ECOSYSTEM.NPM, NpmCommand(command, executable)
case other:
raise ValueError(f"Unsupported package manager '{other}'")

try:
match (ecosystem := ECOSYSTEM(command[0])):
ikretz marked this conversation as resolved.
Show resolved Hide resolved
case ECOSYSTEM.PIP:
return ecosystem, PipCommand(command, executable)
case ECOSYSTEM.NPM:
return ecosystem, NpmCommand(command, executable)

except ValueError:
raise ValueError(f"Unsupported package manager '{command[0]}'")
39 changes: 29 additions & 10 deletions scfw/configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@

from argparse import Namespace
import inquirer # type: ignore
import logging
import os
from pathlib import Path
import re
import tempfile

from scfw.logger import FirewallAction

_log = logging.getLogger(__name__)

DD_API_KEY_VAR = "DD_API_KEY"
"""
The environment variable under which the firewall looks for a Datadog API key.
Expand All @@ -25,10 +30,10 @@
_BLOCK_END = "# END SCFW MANAGED BLOCK"

_GREETING = (
"Thank you for using scfw, Datadog's supply-chain firewall!\n\n"
"Thank you for using scfw, the Supply-Chain Firewall by Datadog!\n\n"
"scfw is a tool for preventing the installation of malicious PyPI and npm packages.\n\n"
"This script will walk you through setting up your environment to get the most out\n"
"of the firewall. You can rerun this script at any time.\n"
"of scfw. You can rerun this script at any time.\n"
)

_EPILOGUE = (
Expand All @@ -47,16 +52,30 @@ def run_configure(args: Namespace) -> int:
Returns:
An integer status code, 0 or 1.
"""
print(_GREETING)
try:
interactive = not any({args.alias_pip, args.alias_npm, args.dd_api_key, args.dd_log_level})

if interactive:
print(_GREETING)
answers = inquirer.prompt(_get_questions())
else:
answers = vars(args)

if not answers:
return 0

for file in [Path.home() / file for file in _CONFIG_FILES]:
if file.exists():
_update_config_file(file, _format_answers(answers))

answers = inquirer.prompt(_get_questions())
for file in [Path.home() / file for file in _CONFIG_FILES]:
if file.exists():
_update_config_file(file, _format_answers(answers))
if interactive:
print(_EPILOGUE)

print(_EPILOGUE)
return 0

return 0
except Exception as e:
_log.error(e)
return 1


def _get_questions() -> list[inquirer.questions.Question]:
Expand Down Expand Up @@ -95,7 +114,7 @@ def _get_questions() -> list[inquirer.questions.Question]:
inquirer.List(
name="dd_log_level",
message="Select the desired log level for Datadog logging",
choices=["BLOCK", "ABORT", "ALLOW"],
choices=[str(action) for action in FirewallAction],
ignore=lambda answers: not has_dd_api_key and not answers["enable_dd_logs"]
)
]
Expand Down
9 changes: 9 additions & 0 deletions scfw/ecosystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,12 @@ class ECOSYSTEM(Enum):
"""
PIP = "pip"
NPM = "npm"

def __str__(self) -> str:
"""
Format an `ECOSYSTEM` for printing.

Returns:
A `str` representing the given `ECOSYSTEM` suitable for printing.
"""
return self.value
9 changes: 9 additions & 0 deletions scfw/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ def __lt__(self, other) -> bool:
case _:
return False

def __str__(self) -> str:
"""
Format a `FirewallAction` for printing.

Returns:
A `str` representing the given `FirewallAction` suitable for printing.
"""
return self.value


class FirewallLogger(metaclass=ABCMeta):
"""
Expand Down
5 changes: 4 additions & 1 deletion scfw/loggers/dd_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from datadog_api_client.v2.model.http_log_item import HTTPLogItem
import dotenv

_log = logging.getLogger(__name__)

_DD_LOG_FORMAT = "%(asctime)s %(levelname)s [%(name)s] [%(filename)s:%(lineno)d] - %(message)s"

_DD_LOG_LEVEL_DEFAULT = FirewallAction.BLOCK
Expand Down Expand Up @@ -96,6 +98,7 @@ def __init__(self):
try:
self._level = FirewallAction(os.getenv(DD_LOG_LEVEL_VAR))
except ValueError:
_log.warning(f"Undefined or invalid Datadog log level: using default level {_DD_LOG_LEVEL_DEFAULT}")
self._level = _DD_LOG_LEVEL_DEFAULT

def log(
Expand Down Expand Up @@ -127,7 +130,7 @@ def log(

self._logger.info(
message,
extra={"ecosystem": ecosystem.value, "targets": map(str, targets)}
extra={"ecosystem": str(ecosystem), "targets": map(str, targets)}
)


Expand Down
9 changes: 1 addition & 8 deletions scfw/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,6 @@
import argparse


class ArgumentError(Exception):
"""
An exception for `ArgumentParser` to raise.
"""
pass


class ArgumentParser(argparse.ArgumentParser):
"""
A drop-in replacement for `argparse.ArgumentParser` with a patched
Expand All @@ -26,4 +19,4 @@ def error(self, message):
Args:
message: The error message.
"""
raise ArgumentError(message)
raise argparse.ArgumentError(None, message)
22 changes: 20 additions & 2 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,25 @@ def test_cli_no_options_no_command():
"""
argv = ["scfw"]
args, _ = _parse_command_line(argv)
assert args == None
assert args is None


def test_cli_all_options_no_command():
"""
Invocation with all top-level options and no subcommand.
"""
argv = ["scfw", "--log-level", "DEBUG"]
args, _ = _parse_command_line(argv)
assert args is None


def test_cli_incorrect_subcommand():
"""
Invocation with a nonexistent subcommand.
"""
argv = ["scfw", "nonexistent"]
args, _ = _parse_command_line(argv)
assert args is None


def test_cli_all_options_no_command():
Expand All @@ -43,7 +61,7 @@ def test_cli_all_options_no_command():
executable = "/usr/bin/python"
argv = ["scfw", "run", "--executable", executable, "--dry-run"]
args, _ = _parse_command_line(argv)
assert args == None
assert args is None


def test_cli_all_options_pip():
Expand Down
Loading