Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Significantly improve the smear/error handling in the netstat plugin #1492

Merged
merged 2 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 113 additions & 44 deletions volatility3/framework/plugins/windows/netstat.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,21 @@ def parse_bitmap(
The list of indices at which a 1 was found.
"""
ret = []
# This value is broken in many samples and was causing essentially infinite loops
# Testing showed that 8192 is the current size across all Windows versions
# We give some leeway in case it increases in later versions, while still keeping it sane
# The problematic samples had values that looked like addresses, so in the billions
if bitmap_size_in_byte > 8192 * 10:
return ret

for idx in range(bitmap_size_in_byte):
current_byte = context.layers[layer_name].read(bitmap_offset + idx, 1)[0]
try:
current_byte = context.layers[layer_name].read(bitmap_offset + idx, 1)[
0
]
except exceptions.InvalidAddressException:
continue

current_offs = idx * 8
for bit in range(8):
if current_byte & (1 << bit) != 0:
Expand Down Expand Up @@ -154,32 +167,37 @@ def enumerate_structures_by_port(
)
else:
# invalid argument.
return None
return

vollog.debug(f"Current Port: {port}")
# the given port serves as a shifted index into the port pool lists
list_index = port >> 8
truncated_port = port & 0xFF

# constructing port_pool object here so callers don't have to
port_pool = context.object(
net_symbol_table + constants.BANG + "_INET_PORT_POOL",
layer_name=layer_name,
offset=port_pool_addr,
)

# first, grab the given port's PortAssignment (`_PORT_ASSIGNMENT`)
inpa = port_pool.PortAssignments[list_index]
try:
# constructing port_pool object here so callers don't have to
port_pool = context.object(
net_symbol_table + constants.BANG + "_INET_PORT_POOL",
layer_name=layer_name,
offset=port_pool_addr,
)
# first, grab the given port's PortAssignment (`_PORT_ASSIGNMENT`)
inpa = port_pool.PortAssignments[list_index]

# then parse the port assignment list (`_PORT_ASSIGNMENT_LIST`) and grab the correct entry
assignment = inpa.InPaBigPoolBase.Assignments[truncated_port]
# then parse the port assignment list (`_PORT_ASSIGNMENT_LIST`) and grab the correct entry
assignment = inpa.InPaBigPoolBase.Assignments[truncated_port]
except exceptions.InvalidAddressException:
return

if not assignment:
return None
return

# the value within assignment.Entry is a) masked and b) points inside of the network object
# first decode the pointer
netw_inside = cls._decode_pointer(assignment.Entry)
try:
netw_inside = cls._decode_pointer(assignment.Entry)
except exceptions.InvalidAddressException:
return

if netw_inside:
# if the value is valid, calculate the actual object address by subtracting the offset
Expand All @@ -188,16 +206,30 @@ def enumerate_structures_by_port(
)
yield curr_obj

try:
next_obj_address = cls._decode_pointer(curr_obj.Next)
except exceptions.InvalidAddressException:
return

# if the same port is used on different interfaces multiple objects are created
# those can be found by following the pointer within the object's `Next` field until it is empty
while curr_obj.Next:
curr_obj = context.object(
obj_name,
layer_name=layer_name,
offset=cls._decode_pointer(curr_obj.Next) - ptr_offset,
)
while next_obj_address:
try:
curr_obj = context.object(
obj_name,
layer_name=layer_name,
offset=next_obj_address - ptr_offset,
)
except exceptions.InvalidAddressException:
return

yield curr_obj

try:
next_obj_address = cls._decode_pointer(curr_obj.Next)
ikelos marked this conversation as resolved.
Show resolved Hide resolved
except exceptions.InvalidAddressException:
return

@classmethod
def get_tcpip_module(
cls,
Expand Down Expand Up @@ -243,16 +275,25 @@ def parse_hashtable(
The hash table entries which are _not_ empty
"""
# we are looking for entries whose values are not their own address
# smear sanity check from mass testing
if ht_length > 4096:
return

for index in range(ht_length):
current_addr = ht_offset + index * alignment
current_pointer = context.object(
net_symbol_table + constants.BANG + "pointer",
layer_name=layer_name,
offset=current_addr,
)
try:
current_pointer = context.object(
net_symbol_table + constants.BANG + "pointer",
layer_name=layer_name,
offset=current_addr,
)
except exceptions.InvalidAddressException:
continue

# check if addr of pointer is equal to the value pointed to
if current_pointer.vol.offset == current_pointer:
continue

yield current_pointer

@classmethod
Expand Down Expand Up @@ -292,22 +333,34 @@ def parse_partitions(
tcpip_symbol_table + constants.BANG + "PartitionCount"
).address

part_table_addr = context.object(
net_symbol_table + constants.BANG + "pointer",
layer_name=layer_name,
offset=tcpip_module_offset + part_table_symbol,
)
try:
part_table_addr = context.object(
net_symbol_table + constants.BANG + "pointer",
layer_name=layer_name,
offset=tcpip_module_offset + part_table_symbol,
)
except exceptions.InvalidAddressException:
vollog.debug("`PartitionTable` not present in memory.")
return

# part_table is the actual partition table offset and consists out of a dynamic amount of _PARTITION objects
part_table = context.object(
net_symbol_table + constants.BANG + "_PARTITION_TABLE",
layer_name=layer_name,
offset=part_table_addr,
)
part_count = int.from_bytes(
context.layers[layer_name].read(tcpip_module_offset + part_count_symbol, 1),
"little",
)

try:
part_count = int.from_bytes(
context.layers[layer_name].read(
tcpip_module_offset + part_count_symbol, 1
),
"little",
)
except exceptions.InvalidAddressException:
vollog.debug("`PartitionCount` not present in memory.")
return

part_table.Partitions.count = part_count

vollog.debug(
Expand All @@ -316,9 +369,21 @@ def parse_partitions(
entry_offset = context.symbol_space.get_type(obj_name).relative_child_offset(
"ListEntry"
)
for ctr, partition in enumerate(part_table.Partitions):

try:
partitions = part_table.Partitions
except exceptions.InvalidAddressException:
vollog.debug("Partitions member not present in memory")
return

for ctr, partition in enumerate(partitions):
vollog.debug(f"Parsing partition {ctr}")
if partition.Endpoints.NumEntries > 0:
try:
num_entries = partition.Endpoints.NumEntries
except exceptions.InvalidAddressException:
continue

if num_entries > 0:
for endpoint_entry in cls.parse_hashtable(
context,
layer_name,
Expand Down Expand Up @@ -402,6 +467,7 @@ def find_port_pools(
upp_symbol = context.symbol_space.get_symbol(
tcpip_symbol_table + constants.BANG + "UdpPortPool"
).address

upp_addr = context.object(
net_symbol_table + constants.BANG + "pointer",
layer_name=layer_name,
Expand Down Expand Up @@ -498,13 +564,16 @@ def list_sockets(

# then, towards the UDP and TCP port pools
# first, find their addresses
upp_addr, tpp_addr = cls.find_port_pools(
context,
layer_name,
net_symbol_table,
tcpip_symbol_table,
tcpip_module_offset,
)
try:
upp_addr, tpp_addr = cls.find_port_pools(
context,
layer_name,
net_symbol_table,
tcpip_symbol_table,
tcpip_module_offset,
)
except (exceptions.SymbolError, exceptions.InvalidAddressException):
vollog.debug("Unable to reconstruct port pools")

# create port pool objects at the detected address and parse the port bitmap
upp_obj = context.object(
Expand Down
8 changes: 7 additions & 1 deletion volatility3/framework/symbols/windows/extensions/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,13 @@ def get_remote_address(self):
return None

def is_valid(self):
if self.State not in self.State.choices.values():
# netstat calls this before validating the object itself
try:
state = self.State
except exceptions.InvalidAddressException:
return False

if state not in state.choices.values():
vollog.debug(
f"{type(self)} 0x{self.vol.offset:x} invalid due to invalid tcp state {self.State}"
)
Expand Down
Loading