main loop refactor: remove redundant queue and no active waiting for results #690

Merged
merged 3 commits on Oct 8, 2024
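For context on the new shape of the main loop: instead of pushing discovered files onto a VecDeque and polling try_recv() in a 1 ms sleep loop, workers are spawned as soon as work is discovered, the main thread blocks on the channel's recv(), and an expected_results counter decides when the loop is done. Below is a minimal, self-contained sketch of that pattern; it uses plain std::thread and a dummy analyze() that fabricates follow-up tasks, so every name in it is illustrative rather than binwalk's actual API (the real code bounds concurrency with a ThreadPool and sends AnalysisResults over the channel).

use std::sync::mpsc;
use std::thread;

// Illustrative stand-in for the analysis step: a task may yield follow-up
// tasks (in the real code, extracted files when recursing).
fn analyze(task: u64) -> Vec<u64> {
    if task < 3 {
        vec![task * 10 + 1, task * 10 + 2]
    } else {
        Vec::new()
    }
}

// Mirrors the shape of spawn_worker(): run the analysis off-thread and send
// the result back over the channel. A plain thread stands in for the pool.
fn spawn_worker(task: u64, tx: mpsc::Sender<(u64, Vec<u64>)>) {
    thread::spawn(move || {
        let children = analyze(task);
        tx.send((task, children))
            .expect("failed to send results back to main thread");
    });
}

fn main() {
    let (tx, rx) = mpsc::channel();

    // Seed with the base target and count one expected result for it.
    spawn_worker(0, tx.clone());
    let mut expected_results: usize = 1;

    // Block on recv() instead of polling try_recv() in a sleep loop; stop
    // once every spawned worker has reported back.
    while expected_results > 0 {
        let (task, children) = rx.recv().expect("failed to read from worker channel");
        expected_results -= 1;
        println!("task {task}: {} follow-up task(s)", children.len());

        // Newly discovered work is dispatched immediately instead of queued.
        for child in children {
            spawn_worker(child, tx.clone());
            expected_results += 1;
        }
    }
}

Blocking on recv() parks the main thread until a worker actually reports back, so no CPU is spent idling, and the pending_jobs/available_workers bookkeeping from the old loop becomes unnecessary.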
231 changes: 110 additions & 121 deletions src/main.rs
@@ -1,5 +1,5 @@
use binwalk::AnalysisResults;
use log::{debug, error};
use std::collections::VecDeque;
use std::panic;
use std::process;
use std::sync::mpsc;
@@ -22,23 +22,12 @@ fn main() {
// Only use one thread if unable to auto-detect available core info
const DEFAULT_WORKER_COUNT: usize = 1;

let binwalker: binwalk::Binwalk;
let mut output_directory: Option<String> = None;

// Statistics variables; keeps track of analyzed file count and total analysis run time
let mut file_count: usize = 0;
let run_time = time::Instant::now();

// Boolean flag to indicate if a result should be displayed to screen or not
let mut display_results: bool;

// Queue of files to be analyzed
let mut target_files = VecDeque::new();

// Thread pool related variables
let mut pending_jobs = 0;
let mut available_workers = DEFAULT_WORKER_COUNT;

// Initialize logging
env_logger::init();

@@ -82,34 +71,27 @@ fn main() {
}

// Initialize binwalk
match binwalk::Binwalk::configure(
let binwalker = binwalk::Binwalk::configure(
cliargs.file_name,
output_directory,
cliargs.include,
cliargs.exclude,
None,
) {
Err(_) => {
panic!("Binwalk initialization failed");
}
Ok(bw) => {
binwalker = bw;
}
}
)
.expect("Binwalk initialization failed");

// If the user specified --threads, honor that request; else, auto-detect available parallelism
match cliargs.threads {
Some(threads) => {
available_workers = threads;
}
None => {
// Get CPU core info
match thread::available_parallelism() {
Err(e) => error!("Failed to retrieve CPU core info: {}", e),
Ok(coreinfo) => available_workers = coreinfo.get(),
let available_workers = cliargs.threads.unwrap_or_else(|| {
// Get CPU core info
match thread::available_parallelism() {
// In case of error use the default
Err(e) => {
error!("Failed to retrieve CPU core info: {e}");
DEFAULT_WORKER_COUNT
}
Ok(coreinfo) => coreinfo.get(),
}
}
});

// Sanity check the number of available worker threads
if available_workers < 1 {
@@ -124,13 +106,6 @@ fn main() {
let workers = ThreadPool::new(available_workers);
let (worker_tx, worker_rx) = mpsc::channel();

// Queue the specified file for analysis
debug!(
"Queuing initial target file: {}",
binwalker.base_target_file
);
target_files.insert(target_files.len(), binwalker.base_target_file.clone());

/*
* Set a custom panic handler.
* This ensures that when any thread panics, the default panic handler will be invoked
@@ -142,105 +117,100 @@ fn main() {
process::exit(-1);
}));

// Spawn the first worker with the base file
debug!(
"Queuing initial target file: {}",
binwalker.base_target_file
);
spawn_worker(
&workers,
binwalker.clone(),
binwalker.base_target_file.clone(),
cliargs.extract,
worker_tx.clone(),
);

// Keep track of results expected, start with 1 for the base target file
let mut expected_results: usize = 1;

/*
* Main loop.
* Loop until all pending thread jobs are complete and there are no more files in the queue.
*/
while target_files.is_empty() == false || pending_jobs > 0 {
// If there are files in the queue and there is at least one worker not doing anything
if target_files.is_empty() == false && pending_jobs < available_workers {
// Get the next file in the list
let target_file = target_files
.pop_front()
.expect("Failed to retrieve the name of the next file to scan");

// Clone the transmit channel so the worker thread can send response data back to this main thread
let worker_tx = worker_tx.clone();

// Clone binwalk config data for worker thread
let binworker = binwalker.clone();

/* Start of worker thread code */
workers.execute(move || {
// Analyze target file, with extraction, if specified
let results = binworker.analyze(&target_file, cliargs.extract);
// Report file results back to main thread
if let Err(e) = worker_tx.send(results) {
panic!(
"Worker thread for {} failed to send results back to main thread: {}",
target_file, e
);
}
});
/* End of worker thread code */

// Increment pending jobs counter
pending_jobs += 1;
loop {
// If no further results are expected, exit the loop.
if expected_results < 1 {
break;
}

// Don't let the main loop eat up CPU cycles if there's no back log of work to be done
if target_files.is_empty() == true {
let sleep_time = time::Duration::from_millis(1);
thread::sleep(sleep_time);
}
// Wait for a result from a worker
let results = worker_rx
.recv()
.expect("Failed to read from worker channel");

// If there are pending jobs, check to see if any have responded with some results
if pending_jobs > 0 {
// Read in analysis results from a worker thread
if let Ok(results) = worker_rx.try_recv() {
// If we got some results that means a worker thread has completed, decrement pending jobs counter
pending_jobs -= 1;
expected_results -= 1;

// Keep a tally of how many files have been analyzed
file_count += 1;
// Keep a tally of how many files have been analyzed
file_count += 1;

// Log analysis results to JSON file
json::log(&cliargs.log, json::JSONType::Analysis(results.clone()));
// Log analysis results to JSON file
json::log(&cliargs.log, json::JSONType::Analysis(results.clone()));

// Nothing found? Nothing else to do for this file.
if results.file_map.len() == 0 {
debug!("Found no results for file {}", results.file_path);
continue;
}
// Nothing found? Nothing else to do for this file.
if results.file_map.len() == 0 {
debug!("Found no results for file {}", results.file_path);
continue;
}

/*
* For brevity, when analyzing more than one file only display subsequent files whose results
* contain signatures that we always want displayed, or which contain extractable signatures.
* This can be overridden with the --verbose command line flag.
*/
if file_count == 1 || cliargs.verbose == true {
display_results = true;
} else {
display_results = false;

if results.extractions.len() > 0 {
// Boolean flag to indicate if a result should be displayed to screen or not
let mut display_results: bool;

/*
* For brevity, when analyzing more than one file only display subsequent files whose results
* contain signatures that we always want displayed, or which contain extractable signatures.
* This can be overridden with the --verbose command line flag.
*/
if file_count == 1 || cliargs.verbose == true {
display_results = true;
} else {
display_results = false;

if results.extractions.len() > 0 {
display_results = true;
} else {
for signature in &results.file_map {
if signature.always_display == true {
display_results = true;
} else {
for signature in &results.file_map {
if signature.always_display == true {
display_results = true;
break;
}
}
break;
}
}
}
};

// Print signature & extraction results
if display_results == true {
display::print_analysis_results(cliargs.quiet, cliargs.extract, &results);
}
// Print signature & extraction results
if display_results == true {
display::print_analysis_results(cliargs.quiet, cliargs.extract, &results);
}

// If running recursively, add extraction results to list of files to analyze
if cliargs.matryoshka {
for (_signature_id, extraction_result) in results.extractions.into_iter() {
if extraction_result.do_not_recurse == false {
for file_path in extractors::common::get_extracted_files(
&extraction_result.output_directory,
) {
debug!("Queuing {} for analysis", file_path);
target_files.insert(target_files.len(), file_path.clone());
}
}
// If running recursively, add extraction results to list of files to analyze
if cliargs.matryoshka {
for (_signature_id, extraction_result) in results.extractions.into_iter() {
if extraction_result.do_not_recurse == false {
for file_path in
extractors::common::get_extracted_files(&extraction_result.output_directory)
{
debug!("Queuing {file_path} for analysis");

// Spawn a new worker for the new file
spawn_worker(
&workers,
binwalker.clone(),
file_path,
cliargs.extract,
worker_tx.clone(),
);

expected_results += 1;
}
}
}
@@ -256,3 +226,22 @@ fn main() {
binwalker.patterns.len(),
);
}

fn spawn_worker(
pool: &ThreadPool,
bw: binwalk::Binwalk,
target_file: String,
do_extraction: bool,
worker_tx: mpsc::Sender<AnalysisResults>,
) {
pool.execute(move || {
// Analyze target file, with extraction, if specified
let results = bw.analyze(&target_file, do_extraction);
// Report file results back to main thread
if let Err(e) = worker_tx.send(results) {
panic!(
"Worker thread for {target_file} failed to send results back to main thread: {e}"
);
}
});
}