Skip to content

Commit

Permalink
Check path (#31)
Browse files Browse the repository at this point in the history
adds check_path flag to cwe_checker for finding paths from user input functions to CWE hits.
  • Loading branch information
tbarabosch authored and Enkelmann committed Sep 11, 2019
1 parent 56bf497 commit 75d6c2b
Show file tree
Hide file tree
Showing 27 changed files with 438 additions and 56 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ dev
- Refactoring of logging and JSON support via --json (PR #30)
- Added file output support via --out (PR #30)
- Surpress logging of info, error and warning to STDOUT via --no-logging (PR #32)
- Added check-path feature via --check-path that searches paths between interesting input functions and cwe hits (PR #31)

0.2 (2019-06-25)
=====
Expand Down
29 changes: 28 additions & 1 deletion cwe_checker_to_ida/CweCheckerParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@
'CWE787': RED,
}

class CheckPath(object):

def __init__(self, source, source_addr, destination, destination_addr, path_str):
self.source = source
self.source_addr = self.__fix_address(source_addr)
self.destination = self.__fix_address(destination)
self.destination_addr = self.__fix_address(destination_addr)
self.path_str = self.__fix_address(path_str)
self.color = None
self.highlight = False

@staticmethod
def __fix_address(address):
return address.replace(':32u', '').replace(':64u', '')

class CweWarning(object):

Expand Down Expand Up @@ -57,7 +71,20 @@ def _parse_cwe_warnings(j):

return result

@staticmethod
def _parse_check_path(j):
result = []

if 'check_path' in j:
for p in j['check_path']:
check_path = CheckPath(p['source'], p['source_addr'], p['destination'], p['destination_addr'], p['path_str'])
result.append(check_path)

return result

def parse(self):
with open(self._result_path) as fhandle:
j = json.load(fhandle)
return self._parse_cwe_warnings(j)
warnings = self._parse_cwe_warnings(j)
check_path = self._parse_check_path(j)
return warnings + check_path
20 changes: 14 additions & 6 deletions cwe_checker_to_ida/Generator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from CweCheckerParser import CweWarning

class IdaGenerator(object):

def __init__(self, results):
Expand All @@ -6,11 +8,17 @@ def __init__(self, results):
def generate(self):
script = "import sark\nimport idaapi\n"
for res in self._results:
if res.highlight and res.address:
first_address = res.address[0]
script += "sark.Line(%s).color = %s\n" % (first_address, res.color)
script += "sark.Line(%s).comments.regular = '%s'\n" % (first_address, res.description)
script += "print('[ %s ] %s')\n" % (first_address, res.description)
if isinstance(res, CweWarning):
if res.highlight and res.address:
first_address = res.address[0]
script += "sark.Line(%s).color = %s\n" % (first_address, res.color)
script += "sark.Line(%s).comments.regular = '%s'\n" % (first_address, res.description)
script += "print('[ %s ] %s')\n" % (first_address, res.description)
else:
script += "print('[ GENERAL ] %s')\n" % res.description
else:
script += "print('[ GENERAL ] %s')\n" % res.description
script += "print('[CheckPath] %s ( %s ) -> %s via %s')\n" % (res.source,
res.source_addr,
res.destination,
res.path_str)
return script
2 changes: 2 additions & 0 deletions index.mld
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ If you want to print the output to a file with [--cwe-checker-out], you also nee

All command line options have to be prefixed with [--cwe-checker] (so that BAP knows to forward them to the {i cwe_checker} plugin).
The available command line options are:
- [-check-path] Find paths between input functions (configurable in the configuration file) and CWE hits.
Should be used together with the [-partial] command line option if you are only interested in paths to specific CWEs.
- [-config=[FILE]] Use [[FILE]] as the configuration file.
If you omit this option, {i cwe_checker} uses a standard configuration file located at [src/config.json].
- [-module-versions] Prints the version numbers of each check.
Expand Down
20 changes: 15 additions & 5 deletions plugins/cwe_checker/cwe_checker.ml
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ let full_run project config =
List.iter known_modules ~f:(fun cwe -> execute_cwe_module cwe json program project tid_address_map)
end

let main config module_versions partial_update json_output file_output no_logging project =

let main config module_versions partial_update check_path json_output file_output no_logging project =
if no_logging then
begin
Log_utils.turn_off_logging ()
Expand Down Expand Up @@ -105,14 +105,23 @@ let main config module_versions partial_update json_output file_output no_loggin
full_run project config
else
partial_run project config partial_update;
if check_path then
begin
let prog = Project.program project in
let tid_address_map = Address_translation.generate_tid_map prog in
let json = Yojson.Basic.from_file config in
let check_path_sources = Json_utils.get_symbols_from_json json "check_path" in
let check_path_sinks = Log_utils.get_cwe_warnings () in
Check_path.check_path prog tid_address_map check_path_sources check_path_sinks
end;
if json_output then
begin
match Project.get project filename with
| Some fname -> Log_utils.emit_cwe_warnings_json fname file_output
| None -> Log_utils.emit_cwe_warnings_json "" file_output
| Some fname -> Log_utils.emit_json fname file_output
| None -> Log_utils.emit_json "" file_output
end
else
Log_utils.emit_cwe_warnings_native file_output
Log_utils.emit_native file_output
end
end

Expand All @@ -123,8 +132,9 @@ module Cmdline = struct
let json_output = flag "json" ~doc:"Outputs the result as JSON."
let file_output = param string "out" ~doc:"Path to output file."
let no_logging = flag "no-logging" ~doc:"Outputs no logging (info, error, warning). This does not pollute STDOUT when output json to it."
let check_path = flag "check-path" ~doc:"Checks if there is a path from an input function to a CWE hit."
let partial_update = param string "partial" ~doc:"Comma separated list of modules to apply on binary, e.g. 'CWE332,CWE476,CWE782'"
let () = when_ready (fun ({get=(!!)}) -> Project.register_pass' ~deps:["callsites"] (main !!config !!module_versions !!partial_update !!json_output !!file_output !!no_logging))
let () = when_ready (fun ({get=(!!)}) -> Project.register_pass' ~deps:["callsites"] (main !!config !!module_versions !!partial_update !!check_path !!json_output !!file_output !!no_logging))
let () = manpage [
`S "DESCRIPTION";
`P "This plugin checks various CWEs such as Insufficient Entropy in PRNG (CWE-332) or Use of Potentially Dangerous Function (CWE-676)"
Expand Down
6 changes: 3 additions & 3 deletions plugins/cwe_checker_emulation/cwe_checker_emulation.ml
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@ let main json_output file_output proj =
if json_output then
begin
match Project.get proj filename with
| Some fname -> Log_utils.emit_cwe_warnings_json fname file_output
| None -> Log_utils.emit_cwe_warnings_json "" file_output
| Some fname -> Log_utils.emit_json fname file_output
| None -> Log_utils.emit_json "" file_output
end
else
Log_utils.emit_cwe_warnings_native file_output
Log_utils.emit_native file_output

module Cmdline = struct
open Config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ let main json_output file_output project =
if json_output then
begin
match Project.get project filename with
| Some fname -> Log_utils.emit_cwe_warnings_json fname file_output
| None -> Log_utils.emit_cwe_warnings_json "" file_output
| Some fname -> Log_utils.emit_json fname file_output
| None -> Log_utils.emit_json "" file_output
end
else
Log_utils.emit_cwe_warnings_native file_output
Log_utils.emit_native file_output

module Cmdline = struct
open Config
Expand Down
141 changes: 141 additions & 0 deletions src/analysis/check_path.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
open Core_kernel
open Bap.Std
open Graphlib.Std
open Format

include Self()

module CG = Graphs.Callgraph
module CFG = Graphs.Tid

type proof =
| Calls of CG.edge path
| Sites of CFG.edge path

(** Taken from https://stackoverflow.com/questions/8373460/substring-check-in-ocaml *)
let contains_substring search target =
String.substr_index ~pattern:search target <> None

let format_path get_source get_destination path tid_map =
let e_count = List.length (Seq.to_list (Path.edges path)) in
if e_count = 0 then "()" else
begin
let format_node n = sprintf "%s" (Address_translation.translate_tid_to_assembler_address_string n tid_map) in
let formated_start_node = format_node (get_source (Path.start path)) in
let formated_rest_nodes = List.map (Seq.to_list @@ Path.edges path) ~f:(fun e -> format_node (get_destination e)) in
let formated_full_path = "(" ^ formated_start_node ^ ", " ^ (String.concat ~sep:", " formated_rest_nodes) ^ ")" in
formated_full_path
end

let find_subfunction_name program name =
Term.enum sub_t program
|> Seq.find_map ~f:(fun s -> Option.some_if (contains_substring name (Sub.name s)) (Term.tid s))

let get_tids_from_cwe_hit (cwe_hit: Log_utils.CweWarning.t) =
cwe_hit.tids

let reaches cg callee target =
Graphlib.is_reachable (module CG) cg callee target

(* ignores indirect calls and jumps as well as return statements and interupts *)
let callsites cg target sub =
Term.enum blk_t sub |>
Seq.concat_map ~f:(fun blk ->
Term.enum jmp_t blk |> Seq.filter_map ~f:(fun j ->
match Jmp.kind j with
| Goto _ | Ret _ | Int (_,_) -> None
| Call destination -> begin match Call.target destination with
| Direct tid when reaches cg tid target -> Some (Term.tid blk)
| _ -> None
end))

let verify source destination program : proof option =
let cg = Program.to_graph program in
match Graphlib.shortest_path (module CG) cg source destination with
| Some path -> Some (Calls path)
| None ->
Term.enum sub_t program |> Seq.find_map ~f:(fun sub ->
let g = Sub.to_graph sub in
Seq.find_map (callsites cg source sub) ~f:(fun sc ->
Seq.find_map (callsites cg destination sub) ~f:(fun dc ->
if Tid.equal sc dc then None
else Graphlib.shortest_path (module CFG) g sc dc))) |>
Option.map ~f:(fun p -> Sites p)

let get_fst_tid_from_cwe_hit (cwe_hit: Log_utils.CweWarning.t) =
match cwe_hit.tids with
| [] -> None
| hd :: _ -> Some (Bap.Std.Tid.from_string_exn hd)

let cwe_hit_fst_addr cwe_hit =
match get_tids_from_cwe_hit cwe_hit with
| [] -> Bap.Std.Tid.from_string_exn "0x00"
| hd :: _ -> Bap.Std.Tid.from_string_exn hd

let block_has_callsite blk t =
Term.enum jmp_t blk |>
Seq.exists ~f:(fun j ->
match Jmp.kind j with
| Goto _ | Ret _ | Int (_,_) -> false
| Call destination -> begin match Call.target destination with
| Direct tid -> tid = t
| _ -> false
end)

let collect_callsites program t =
Term.enum sub_t program
|> Seq.filter_map ~f:(fun s -> if Term.enum blk_t s |>
Seq.exists ~f:(fun b -> block_has_callsite b t) then Some s else None)
|> Seq.map ~f:(fun s -> Term.tid s)

let sub_has_tid sub tid =
Term.enum blk_t sub
|> Seq.exists ~f:(fun blk -> Term.tid blk = tid || Blk.elts blk
|> Seq.exists ~f:(fun e -> match e with
| `Def d -> Term.tid d = tid
| `Jmp j -> Term.tid j = tid
| `Phi p -> Term.tid p = tid ))

let find_sub_tid_of_term_tid program tid =
match tid with
| Some t -> let s = Term.enum sub_t program
|> Seq.find ~f:(fun s -> sub_has_tid s t) in
begin
match s with
| Some f -> Some (Term.tid f)
| None -> None
end
| None -> None

let log_path p source source_tid destination tid_map =
let source_addr = Address_translation.translate_tid_to_assembler_address_string source_tid tid_map in
let destination_addr = Address_translation.translate_tid_to_assembler_address_string
(cwe_hit_fst_addr destination) tid_map in
begin match p with
| Calls p ->
let path_str = format_path CG.Edge.src CG.Edge.dst p tid_map in
let current_path = Log_utils.check_path_factory source source_addr destination_addr destination_addr ~path:[] ~path_str:path_str in
Log_utils.collect_check_path current_path
| Sites p -> let path_str = format_path CFG.Edge.src CFG.Edge.dst p tid_map in
let current_path = Log_utils.check_path_factory source source_addr destination_addr destination_addr ~path:[] ~path_str:path_str in
Log_utils.collect_check_path current_path
end

let verify_one program tid_map source destination source_tid destination_tid =
match verify source_tid destination_tid program with
| None -> ()
| Some p -> log_path p source source_tid destination tid_map

let find_source_sink_pathes source destination program tid_map =
match Option.both (find_subfunction_name program source) (find_sub_tid_of_term_tid program (get_fst_tid_from_cwe_hit destination)) with
| None -> () (*one or both functions are not utilized.*)
| Some (callsite_tid, destination_tid) ->
begin
collect_callsites program callsite_tid
|> Seq.iter ~f:(fun source_tid -> verify_one program tid_map source destination source_tid destination_tid )
end


let check_path prog tid_map input_functions cwe_hits =
List.iter input_functions ~f:(fun f ->
List.iter cwe_hits ~f:(fun h -> find_source_sink_pathes f h prog tid_map))
14 changes: 14 additions & 0 deletions src/analysis/check_path.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
(**
This analyzer module checks if there exists a path from an input function (e.g. recv or scanf) to a location of a CWE hit. The existence of such a path is great news: we might trigger the CWE hit via user provided input. This helps analysts to further priotize the CWE hits.
The analysis prooves that there is a path that fulfills the following constraints:
- it starts at the location of an input function
- it ends at a CWE hit
This module is loosely based on the BAP tutorial (https://github.com/BinaryAnalysisPlatform/bap-tutorial/blob/master/src/path_check/path_check.ml).
*)

val name : string
val version : string

val check_path : Bap.Std.program Bap.Std.term -> Bap.Std.word Bap.Std.Tid.Map.t -> string list -> Log_utils.CweWarning.t list -> unit
9 changes: 5 additions & 4 deletions src/checkers/cwe_190.ml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ let contains_multiplication d =
let check_multiplication_before_symbol _proj _prog _sub blk jmp tid_map symbols =
Seq.iter (Term.enum def_t blk)
~f:(fun d -> if contains_multiplication d then
let address = (Address_translation.translate_tid_to_assembler_address_string (Term.tid blk) tid_map) in
let symbol = (Symbol_utils.get_symbol_name_from_jmp jmp symbols) in
let description = sprintf "(Integer Overflow or Wraparound) Potential overflow due to multiplication at %s (%s)" address symbol in
let cwe_warning = cwe_warning_factory name version description ~addresses:[address] ~symbols:[symbol] in
let description = "(Integer Overflow or Wraparound) Potential overflow due to multiplication" in
let addresses = [(Address_translation.translate_tid_to_assembler_address_string (Term.tid blk) tid_map)] in
let tids = [Address_translation.tid_to_string @@ Term.tid blk] in
let symbols = [(Symbol_utils.get_symbol_name_from_jmp jmp symbols)] in
let cwe_warning = cwe_warning_factory name version description ~addresses ~tids ~symbols in
collect_cwe_warning cwe_warning)

let check_cwe prog proj tid_map symbol_names _ =
Expand Down
5 changes: 3 additions & 2 deletions src/checkers/cwe_243.ml
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,13 @@ let check_subfunction prog tid_map sub pathes =
let path_checks = List.map pathes ~f:(fun path -> check_path prog tid_map sub path) in
if not (List.exists path_checks ~f:(fun x -> x = true)) then
let address = (Address_translation.translate_tid_to_assembler_address_string (Term.tid sub) tid_map) in
let symbol = (Term.name sub) in
let tid = Address_translation.tid_to_string @@ Term.tid sub in
let symbol = Term.name sub in
let description = sprintf
"(The program utilizes chroot without dropping privileges and/or changing the directory) at %s (%s)"
address
symbol in
let cwe_warning = cwe_warning_factory name version description ~addresses:[address] ~symbols:[symbol] in
let cwe_warning = cwe_warning_factory name version description ~addresses:[address] ~tids:[tid] ~symbols:[symbol] in
collect_cwe_warning cwe_warning
end

Expand Down
2 changes: 2 additions & 0 deletions src/checkers/cwe_367.ml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ let handle_sub sub program tid_map _symbols source_sink_pair =
Seq.iter sink_calls ~f:(fun sink_call ->
if is_reachable sub source_call sink_call then
let address = (Address_translation.translate_tid_to_assembler_address_string (Term.tid sub) tid_map) in
let tid = Address_translation.tid_to_string @@ Term.tid sub in
let symbol = (Term.name sub) in
let other = [["source"; source]; ["sink"; sink]] in
let description = sprintf
Expand All @@ -61,6 +62,7 @@ let handle_sub sub program tid_map _symbols source_sink_pair =
description
~other:other
~addresses:[address]
~tids:[tid]
~symbols:[symbol] in
collect_cwe_warning cwe_warning
else
Expand Down
3 changes: 2 additions & 1 deletion src/checkers/cwe_426.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ let handle_sub sub program tid_map symbols =
if Symbol_utils.sub_calls_symbol program sub "system" then
let symbol = Term.name sub in
let address = Address_translation.translate_tid_to_assembler_address_string (Term.tid sub) tid_map in
let tid = Address_translation.tid_to_string @@ Term.tid sub in
let description = sprintf "(Untrusted Search Path) sub %s at %s may be vulnerable to PATH manipulation."
symbol
address in
let cwe_warning = cwe_warning_factory name version ~addresses:[address] ~symbols:[symbol] description in
let cwe_warning = cwe_warning_factory name version ~addresses:[address] ~tids:[tid] ~symbols:[symbol] description in
collect_cwe_warning cwe_warning
else
()
Expand Down
Loading

0 comments on commit 75d6c2b

Please sign in to comment.