Skip to content

Commit

Permalink
Fix historic escape errors (#52)
Browse files Browse the repository at this point in the history
* add mode 'mod' and 'pub' #51

* fix date parse #51

* Update README.md

---------

Co-authored-by: Fadl <chaos@efqr.dev>
  • Loading branch information
himynamesdave and fqrious authored Dec 16, 2024
1 parent 5969dd6 commit 3255ab6
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 39 deletions.
41 changes: 24 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,29 +69,29 @@ To see more information about how to set the variables, and what they do, read t
The script to get CVEs can now be executed (in the second terminal window) using;

```shell
python3 cve2stix.py \
python3 cve2stix.py MODE \
--last_modified_earliest date \
--last_modified_latest date \
--file_time_range dictionary
```

* `last_modified_earliest` (required, date in format `YYYY-MM-DDThh:mm:ss`): earliest modified data
* default: none
* `last_modified_latest` (required, date in format `YYYY-MM-DDThh:mm:ss`): used in the the cve2stix/cpe2stix config
* default: none
* `MODE`: either
* `mod`: uses modified date for `--earliest` and `--latest` flags (recommended for daily updates)
* `pub`: uses published date for `--earliest` and `--latest` flags (recommended for backfill -- see note later)
* `earliest` (required, date in format `YYYY-MM-DDThh:mm:ss`): earliest datetime you want
* `latest` (required, date in format `YYYY-MM-DDThh:mm:ss`): latest datetime you want
* `file_time_range` (required): defines how much data should be packed in each output bundle. Use `d` for days, `m` for months, `y` for years. Note, if no results are found for a time period, a bundle will not be generated. This usually explains why you see "missing" bundles for a day or month.
* default `1d` (1 day)
* default `1m` (1 month)

IMPORTANT: if the time between `--last_modified_earliest` and `--last_modified_latest` is greater than 120 days and you select `--file_time_range` = `1y`, the script will batch celery jobs with different `lastModStartDate` and `lastModEndDate` as NVD only allows for a range of 120 days to be specified in a request.

The script will also filter the data created using any values entered in the `.env` file on each run.

e.g. get all cves for the first week of December 2024 (and place into daily bundles)
e.g. get all cves with modified times that are in the first week of December 2024 (and place into daily bundles)

```shell
python3 cve2stix.py \
--last_modified_earliest 2024-12-01T00:00:00 \
--last_modified_latest 2024-12-07T23:59:59 \
mod \
--earliest 2024-12-01T00:00:00 \
--latest 2024-12-07T23:59:59 \
--file_time_range 1d
```

Expand All @@ -100,9 +100,9 @@ Will generate bundle files in directories as follows:
```txt
output
└── bundles
├── cve-bundle-2024_08_01-00_00_00-2024_08_01-23_59_59.json
├── cve-bundle-2024_08_02-00_00_00-2024_08_02-23_59_59.json
├── cve-bundle-2024_08_03-00_00_00-2024_08_03-23_59_59.json
├── cve-bundle-2024_12_01-00_00_00-2024_08_01-23_59_59.json
├── cve-bundle-2024_12_02-00_00_00-2024_08_02-23_59_59.json
├── cve-bundle-2024_12_03-00_00_00-2024_08_03-23_59_59.json
├── ...
```

Expand All @@ -114,15 +114,22 @@ Between 2024-11-19 and 2024-11-21 most of the NVD dataset was modified as part o

You can read more about this at https://www.nist.gov/itl/nvd#november1524.

This is problematic for us, as will result in huge bundles using the normal `modDate` approach.
This is problematic for us, as will result in huge bundles using `mod` mode.

As such, we have build in the `pub` to handle this data more graciously. Because the `pubDate` are more spread out, the resulting will be more manageable sizes.

As such, we have build in the `--all_time` flag to handle this data more graciously. All time mode uses `pubDate` instead of `modDate` to bundle the files. This will start the run from 1988 (first CVE `pubDate` though to day script is executed). e.g.
Recommended backfill (to December 2024);

```shell
python3 cve2stix.py \
--all_time
pub \
--earliest 1988-10-01T00:00:00 \
--latest 2024-11-30T23:59:59 \
--file_time_range 1d
```

(earliest CVE, CVE-1999-0095 was published `1988-10-01T04:00:00.000`).

## Useful supporting tools

* To generate STIX 2.1 Objects: [stix2 Python Lib](https://stix2.readthedocs.io/en/latest/)
Expand Down
64 changes: 48 additions & 16 deletions cve2stix.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
import os
import dotenv
from tqdm import tqdm
import pytz

dotenv.load_dotenv()

PUB_START_DATE = dt(1988, 10, 1, tzinfo=timezone.utc)

def valid_date(s):
try:
return dt.strptime(s, "%Y-%m-%dT%H:%M:%S")
return pytz.utc.localize(dt.strptime(s, "%Y-%m-%dT%H:%M:%S"))
except ValueError:
msg = f"Not a valid date: {s}. Please use the format `YYYY-MM-DDThh:mm:ss`."
raise argparse.ArgumentTypeError(msg)
Expand All @@ -44,26 +47,52 @@ def parse_time_range(s):
raise argparse.ArgumentTypeError(f"Prefix cannot be zero or negative: {s}")
return s

class _HelpAction(argparse._HelpAction):

def __call__(self, parser, namespace, values, option_string=None):
parser.print_help()

# retrieve subparsers from parser
subparsers_actions = [
action for action in parser._actions
if isinstance(action, argparse._SubParsersAction)]
# there will probably only be one subparser_action,
# but better save than sorry
for subparsers_action in subparsers_actions:
# get all subparsers and print help
for choice, subparser in subparsers_action.choices.items():
print(" ========= Mode '{}' ========= ".format(choice))
print(subparser.format_help())

parser.exit()

def parse_args():
parser = argparse.ArgumentParser(description="Helper script for converting CVE and CPE data to STIX format.", allow_abbrev=True)
parser = argparse.ArgumentParser(description="Helper script for converting CVE and CPE data to STIX format.", allow_abbrev=True, add_help=False)
parser.add_argument('--help', action=_HelpAction, help='help for help if you need some help') # add custom help

# Create an argument group for last modified filters (conditionally required)
last_modified_group = parser.add_argument_group('last_modified_filters', 'Filters for the time range of CVE data')
subparsers = parser.add_subparsers(dest='mode', required=True)

mod_group = subparsers.add_parser('mod', help='Filters for the time range of CVE data by lastModStartDate & lastModEndDate')
all_time = subparsers.add_parser('pub', help='Filters for the time range of CVE data by pubStartDate & pubEndDate')

# Add arguments to the group
last_modified_group.add_argument("--last_modified_earliest", help="Earliest date for last modified filter", metavar="YYYY-MM-DDThh:mm:ss", type=valid_date)
last_modified_group.add_argument("--last_modified_latest", help="Latest date for last modified filter", metavar="YYYY-MM-DDThh:mm:ss", type=valid_date)
parser.add_argument("--file_time_range", help="Time range for file processing (e.g., 1m)", default="1m", type=parse_time_range)
parser.add_argument("--all_time", action='store_true', help="If set, ignores last modified filters")
# mod_group.
mod_group.add_argument("--earliest", help="Earliest date for last modified filter", metavar="YYYY-MM-DDThh:mm:ss", type=valid_date, required=True)
mod_group.add_argument("--latest", help="Latest date for last modified filter", metavar="YYYY-MM-DDThh:mm:ss", type=valid_date, required=True)

all_time.add_argument("--earliest", help=f"Earliest date for pubDate filter, default: {PUB_START_DATE.isoformat()}", metavar="YYYY-MM-DDThh:mm:ss", type=valid_date, default=PUB_START_DATE)
all_time.add_argument("--latest", help=f"Latest date for pubDate filter, default: {yesterday().isoformat()}", metavar="YYYY-MM-DDThh:mm:ss", type=valid_date, default=yesterday())

for p in subparsers.choices.values():
p.add_argument("--file_time_range", help="Time range for file processing (e.g., 1m)", default="1m", type=parse_time_range)
# parser.add_argument("--all_time", action='store_true', help="If set, ignores last modified filters")

args = parser.parse_args()

if not args.all_time:
if not args.last_modified_earliest or not args.last_modified_latest:
parser.error("--last_modified_earliest and --last_modified_latest are required unless --all_time is set")

if args.last_modified_latest < args.last_modified_earliest:
raise argparse.ArgumentError(last_modified_group, "--last_modified_latest must not be earlier than --last_modified_earliest")
if args.latest < args.earliest:
raise argparse.ArgumentError(mod_group, "--latest must not be earlier than --earliest")

return args

Expand Down Expand Up @@ -110,12 +139,12 @@ def run():
BUNDLE_PATH = PARENT_PATH / "bundles"

filter_mode = FilterMode.MOD_DATE
if args.all_time:
if args.mode == 'pub':
filter_mode = FilterMode.PUB_DATE
args.last_modified_earliest = dt(1988, 10, 1, tzinfo=timezone.utc)
args.last_modified_latest = (dt.now(timezone.utc) - timedelta(days=1)).replace(hour=23, minute=59, second=59)
args.earliest = args.earliest or PUB_START_DATE
args.latest = args.latest or yesterday()

for time_unit, start_date, end_date in tqdm(get_time_ranges(args.file_time_range, args.last_modified_earliest, args.last_modified_latest)):
for time_unit, start_date, end_date in tqdm(get_time_ranges(args.file_time_range, args.earliest, args.latest)):
start_day, end_day = start_date.strftime('%Y_%m_%d-%H_%M_%S'), end_date.strftime('%Y_%m_%d-%H_%M_%S')
subdir = start_date.strftime('%Y-%m') if time_unit == 'd' else start_date.strftime('%Y')
file_system = OBJECTS_PARENT / f"cve_objects-{start_day}-{end_day}"
Expand All @@ -140,5 +169,8 @@ def run():

celery_process.kill()

def yesterday():
return (dt.now(timezone.utc) - timedelta(days=1)).replace(hour=23, minute=59, second=59, microsecond=0)

if __name__ == "__main__":
run()
4 changes: 2 additions & 2 deletions cve2stix/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
import os
import redis
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from dotenv import load_dotenv
Expand Down Expand Up @@ -37,7 +37,7 @@ class FilterMode(StrEnum):
@dataclass
class Config:
type: str = "cve"
filter_mode: FilterMode = FilterMode.MOD_DATE
filter_mode: FilterMode = field(default=FilterMode.MOD_DATE)
CVE2STIX_FOLDER = Path(os.path.abspath(__file__)).parent
REPO_FOLDER = CVE2STIX_FOLDER.parent
LAST_MODIFIED_TIME = os.getenv('CVE_LAST_MODIFIED_EARLIEST')
Expand Down
5 changes: 3 additions & 2 deletions cve2stix/cpe_match.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def get_cpematch(criteria_id: str) -> list[tuple[str, str]]:
@lru_cache(maxsize=None)
def get_cpe_match(match_string: str) -> list[str]:
matches = retrieve_cpematch(datetime.now(timezone('EST')).date())
return matches[match_string]
return matches.get(match_string, [match_string])

@lru_cache(maxsize=1)
def retrieve_cpematch(d: date):
Expand All @@ -111,9 +111,10 @@ def retrieve_cpematch(d: date):
with zipfile.ZipFile(io.BytesIO(resp.content)) as zip:
with zip.open("nvdcpematch-1.0.json") as f:
matches = ijson.items(f, 'matches.item')
for match in matches:
for count, match in enumerate(matches):
match_spec = match["cpe23Uri"]
retval[match_spec] = [m["cpe23Uri"] for m in match['cpe_name']]
logging.info(f"retrieve_cpematch: {count=}, {len(retval)=}")
return retval


Expand Down
6 changes: 4 additions & 2 deletions cve2stix/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import dataclasses
import math
import pytz
import requests
import time
from datetime import datetime, timedelta, date
Expand Down Expand Up @@ -53,9 +54,9 @@ def map_identity(config, object_list):

def _parse_date(d: str|datetime|date):
if isinstance(d, str):
d = datetime.strptime(d, "%Y-%m-%dT%H:%M:%S")
d = pytz.utc.localize(datetime.strptime(d, "%Y-%m-%dT%H:%M:%S"))
elif isinstance(d, date):
d = datetime.fromtimestamp(d.timestamp())
d = datetime.fromtimestamp(d.timestamp(), tz=pytz.utc)
return d

def main(c_start_date=None, c_end_date=None, filename=None, config = Config()):
Expand All @@ -80,6 +81,7 @@ def main(c_start_date=None, c_end_date=None, filename=None, config = Config()):
)
current_date = end_date

print(params, current_date, c_start_date, type(c_end_date))
tasks = [cve_syncing_task.s(param[0], param[1], dataclasses.asdict(config)) for param in params]
res = chord(group(tasks))(preparing_results.s(dataclasses.asdict(config), filename))
resp = res.get()
Expand Down

0 comments on commit 3255ab6

Please sign in to comment.