diff --git a/README.md b/README.md index 439d563..a01e5ac 100644 --- a/README.md +++ b/README.md @@ -69,29 +69,29 @@ To see more information about how to set the variables, and what they do, read t The script to get CVEs can now be executed (in the second terminal window) using; ```shell -python3 cve2stix.py \ +python3 cve2stix.py MODE \ --last_modified_earliest date \ --last_modified_latest date \ --file_time_range dictionary ``` -* `last_modified_earliest` (required, date in format `YYYY-MM-DDThh:mm:ss`): earliest modified data - * default: none -* `last_modified_latest` (required, date in format `YYYY-MM-DDThh:mm:ss`): used in the the cve2stix/cpe2stix config - * default: none +* `MODE`: either + * `mod`: uses modified date for `--earliest` and `--latest` flags (recommended for daily updates) + * `pub`: uses published date for `--earliest` and `--latest` flags (recommended for backfill -- see note later) +* `earliest` (date in format `YYYY-MM-DDThh:mm:ss`; required in `mod` mode, defaults to `1988-10-01T00:00:00` in `pub` mode): earliest modified/published datetime to include +* `latest` (date in format `YYYY-MM-DDThh:mm:ss`; required in `mod` mode, defaults to yesterday at `23:59:59` UTC in `pub` mode): latest modified/published datetime to include * `file_time_range` (required): defines how much data should be packed in each output bundle. Use `d` for days, `m` for months, `y` for years. Note, if no results are found for a time period, a bundle will not be generated. This usually explains why you see "missing" bundles for a day or month. - * default `1d` (1 day) + * default `1m` (1 month) IMPORTANT: if the time between `--last_modified_earliest` and `--last_modified_latest` is greater than 120 days and you select `--file_time_range` = `1y`, the script will batch celery jobs with different `lastModStartDate` and `lastModEndDate` as NVD only allows for a range of 120 days to be specified in a request. -The script will also filter the data created using any values entered in the `.env` file on each run. - -e.g. get all cves for the first week of December 2024 (and place into daily bundles) +e.g. 
get all cves with modified times that are in the first week of December 2024 (and place into daily bundles) ```shell python3 cve2stix.py \ - --last_modified_earliest 2024-12-01T00:00:00 \ - --last_modified_latest 2024-12-07T23:59:59 \ + mod \ + --earliest 2024-12-01T00:00:00 \ + --latest 2024-12-07T23:59:59 \ --file_time_range 1d ``` @@ -100,9 +100,9 @@ Will generate bundle files in directories as follows: ```txt output └── bundles - ├── cve-bundle-2024_08_01-00_00_00-2024_08_01-23_59_59.json - ├── cve-bundle-2024_08_02-00_00_00-2024_08_02-23_59_59.json - ├── cve-bundle-2024_08_03-00_00_00-2024_08_03-23_59_59.json + ├── cve-bundle-2024_12_01-00_00_00-2024_12_01-23_59_59.json + ├── cve-bundle-2024_12_02-00_00_00-2024_12_02-23_59_59.json + ├── cve-bundle-2024_12_03-00_00_00-2024_12_03-23_59_59.json ├── ... ``` @@ -114,15 +114,22 @@ Between 2024-11-19 and 2024-11-21 most of the NVD dataset was modified as part o You can read more about this at https://www.nist.gov/itl/nvd#november1524. -This is problematic for us, as will result in huge bundles using the normal `modDate` approach. +This is problematic for us, as it will result in huge bundles when using `mod` mode. + +As such, we have built in the `pub` mode to handle this data more gracefully. Because `pubDate` values are more spread out, the resulting bundles will be of more manageable sizes. -As such, we have build in the `--all_time` flag to handle this data more graciously. All time mode uses `pubDate` instead of `modDate` to bundle the files. This will start the run from 1988 (first CVE `pubDate` though to day script is executed). e.g. +Recommended backfill (to December 2024); ```shell python3 cve2stix.py \ - --all_time + pub \ + --earliest 1988-10-01T00:00:00 \ + --latest 2024-11-30T23:59:59 \ + --file_time_range 1d ``` +(the earliest CVE, CVE-1999-0095, was published `1988-10-01T04:00:00.000`). 
+ ## Useful supporting tools * To generate STIX 2.1 Objects: [stix2 Python Lib](https://stix2.readthedocs.io/en/latest/) diff --git a/cve2stix.py b/cve2stix.py index 8b26b4c..a114381 100644 --- a/cve2stix.py +++ b/cve2stix.py @@ -20,12 +20,15 @@ import os import dotenv from tqdm import tqdm +import pytz dotenv.load_dotenv() +PUB_START_DATE = dt(1988, 10, 1, tzinfo=timezone.utc) + def valid_date(s): try: - return dt.strptime(s, "%Y-%m-%dT%H:%M:%S") + return pytz.utc.localize(dt.strptime(s, "%Y-%m-%dT%H:%M:%S")) except ValueError: msg = f"Not a valid date: {s}. Please use the format `YYYY-MM-DDThh:mm:ss`." raise argparse.ArgumentTypeError(msg) @@ -44,26 +47,52 @@ def parse_time_range(s): raise argparse.ArgumentTypeError(f"Prefix cannot be zero or negative: {s}") return s +class _HelpAction(argparse._HelpAction): + + def __call__(self, parser, namespace, values, option_string=None): + parser.print_help() + + # retrieve subparsers from parser + subparsers_actions = [ + action for action in parser._actions + if isinstance(action, argparse._SubParsersAction)] + # there will probably only be one subparser_action, + # but better save than sorry + for subparsers_action in subparsers_actions: + # get all subparsers and print help + for choice, subparser in subparsers_action.choices.items(): + print(" ========= Mode '{}' ========= ".format(choice)) + print(subparser.format_help()) + + parser.exit() def parse_args(): - parser = argparse.ArgumentParser(description="Helper script for converting CVE and CPE data to STIX format.", allow_abbrev=True) + parser = argparse.ArgumentParser(description="Helper script for converting CVE and CPE data to STIX format.", allow_abbrev=True, add_help=False) + parser.add_argument('--help', action=_HelpAction, help='help for help if you need some help') # add custom help + # Create an argument group for last modified filters (conditionally required) - last_modified_group = parser.add_argument_group('last_modified_filters', 'Filters for the time 
range of CVE data') + subparsers = parser.add_subparsers(dest='mode', required=True) + + mod_group = subparsers.add_parser('mod', help='Filters for the time range of CVE data by lastModStartDate & lastModEndDate') + all_time = subparsers.add_parser('pub', help='Filters for the time range of CVE data by pubStartDate & pubEndDate') # Add arguments to the group - last_modified_group.add_argument("--last_modified_earliest", help="Earliest date for last modified filter", metavar="YYYY-MM-DDThh:mm:ss", type=valid_date) - last_modified_group.add_argument("--last_modified_latest", help="Latest date for last modified filter", metavar="YYYY-MM-DDThh:mm:ss", type=valid_date) - parser.add_argument("--file_time_range", help="Time range for file processing (e.g., 1m)", default="1m", type=parse_time_range) - parser.add_argument("--all_time", action='store_true', help="If set, ignores last modified filters") + # mod_group. + mod_group.add_argument("--earliest", help="Earliest date for last modified filter", metavar="YYYY-MM-DDThh:mm:ss", type=valid_date, required=True) + mod_group.add_argument("--latest", help="Latest date for last modified filter", metavar="YYYY-MM-DDThh:mm:ss", type=valid_date, required=True) + + all_time.add_argument("--earliest", help=f"Earliest date for pubDate filter, default: {PUB_START_DATE.isoformat()}", metavar="YYYY-MM-DDThh:mm:ss", type=valid_date, default=PUB_START_DATE) + all_time.add_argument("--latest", help=f"Latest date for pubDate filter, default: {yesterday().isoformat()}", metavar="YYYY-MM-DDThh:mm:ss", type=valid_date, default=yesterday()) + + for p in subparsers.choices.values(): + p.add_argument("--file_time_range", help="Time range for file processing (e.g., 1m)", default="1m", type=parse_time_range) + # parser.add_argument("--all_time", action='store_true', help="If set, ignores last modified filters") args = parser.parse_args() - if not args.all_time: - if not args.last_modified_earliest or not args.last_modified_latest: - 
parser.error("--last_modified_earliest and --last_modified_latest are required unless --all_time is set") - if args.last_modified_latest < args.last_modified_earliest: - raise argparse.ArgumentError(last_modified_group, "--last_modified_latest must not be earlier than --last_modified_earliest") + if args.latest < args.earliest: + raise argparse.ArgumentError(mod_group, "--latest must not be earlier than --earliest") return args @@ -110,12 +139,12 @@ def run(): BUNDLE_PATH = PARENT_PATH / "bundles" filter_mode = FilterMode.MOD_DATE - if args.all_time: + if args.mode == 'pub': filter_mode = FilterMode.PUB_DATE - args.last_modified_earliest = dt(1988, 10, 1, tzinfo=timezone.utc) - args.last_modified_latest = (dt.now(timezone.utc) - timedelta(days=1)).replace(hour=23, minute=59, second=59) + args.earliest = args.earliest or PUB_START_DATE + args.latest = args.latest or yesterday() - for time_unit, start_date, end_date in tqdm(get_time_ranges(args.file_time_range, args.last_modified_earliest, args.last_modified_latest)): + for time_unit, start_date, end_date in tqdm(get_time_ranges(args.file_time_range, args.earliest, args.latest)): start_day, end_day = start_date.strftime('%Y_%m_%d-%H_%M_%S'), end_date.strftime('%Y_%m_%d-%H_%M_%S') subdir = start_date.strftime('%Y-%m') if time_unit == 'd' else start_date.strftime('%Y') file_system = OBJECTS_PARENT / f"cve_objects-{start_day}-{end_day}" @@ -140,5 +169,8 @@ def run(): celery_process.kill() +def yesterday(): + return (dt.now(timezone.utc) - timedelta(days=1)).replace(hour=23, minute=59, second=59, microsecond=0) + if __name__ == "__main__": run() \ No newline at end of file diff --git a/cve2stix/config.py b/cve2stix/config.py index 3631750..baf782e 100644 --- a/cve2stix/config.py +++ b/cve2stix/config.py @@ -3,7 +3,7 @@ import json import os import redis -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from dotenv import load_dotenv @@ 
-37,7 +37,7 @@ class FilterMode(StrEnum): @dataclass class Config: type: str = "cve" - filter_mode: FilterMode = FilterMode.MOD_DATE + filter_mode: FilterMode = field(default=FilterMode.MOD_DATE) CVE2STIX_FOLDER = Path(os.path.abspath(__file__)).parent REPO_FOLDER = CVE2STIX_FOLDER.parent LAST_MODIFIED_TIME = os.getenv('CVE_LAST_MODIFIED_EARLIEST') diff --git a/cve2stix/cpe_match.py b/cve2stix/cpe_match.py index 5bac86b..3eaffba 100644 --- a/cve2stix/cpe_match.py +++ b/cve2stix/cpe_match.py @@ -100,7 +100,7 @@ def get_cpematch(criteria_id: str) -> list[tuple[str, str]]: @lru_cache(maxsize=None) def get_cpe_match(match_string: str) -> list[str]: matches = retrieve_cpematch(datetime.now(timezone('EST')).date()) - return matches[match_string] + return matches.get(match_string, [match_string]) @lru_cache(maxsize=1) def retrieve_cpematch(d: date): @@ -111,9 +111,10 @@ def retrieve_cpematch(d: date): with zipfile.ZipFile(io.BytesIO(resp.content)) as zip: with zip.open("nvdcpematch-1.0.json") as f: matches = ijson.items(f, 'matches.item') - for match in matches: + for count, match in enumerate(matches): match_spec = match["cpe23Uri"] retval[match_spec] = [m["cpe23Uri"] for m in match['cpe_name']] + logging.info(f"retrieve_cpematch: {count=}, {len(retval)=}") return retval diff --git a/cve2stix/main.py b/cve2stix/main.py index d82e090..8e28ca3 100644 --- a/cve2stix/main.py +++ b/cve2stix/main.py @@ -4,6 +4,7 @@ import dataclasses import math +import pytz import requests import time from datetime import datetime, timedelta, date @@ -53,9 +54,9 @@ def map_identity(config, object_list): def _parse_date(d: str|datetime|date): if isinstance(d, str): - d = datetime.strptime(d, "%Y-%m-%dT%H:%M:%S") + d = pytz.utc.localize(datetime.strptime(d, "%Y-%m-%dT%H:%M:%S")) elif isinstance(d, date): - d = datetime.fromtimestamp(d.timestamp()) + d = datetime.fromtimestamp(d.timestamp(), tz=pytz.utc) return d def main(c_start_date=None, c_end_date=None, filename=None, config = Config()): @@ 
-80,6 +81,7 @@ def main(c_start_date=None, c_end_date=None, filename=None, config = Config()): ) current_date = end_date + print(params, current_date, c_start_date, type(c_end_date)) tasks = [cve_syncing_task.s(param[0], param[1], dataclasses.asdict(config)) for param in params] res = chord(group(tasks))(preparing_results.s(dataclasses.asdict(config), filename)) resp = res.get()