Skip to content

Commit

Permalink
Merge pull request #460 from target/ScanXMLOversensitivityFix
Browse files Browse the repository at this point in the history
Add mimetype check for XML files
  • Loading branch information
phutelmyer authored May 28, 2024
2 parents cc3aeaa + 6394bdb commit 68199ec
Showing 1 changed file with 42 additions and 16 deletions.
58 changes: 42 additions & 16 deletions src/python/strelka/scanners/scan_xml.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Dict, Set
from typing import Any, Dict

from lxml import etree

Expand All @@ -9,64 +9,77 @@
class ScanXml(strelka.Scanner):
"""
Collects metadata and extracts embedded files from XML files.
This scanner parses XML files to collect metadata and extract embedded files based on specified tags.
It is used in forensic and malware analysis to extract and analyze structured data within XML documents.
Scanner Type: Collection
Attributes:
None
Options:
extract_tags (list[str]): Tags whose content is extracted as child files.
metadata_tags (list[str]): Tags whose content is logged as metadata.
## Detection Use Cases
!!! info "Detection Use Cases"
- **Embedded File Extraction**
- Extracts files embedded within specific XML tags.
- **Metadata Extraction**
- Collects metadata from specific XML tags.
## Known Limitations
!!! warning "Known Limitations"
- Complex or malformed XML structures might lead to incomplete parsing or errors.
- Excessive files may be scanned / collected if XML mimetypes are set in `backend.yml`.
## To Do
!!! question "To Do"
- Improve error handling for malformed XML structures.
- Better extraction of tags / metadata tags
## References
!!! quote "References"
- XML File Format Specification (https://www.w3.org/XML/)
## Contributors
!!! example "Contributors"
- [Josh Liburdi](https://github.com/jshlbrd)
- [Paul Hutelmyer](https://github.com/phutelmyer)
- [Sara Kalupa](https://github.com/skalupa)
"""

def scan(
self, data: bytes, file: strelka.File, options: dict, expire_at: int
) -> None:
"""
Parses XML data to extract metadata and files.
Args:
data: XML data as bytes.
file: File object containing metadata about the scan.
options: Dictionary of scanner options.
expire_at: Time when the scan should be considered expired.
Scans the XML file, extracting data and metadata based on the specified tags,
and emits files as necessary.
If the given file is not an XML file, the scanner appends a flag denoting this and exits.
"""

# Prepare options with case-insensitive tag matching
xml_options = {
"extract_tags": [tag.lower() for tag in options.get("extract_tags", [])],
"metadata_tags": [tag.lower() for tag in options.get("metadata_tags", [])],
}

# Initialize scan event data
self.event.setdefault("tags", set())
self.event.setdefault("tag_data", [])
self.event.setdefault("namespaces", set())
self.event["tags"] = []
self.event["tag_data"] = []
self.event["namespaces"] = []
self.event["total"] = {"tags": 0, "extracted": 0}
self.emitted_files: Set[str] = (
set()
) # Tracks emitted files to prevent duplicates
self.emitted_files: list[str] = []

# Parse the XML content
try:
Expand All @@ -81,25 +94,36 @@ def scan(
# Recursively process each node in the XML
self._recurse_node(xml, xml_options)

except etree.XMLSyntaxError as e:
self.flags.append(f"syntax_error: {str(e)}")
except Exception as e:
# If file given is not an XML file, do not proceed with ScanXML
if "text/xml" not in file.flavors.get("mime", []):
self.flags.append(
f"{self.__class__.__name__}: xml_file_format_error: File given to ScanXML is not an XML file, "
f"scanner did not run."
)
else:
self.flags.append(
f"{self.__class__.__name__}: xml_parsing_error: Unable to scan XML file with error: {e}."
)
return

# Finalize the event data for reporting
self.event["tags"] = list(self.event["tags"])
self.event["tag_data"] = list(self.event["tag_data"])
self.event["tags"] = list(set(self.event["tags"]))
self.event["total"]["tags"] = len(self.event["tags"])
self.event["namespaces"] = list(self.event["namespaces"])
self.event["emitted_content"] = list(self.emitted_files)
self.event["namespaces"] = list(set(self.event["namespaces"]))
self.event["emitted_content"] = list(set(self.emitted_files))

# Extract and add Indicators of Compromise (IOCs)
self.add_iocs(extract_iocs_from_string(data.decode("utf-8")))

def _recurse_node(self, node: etree._Element, xml_options: Dict[str, Any]) -> None:
"""
Recursively processes each XML node to extract data and metadata.
Args:
node: The current XML node to process.
xml_options: Options for data extraction and metadata logging.
Iterates through XML nodes, extracting data and collecting metadata as specified
by the scanner options.
"""
Expand All @@ -109,16 +133,16 @@ def _recurse_node(self, node: etree._Element, xml_options: Dict[str, Any]) -> No
tag = tag.lower()

if tag:
self.event["tags"].add(tag)
self.event["tags"].append(tag)
if namespace:
self.event["namespaces"].add(namespace)
self.event["namespaces"].append(namespace)

# Handle specific content extraction and emission
if tag in xml_options["extract_tags"]:
content = node.text.strip() if node.text else ""
if content:
self.emit_file(content, name=tag)
self.emitted_files.add(content)
self.emitted_files.append(content)
self.event["total"]["extracted"] += 1

# Always process attributes to capture any relevant metadata or data for emission
Expand All @@ -133,10 +157,12 @@ def _process_attributes(
) -> None:
"""
Processes XML node attributes to extract or log data.
Args:
node: XML node whose attributes are being processed.
xml_options: Configuration options for the scan.
tag: The tag of the current XML node being processed.
Extracts data from attributes specified in the extract_tags list and logs data
from attributes specified in the metadata_tags list.
"""
Expand Down

0 comments on commit 68199ec

Please sign in to comment.