From 33855cf920a8ef76d2bfa414779b7cf96c8c8def Mon Sep 17 00:00:00 2001 From: Andrew Case Date: Sun, 29 Dec 2024 02:20:02 +0000 Subject: [PATCH] Significantly improve the smear/error handling in the netstat plugin --- .../framework/plugins/windows/netstat.py | 157 +++++++++++++----- .../symbols/windows/extensions/network.py | 8 +- 2 files changed, 120 insertions(+), 45 deletions(-) diff --git a/volatility3/framework/plugins/windows/netstat.py b/volatility3/framework/plugins/windows/netstat.py index a1521a8c6..3408c0a3a 100644 --- a/volatility3/framework/plugins/windows/netstat.py +++ b/volatility3/framework/plugins/windows/netstat.py @@ -111,8 +111,21 @@ def parse_bitmap( The list of indices at which a 1 was found. """ ret = [] + # This value is broken in many samples and was causing essentially infinite loops + # Testing showed that 8192 is the current size across all Windows versions + # We give some leeway in case it increases in later versions, while still keeping it sane + # The problematic samples had values that looked like addresses, so in the billions + if bitmap_size_in_byte > 8192 * 10: + return ret + for idx in range(bitmap_size_in_byte): - current_byte = context.layers[layer_name].read(bitmap_offset + idx, 1)[0] + try: + current_byte = context.layers[layer_name].read(bitmap_offset + idx, 1)[ + 0 + ] + except exceptions.InvalidAddressException: + continue + current_offs = idx * 8 for bit in range(8): if current_byte & (1 << bit) != 0: @@ -154,32 +167,37 @@ def enumerate_structures_by_port( ) else: # invalid argument. - return None + return vollog.debug(f"Current Port: {port}") # the given port serves as a shifted index into the port pool lists list_index = port >> 8 truncated_port = port & 0xFF - # constructing port_pool object here so callers don't have to - port_pool = context.object( - net_symbol_table + constants.BANG + "_INET_PORT_POOL", - layer_name=layer_name, - offset=port_pool_addr, - ) - - # first, grab the given port's PortAssignment (`_PORT_ASSIGNMENT`) - inpa = port_pool.PortAssignments[list_index] + try: + # constructing port_pool object here so callers don't have to + port_pool = context.object( + net_symbol_table + constants.BANG + "_INET_PORT_POOL", + layer_name=layer_name, + offset=port_pool_addr, + ) + # first, grab the given port's PortAssignment (`_PORT_ASSIGNMENT`) + inpa = port_pool.PortAssignments[list_index] - # then parse the port assignment list (`_PORT_ASSIGNMENT_LIST`) and grab the correct entry - assignment = inpa.InPaBigPoolBase.Assignments[truncated_port] + # then parse the port assignment list (`_PORT_ASSIGNMENT_LIST`) and grab the correct entry + assignment = inpa.InPaBigPoolBase.Assignments[truncated_port] + except exceptions.InvalidAddressException: + return if not assignment: - return None + return # the value within assignment.Entry is a) masked and b) points inside of the network object # first decode the pointer - netw_inside = cls._decode_pointer(assignment.Entry) + try: + netw_inside = cls._decode_pointer(assignment.Entry) + except exceptions.InvalidAddressException: + return if netw_inside: # if the value is valid, calculate the actual object address by subtracting the offset @@ -188,16 +206,30 @@ def enumerate_structures_by_port( ) yield curr_obj + try: + next_obj_address = cls._decode_pointer(curr_obj.Next) + except exceptions.InvalidAddressException: + return + # if the same port is used on different interfaces multiple objects are created # those can be found by following the pointer within the object's `Next` field until it is empty - while curr_obj.Next: - curr_obj = context.object( - obj_name, - layer_name=layer_name, - offset=cls._decode_pointer(curr_obj.Next) - ptr_offset, - ) + while next_obj_address: + try: + curr_obj = context.object( + obj_name, + layer_name=layer_name, + offset=next_obj_address - ptr_offset, + ) + except exceptions.InvalidAddressException: + return + yield curr_obj + try: + next_obj_address = cls._decode_pointer(curr_obj.Next) + except exceptions.InvalidAddressException: + return + @classmethod def get_tcpip_module( cls, @@ -243,16 +275,25 @@ def parse_hashtable( The hash table entries which are _not_ empty """ # we are looking for entries whose values are not their own address + # smear sanity check from mass testing + if ht_length > 4096: + return + for index in range(ht_length): current_addr = ht_offset + index * alignment - current_pointer = context.object( - net_symbol_table + constants.BANG + "pointer", - layer_name=layer_name, - offset=current_addr, - ) + try: + current_pointer = context.object( + net_symbol_table + constants.BANG + "pointer", + layer_name=layer_name, + offset=current_addr, + ) + except exceptions.InvalidAddressException: + continue + # check if addr of pointer is equal to the value pointed to if current_pointer.vol.offset == current_pointer: continue + yield current_pointer @classmethod @@ -292,11 +333,15 @@ def parse_partitions( tcpip_symbol_table + constants.BANG + "PartitionCount" ).address - part_table_addr = context.object( - net_symbol_table + constants.BANG + "pointer", - layer_name=layer_name, - offset=tcpip_module_offset + part_table_symbol, - ) + try: + part_table_addr = context.object( + net_symbol_table + constants.BANG + "pointer", + layer_name=layer_name, + offset=tcpip_module_offset + part_table_symbol, + ) + except exceptions.InvalidAddressException: + vollog.debug(f"`PartitionTable` not present in memory.") + return # part_table is the actual partition table offset and consists out of a dynamic amount of _PARTITION objects part_table = context.object( @@ -304,10 +349,18 @@ def parse_partitions( layer_name=layer_name, offset=part_table_addr, ) - part_count = int.from_bytes( - context.layers[layer_name].read(tcpip_module_offset + part_count_symbol, 1), - "little", - ) + + try: + part_count = int.from_bytes( + context.layers[layer_name].read( + tcpip_module_offset + part_count_symbol, 1 + ), + "little", + ) + except exceptions.InvalidAddressException: + vollog.debug(f"`PartitionCount` not present in memory.") + return + part_table.Partitions.count = part_count vollog.debug( @@ -316,9 +369,21 @@ def parse_partitions( entry_offset = context.symbol_space.get_type(obj_name).relative_child_offset( "ListEntry" ) - for ctr, partition in enumerate(part_table.Partitions): + + try: + partitions = part_table.Partitions + except exceptions.InvalidAddressException: + vollog.debug("Partitions member not present in memory") + return + + for ctr, partition in enumerate(partitions): vollog.debug(f"Parsing partition {ctr}") - if partition.Endpoints.NumEntries > 0: + try: + num_entries = partition.Endpoints.NumEntries + except exceptions.InvalidAddressException: + continue + + if num_entries > 0: for endpoint_entry in cls.parse_hashtable( context, layer_name, @@ -402,6 +467,7 @@ def find_port_pools( upp_symbol = context.symbol_space.get_symbol( tcpip_symbol_table + constants.BANG + "UdpPortPool" ).address + upp_addr = context.object( net_symbol_table + constants.BANG + "pointer", layer_name=layer_name, @@ -498,13 +564,16 @@ def list_sockets( # then, towards the UDP and TCP port pools # first, find their addresses - upp_addr, tpp_addr = cls.find_port_pools( - context, - layer_name, - net_symbol_table, - tcpip_symbol_table, - tcpip_module_offset, - ) + try: + upp_addr, tpp_addr = cls.find_port_pools( + context, + layer_name, + net_symbol_table, + tcpip_symbol_table, + tcpip_module_offset, + ) + except (exceptions.SymbolError, exceptions.InvalidAddressException): + vollog.debug("Unable to reconstruct port pools") # create port pool objects at the detected address and parse the port bitmap upp_obj = context.object( diff --git a/volatility3/framework/symbols/windows/extensions/network.py b/volatility3/framework/symbols/windows/extensions/network.py index 478deab6b..e41ac6a05 100644 --- a/volatility3/framework/symbols/windows/extensions/network.py +++ b/volatility3/framework/symbols/windows/extensions/network.py @@ -219,7 +219,13 @@ def get_remote_address(self): return None def is_valid(self): - if self.State not in self.State.choices.values(): + # netstat calls this before validating the object itself + try: + state = self.State + except exceptions.InvalidAddressException: + return False + + if state not in state.choices.values(): vollog.debug( f"{type(self)} 0x{self.vol.offset:x} invalid due to invalid tcp state {self.State}" )