Skip to content

Commit

Permalink
10.0.2
Browse files Browse the repository at this point in the history
  • Loading branch information
CERT-EDF-MAINTAINER committed Sep 13, 2024
1 parent 9d1c37f commit d8ca255
Show file tree
Hide file tree
Showing 31 changed files with 2,255 additions and 1,322 deletions.
32 changes: 11 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,16 @@

## Introduction

Generaptor is a platform-agnostic command line tool to generate a
[Velociraptor](https://github.com/velocidex/velociraptor) offline collector
based on pre-configured or customizable collection profiles.
Generaptor is a platform-agnostic command line tool to generate a [Velociraptor](https://github.com/velocidex/velociraptor) offline collector based on pre-configured or customizable collection profiles.

All platforms can generate collectors for all targets, there is no limitation
thanks to Python on the generation side and velociraptor on the configuration
repacking side.
All platforms can generate collectors for all targets, there is no limitation thanks to Python on the generation side and velociraptor on the configuration repacking side.

Generation of Darwin collector is not implemented for the moment due to the lack
of use case on our side. Feel free to open a pull request regarding this feature.
Generation of Darwin collector is not implemented for the moment due to the lack of use case on our side. Feel free to open a pull request regarding this feature.


## Dependencies

Dependencies are listed in [setup.cfg](setup.cfg) under `install_requires` option.
Dependencies are listed in [pyproject.toml](pyproject.toml) under `dependencies` option.


## Setup
Expand All @@ -35,7 +30,7 @@ python3 -m pip install git+https://github.com/cert-edf/generaptor

```bash
# First, we fetch latest stable release of velociraptor
generaptor refresh
generaptor update
# Then create a collector for windows for instance
generaptor generate -o /data/case/case-001/collectors/ windows
# keep the private key secret in a password vault to be able to decrypt the archive
Expand All @@ -60,25 +55,20 @@ generaptor generate windows --device D:
# Collector targets customization (interactive)
generaptor generate --custom windows
# Collector targets customization using a profile (non interactive)
echo '{"targets":["IISLogFiles"]}' > iis_server.json
echo '{"targets":["WebServer/IIS"]}' > iis_server.json
generaptor generate --custom-profile iis_server.json windows
```


## Expert Collector Generation

Generaptor can use optional configuration files put in `$HOME/.config/generaptor`
directory to generate collectors.
Generaptor can use optional configuration files put in `$HOME/.config/generaptor` directory to generate collectors.

Target and rules can be extended using this configuration directory.

VQL templates can also be modified to add custom artifacts or modify the
collector behavior. Please refer to
[Velociraptor documentation](https://docs.velociraptor.app/) to learn how to
master VQL and write your own configuration files.
VQL templates can also be modified to add custom artifacts or modify the collector behavior. Please refer to [Velociraptor documentation](https://docs.velociraptor.app/) to learn how to master VQL and write your own configuration files.

After starting generaptor for the first time, you can use the following commands
to initialize the configuration directory
After starting generaptor for the first time, you can use the following commands to initialize the configuration directory

```bash
# Add variables for directories in current environment
Expand All @@ -90,8 +80,8 @@ head -n 1 "${CACHE}/linux.targets.csv" > "${CONFIG}/linux.targets.csv"
head -n 1 "${CACHE}/windows.rules.csv" > "${CONFIG}/windows.rules.csv"
head -n 1 "${CACHE}/windows.targets.csv" > "${CONFIG}/windows.targets.csv"
# Copy VQL templates
cp "${CACHE}/linux.collector.yml" "${CONFIG}/"
cp "${CACHE}/windows.collector.yml" "${CONFIG}/"
cp "${CACHE}/linux.collector.yml.jinja" "${CONFIG}/"
cp "${CACHE}/windows.collector.yml.jinja" "${CONFIG}/"
```


Expand Down
7 changes: 4 additions & 3 deletions generaptor/api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Generaptor API
"""
import typing as t

from typing import Optional
from .cache import Cache
from .collection import Collection, CollectionList
from .collector import CollectorConfig, Collector
Expand All @@ -21,9 +22,9 @@
def ruleset_from_targets(
cache: Cache,
config: Config,
targets: t.List[str],
targets: list[str],
operating_system: OperatingSystem,
) -> t.Optional[RuleSet]:
) -> Optional[RuleSet]:
"""Load ruleset for given targets and operating system"""
# load standard ruleset and targetset from cache
rule_set = cache.load_rule_set(operating_system)
Expand Down
34 changes: 21 additions & 13 deletions generaptor/api/cache.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""Cache APIs
"""
import typing as t

from shutil import copy
from typing import Optional
from pathlib import Path
from gettext import ngettext
from platform import system, architecture
from platform import system, libc_ver, architecture
from dataclasses import dataclass
from jinja2 import FileSystemLoader, Environment, Template
from .ruleset import RuleSet
Expand All @@ -16,10 +17,14 @@
_HERE = Path(__file__).resolve()
_PKG_DATA_DIR = _HERE.parent.parent / 'data'
_PLATFORM_DISTRIBUTION_MAP = {
'Linux': Distribution(
('Linux', 'glibc'): Distribution(
operating_system=OperatingSystem.LINUX, architecture=Architecture.AMD64
),
'Windows': Distribution(
('Linux', ''): Distribution(
operating_system=OperatingSystem.LINUX,
architecture=Architecture.AMD64_MUSL,
),
('Windows', ''): Distribution(
operating_system=OperatingSystem.WINDOWS,
architecture=Architecture.AMD64,
),
Expand All @@ -42,7 +47,7 @@ def program(self):
"""Cache program directory"""
return self.directory / 'program'

def path(self, filename: str) -> Path:
def path(self, filename: str) -> Optional[Path]:
"""Generate program path for filename"""
filepath = (self.program / filename).resolve()
if not filepath.is_relative_to(self.program):
Expand All @@ -56,14 +61,14 @@ def update(self, do_not_fetch: bool = False) -> bool:
for filepath in self.program.iterdir():
filepath.unlink()
self.program.mkdir(parents=True, exist_ok=True)
_copy_pkg_data_to_cache('*.collector.yml', self.directory)
_copy_pkg_data_to_cache('*.collector.yml.jinja', self.directory)
_copy_pkg_data_to_cache('*.targets.csv', self.directory)
_copy_pkg_data_to_cache('*.rules.csv', self.directory)
return True

def load_rule_set(
self, operating_system: OperatingSystem
) -> t.Optional[RuleSet]:
) -> Optional[RuleSet]:
"""Load rules from cache matching given distribution"""
filepath = self.directory / f'{operating_system.value}.rules.csv'
if not filepath.is_file():
Expand All @@ -80,7 +85,7 @@ def load_rule_set(

def load_target_set(
self, operating_system: OperatingSystem
) -> t.Optional[TargetSet]:
) -> Optional[TargetSet]:
"""Load targets from cache matching given distribution"""
filepath = self.directory / f'{operating_system.value}.targets.csv'
if not filepath.is_file():
Expand All @@ -106,10 +111,10 @@ def vql_template(self, operating_system: OperatingSystem) -> Template:
keep_trailing_newline=True,
)
return environment.get_template(
f'{operating_system.value}.collector.yml'
f'{operating_system.value}.collector.yml.jinja'
)

def template_binary(self, dist: Distribution) -> t.Optional[Path]:
def template_binary(self, dist: Distribution) -> Optional[Path]:
"""Return template binary for distrib"""
try:
return next(self.program.glob(f'*{dist.suffix}'))
Expand All @@ -119,12 +124,15 @@ def template_binary(self, dist: Distribution) -> t.Optional[Path]:
)
return None

def platform_binary(self) -> t.Optional[Path]:
def platform_binary(self) -> Optional[Path]:
"""Platform binary to be used to produce collectors"""
if architecture()[0] != '64bit':
bits, _ = architecture()
if bits != '64bit':
LOGGER.critical("current machine architecture is not supported!")
return None
dist = _PLATFORM_DISTRIBUTION_MAP.get(system())
lib, _ = libc_ver()
platform_distrib_id = (system(), lib)
dist = _PLATFORM_DISTRIBUTION_MAP.get(platform_distrib_id)
if not dist:
LOGGER.critical("current machine distribution is not supported!")
return None
Expand Down
15 changes: 8 additions & 7 deletions generaptor/api/collection.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""Collection API
"""
import typing as t

from json import loads
from typing import Optional
from pathlib import Path
from zipfile import ZipFile
from dataclasses import dataclass
Expand All @@ -20,7 +21,7 @@ class Collection:
filepath: Path

@property
def metadata(self) -> t.Mapping[str, str]:
def metadata(self) -> dict[str, str]:
"""Collection metadata"""
if not hasattr(self, '__metadata'):
with ZipFile(self.filepath) as zipf:
Expand All @@ -39,21 +40,21 @@ def checksum(self) -> str:
return getattr(self, '__checksum')

@property
def hostname(self) -> t.Optional[str]:
def hostname(self) -> Optional[str]:
"""Retrieve hostname in metadata"""
return self.metadata.get('hostname')

@property
def device(self) -> t.Optional[str]:
def device(self) -> Optional[str]:
"""Retrieve hostname in metadata"""
return self.metadata.get('device')

@property
def fingerprint(self) -> t.Optional[str]:
def fingerprint(self) -> Optional[str]:
"""Retrieve public key fingerprint in metadata"""
return self.metadata.get('fingerprint_hex')

def secret(self, private_key: RSAPrivateKey) -> t.Optional[str]:
def secret(self, private_key: RSAPrivateKey) -> Optional[str]:
"""Retrieve collection secret"""
b64_enc_secret = self.metadata.get('b64_enc_secret')
if not b64_enc_secret:
Expand Down Expand Up @@ -87,4 +88,4 @@ def extract_to(self, directory: Path, secret: str) -> bool:
return success


CollectionList = t.List[Collection]
CollectionList = list[Collection]
15 changes: 7 additions & 8 deletions generaptor/api/collector.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""Collector API
"""
import typing as t

from io import StringIO
from csv import writer, QUOTE_MINIMAL
from typing import Optional
from pathlib import Path
from platform import system
from datetime import datetime
Expand Down Expand Up @@ -36,9 +37,9 @@ class CollectorConfig:
rule_set: RuleSet
certificate: Certificate
distribution: Distribution
dont_be_lazy: t.Optional[bool] = None
vss_analysis_age: t.Optional[int] = None
use_auto_accessor: t.Optional[bool] = None
dont_be_lazy: Optional[bool] = None
vss_analysis_age: Optional[int] = None
use_auto_accessor: Optional[bool] = None

@property
def context(self):
Expand All @@ -57,9 +58,7 @@ def context(self):

def generate(self, cache: Cache, config: Config, filepath: Path):
"""Generate configuration file data"""
vql_template = config.vql_template(
self.distribution.operating_system
)
vql_template = config.vql_template(self.distribution.operating_system)
if vql_template is None:
vql_template = cache.vql_template(
self.distribution.operating_system
Expand All @@ -79,7 +78,7 @@ class Collector:

def generate(
self, cache: Cache, config: Config, directory: Path
) -> t.Optional[t.Tuple[Path, Path]]:
) -> Optional[tuple[Path, Path]]:
"""Generate a configuration file and a pre-configured binary"""
platform_binary = cache.platform_binary()
if not platform_binary:
Expand Down
11 changes: 6 additions & 5 deletions generaptor/api/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Cache APIs
"""
import typing as t

from typing import Optional
from pathlib import Path
from gettext import ngettext
from dataclasses import dataclass
Expand All @@ -19,7 +20,7 @@ class Config:

def load_rule_set(
self, operating_system: OperatingSystem
) -> t.Optional[RuleSet]:
) -> Optional[RuleSet]:
"""Load rules from cache matching given distribution"""
filepath = self.directory / f'{operating_system.value}.rules.csv'
if not filepath.is_file():
Expand All @@ -35,7 +36,7 @@ def load_rule_set(

def load_target_set(
self, operating_system: OperatingSystem
) -> t.Optional[TargetSet]:
) -> Optional[TargetSet]:
"""Load targets from cache matching given distribution"""
filepath = self.directory / f'{operating_system.value}.targets.csv'
if not filepath.is_file():
Expand All @@ -51,9 +52,9 @@ def load_target_set(

def vql_template(
self, operating_system: OperatingSystem
) -> t.Optional[Template]:
) -> Optional[Template]:
"""Load jinja template matching given distribution"""
filename = f'{operating_system.value}.collector.yml'
filename = f'{operating_system.value}.collector.yml.jinja'
template = self.directory / filename
if not template.is_file():
return None
Expand Down
4 changes: 2 additions & 2 deletions generaptor/api/custom_profile.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""CustomProfile API
"""
import typing as t

from json import loads
from pathlib import Path
from dataclasses import dataclass
Expand All @@ -10,7 +10,7 @@
class CustomProfile:
"""Custom profile file"""

targets: t.List[str]
targets: list[str]

@classmethod
def from_filepath(cls, filepath: Path):
Expand Down
1 change: 1 addition & 0 deletions generaptor/api/distribution.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Distribution API
"""

from enum import Enum
from dataclasses import dataclass

Expand Down
9 changes: 7 additions & 2 deletions generaptor/api/ruleset.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Rule Set API
"""
import typing as t

from pathlib import Path
from dataclasses import dataclass
from ..helper.csv import stream_csv
Expand All @@ -23,13 +23,18 @@ class Rule:
class RuleSet:
"""Set of rules"""

rules: t.Mapping[int, Rule]
rules: dict[int, Rule]

@property
def count(self) -> int:
"""Count of rules in ruleset"""
return len(self.rules)

@property
def empty(self) -> bool:
"""Determine if ruleset is empty"""
return not bool(self.rules)

@property
def max_uid(self) -> int:
"""Max uid"""
Expand Down
Loading

0 comments on commit d8ca255

Please sign in to comment.