diff --git a/src/lib.rs b/src/lib.rs
index 0b6e5e5..20a8861 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -9,6 +9,8 @@ use std::path::{Path, PathBuf};
 use std::process::{Command as SysCommand, Stdio};
 use tracing::{debug, info};
 use walkdir::WalkDir;
+mod parallel;
+use parallel::process_files_parallel;
 
 /// Helper macro to write debug statements both to standard debug log and to debug file if set.
 #[macro_export]
@@ -192,7 +194,11 @@ fn build_final_config(cfg: Option<YekConfig>) -> FinalConfig {
             merged_ignore.push(reg);
         }
     }
-    // Merge or add new priority rules
+    // Clear default priority rules if user provides their own
+    if !user_cfg.priority_rules.is_empty() {
+        merged_priority.clear();
+    }
+    // Add user priority rules
     for user_rule in user_cfg.priority_rules {
         if user_rule.patterns.is_empty() {
             continue;
         }
@@ -348,14 +354,15 @@ pub fn get_file_priority(
     _ignore_pats: &[Regex],
     prio_list: &[PriorityPattern],
 ) -> i32 {
-    for prio in prio_list {
+    // Loop from highest score → lowest
+    for prio in prio_list.iter().rev() {
         for pat in &prio.patterns {
             if pat.is_match(rel_str) {
                 return prio.score;
             }
         }
     }
-    40 // fallback
+    0 // fallback if nothing matches - lower than any user-defined priority
 }
 
 /// Get the commit time of the most recent change to each file.
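Note: the two edits above invert the lookup order and lower the unmatched default. A minimal sketch of the new semantics, assuming PriorityPattern exposes score: i32 and patterns: Vec<Regex> (names taken from elsewhere in this diff) and that prio_list stays sorted ascending by score:

    use regex::Regex;

    pub struct PriorityPattern {
        pub score: i32,
        pub patterns: Vec<Regex>,
    }

    // Equivalent, iterator-style form of the rewritten loop.
    pub fn resolve_priority(rel_str: &str, prio_list: &[PriorityPattern]) -> i32 {
        prio_list
            .iter()
            .rev() // highest score first, so the strongest matching rule wins
            .find(|p| p.patterns.iter().any(|pat| pat.is_match(rel_str)))
            .map(|p| p.score)
            .unwrap_or(0) // unmatched files now rank below any user-defined rule
    }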
@@ -531,165 +538,188 @@ pub fn serialize_repo(
         None
     };
 
-    // Collect files with their priorities
-    let mut files: Vec<FileEntry> = Vec::new();
-    let mut total_size = 0;
-    let mut current_chunk = 0;
-    let mut current_chunk_files = Vec::new();
-
-    // Walk directory tree
-    for entry in WalkDir::new(base_path)
-        .follow_links(true)
-        .into_iter()
-        .filter_map(|e| e.ok())
-    {
-        let path = entry.path();
-        if !path.is_file() {
-            continue;
-        }
-
-        // Get path relative to base
-        let rel_path = path.strip_prefix(base_path).unwrap_or(path);
-        let rel_str = rel_path.to_string_lossy();
-
-        // Normalize path separators to forward slashes for consistent pattern matching
-        #[cfg(windows)]
-        let rel_str = rel_str.replace('\\', "/");
-
-        // Skip if matched by gitignore
-        #[cfg(windows)]
-        let gitignore_path = rel_path
-            .to_str()
-            .map(|s| s.replace('\\', "/"))
-            .map(PathBuf::from)
-            .unwrap_or(rel_path.to_path_buf());
-        #[cfg(not(windows))]
-        let gitignore_path = rel_path.to_path_buf();
-
-        if gitignore.matched(&gitignore_path, false).is_ignore() {
-            debug!("Skipping {} - matched by gitignore", rel_str);
-            continue;
-        }
+    if stream {
+        // For streaming, we still use the old single-threaded approach
+        let mut files: Vec<FileEntry> = Vec::new();
+        let mut total_size = 0;
+        let mut current_chunk = 0;
+        let mut current_chunk_files = Vec::new();
+
+        // Walk directory tree
+        for entry in WalkDir::new(base_path)
+            .follow_links(true)
+            .into_iter()
+            .filter_map(|e| e.ok())
+        {
+            let path = entry.path();
+            if !path.is_file() {
+                continue;
+            }
+
+            // Get path relative to base
+            let rel_path = path.strip_prefix(base_path).unwrap_or(path);
+            let rel_str = rel_path.to_string_lossy();
+
+            // Normalize path separators to forward slashes for consistent pattern matching
+            #[cfg(windows)]
+            let rel_str = rel_str.replace('\\', "/");
+
+            // Skip if matched by gitignore
+            #[cfg(windows)]
+            let gitignore_path = rel_path
+                .to_str()
+                .map(|s| s.replace('\\', "/"))
+                .map(PathBuf::from)
+                .unwrap_or(rel_path.to_path_buf());
+            #[cfg(not(windows))]
+            let gitignore_path = rel_path.to_path_buf();
+
+            if gitignore.matched(&gitignore_path, false).is_ignore() {
+                debug!("Skipping {} - matched by gitignore", rel_str);
+                continue;
+            }
 
-        // Skip if matched by our ignore patterns
-        let mut skip = false;
-        #[cfg(windows)]
-        let pattern_path = rel_str.replace('\\', "/");
-        #[cfg(not(windows))]
-        let pattern_path = rel_str.to_string();
-
-        for pat in &final_config.ignore_patterns {
-            if pat.is_match(&pattern_path) {
-                debug!("Skipping {} - matched ignore pattern", rel_str);
-                skip = true;
-                break;
-            }
-        }
-        if skip {
-            continue;
-        }
+            // Skip if matched by our ignore patterns
+            let mut skip = false;
+            #[cfg(windows)]
+            let pattern_path = rel_str.replace('\\', "/");
+            #[cfg(not(windows))]
+            let pattern_path = rel_str.to_string();
+
+            for pat in &final_config.ignore_patterns {
+                if pat.is_match(&pattern_path) {
+                    debug!("Skipping {} - matched ignore pattern", rel_str);
+                    skip = true;
+                    break;
+                }
+            }
+            if skip {
+                continue;
+            }
 
-        // Calculate priority score
-        let mut priority = get_file_priority(
-            &pattern_path,
-            &final_config.ignore_patterns,
-            &final_config.priority_list,
-        );
+            // Calculate priority score
+            let mut priority = get_file_priority(
+                &pattern_path,
+                &final_config.ignore_patterns,
+                &final_config.priority_list,
+            );
 
-        // Apply rank-based boost if available
-        if let Some(ref boost_map) = recentness_boost {
-            if let Some(boost) = boost_map.get(&pattern_path) {
-                priority += *boost;
-            }
-        }
-
-        files.push(FileEntry {
-            path: path.to_path_buf(),
-            priority,
-        });
-    }
+            // Apply rank-based boost if available
+            if let Some(ref boost_map) = recentness_boost {
+                if let Some(boost) = boost_map.get(&pattern_path) {
+                    priority += *boost;
+                }
+            }
 
-    // Sort files by priority (ascending) so higher priority files come last
-    files.sort_by(|a, b| a.priority.cmp(&b.priority));
+            files.push(FileEntry {
+                path: path.to_path_buf(),
+                priority,
+            });
+        }
 
-    // Process files in sorted order
-    for file in files {
-        let path = file.path;
-        let rel_path = path.strip_prefix(base_path).unwrap_or(&path);
-        let rel_str = rel_path.to_string_lossy();
+        // Sort files by priority (ascending) so higher priority files come last
+        files.sort_by(|a, b| a.priority.cmp(&b.priority));
 
-        // Skip binary files
-        if let Some(ref cfg) = config {
-            if !is_text_file(&path, &cfg.binary_extensions) {
-                debug!("Skipping binary file: {}", rel_str);
-                continue;
-            }
-        } else if !is_text_file(&path, &[]) {
-            debug!("Skipping binary file: {}", rel_str);
-            continue;
-        }
+        // Process files in sorted order
+        for file in files {
+            let path = file.path;
+            let rel_path = path.strip_prefix(base_path).unwrap_or(&path);
+            let rel_str = rel_path.to_string_lossy();
+
+            // Skip binary files
+            if let Some(ref cfg) = config {
+                if !is_text_file(&path, &cfg.binary_extensions) {
+                    debug!("Skipping binary file: {}", rel_str);
+                    continue;
+                }
+            } else if !is_text_file(&path, &[]) {
+                debug!("Skipping binary file: {}", rel_str);
+                continue;
+            }
 
-        // Read file content
-        let content = match fs::read_to_string(&path) {
-            Ok(c) => c,
-            Err(e) => {
-                debug!("Failed to read {}: {}", rel_str, e);
-                continue;
-            }
-        };
-
-        let size = count_size(&content, count_tokens);
-        if size == 0 {
-            debug!("Skipping empty file: {}", rel_str);
-            continue;
-        }
+            // Read file content
+            let content = match fs::read_to_string(&path) {
+                Ok(c) => c,
+                Err(e) => {
+                    debug!("Failed to read {}: {}", rel_str, e);
+                    continue;
+                }
+            };
+
+            let size = count_size(&content, count_tokens);
+            if size == 0 {
+                debug!("Skipping empty file: {}", rel_str);
+                continue;
+            }
 
-        // If a single file is larger than max_size, split it into multiple chunks
-        if size > max_size {
-            debug_file!("File exceeds chunk size, splitting into multiple chunks");
-            let mut remaining = content.as_str();
-            let mut part = 0;
-
-            while !remaining.is_empty() {
-                let mut chunk_size = if count_tokens {
-                    // In token mode, count words until we hit max_size
-                    let mut chars = 0;
-                    for (tokens, word) in remaining.split_whitespace().enumerate() {
-                        if tokens + 1 > max_size {
-                            break;
-                        }
-                        chars += word.len() + 1; // +1 for space
-                    }
-                    chars
-                } else {
-                    max_size
-                };
-
-                // Ensure we make progress even if no word boundary found
-                if chunk_size == 0 {
-                    chunk_size = std::cmp::min(max_size, remaining.len());
-                }
-
-                let (chunk, rest) = remaining.split_at(std::cmp::min(chunk_size, remaining.len()));
-                remaining = rest.trim_start();
-
-                let chunk_files = vec![(format!("{}:part{}", rel_str, part), chunk.to_string())];
-                debug_file!("Written chunk {}", part);
-                write_chunk(
-                    &chunk_files,
-                    part,
-                    output_dir.as_deref(),
-                    stream,
-                    count_tokens,
-                )?;
-                part += 1;
-            }
-            continue;
-        }
+            // If a single file is larger than max_size, split it into multiple chunks
+            if size > max_size {
+                debug_file!("File exceeds chunk size, splitting into multiple chunks");
+                let mut remaining = content.as_str();
+                let mut part = 0;
+
+                while !remaining.is_empty() {
+                    let mut chunk_size = if count_tokens {
+                        // In token mode, count words until we hit max_size
+                        let mut chars = 0;
+                        for (tokens, word) in remaining.split_whitespace().enumerate() {
+                            if tokens + 1 > max_size {
+                                break;
+                            }
+                            chars += word.len() + 1; // +1 for space
+                        }
+                        chars
+                    } else {
+                        max_size
+                    };
+
+                    // Ensure we make progress even if no word boundary found
+                    if chunk_size == 0 {
+                        chunk_size = std::cmp::min(max_size, remaining.len());
+                    }
+
+                    let (chunk, rest) =
+                        remaining.split_at(std::cmp::min(chunk_size, remaining.len()));
+                    remaining = rest.trim_start();
+
+                    let chunk_files =
+                        vec![(format!("{}:part{}", rel_str, part), chunk.to_string())];
+                    debug_file!("Written chunk {}", part);
+                    write_chunk(
+                        &chunk_files,
+                        part,
+                        output_dir.as_deref(),
+                        stream,
+                        count_tokens,
+                    )?;
+                    part += 1;
+                }
+                continue;
+            }
 
-        // Check if adding this file would exceed chunk size
-        if total_size + size > max_size && !current_chunk_files.is_empty() {
-            // Write current chunk
+            // Check if adding this file would exceed chunk size
+            if total_size + size > max_size && !current_chunk_files.is_empty() {
+                // Write current chunk
+                write_chunk(
+                    &current_chunk_files,
+                    current_chunk,
+                    output_dir.as_deref(),
+                    stream,
+                    count_tokens,
+                )?;
+                debug_file!("Written chunk {}", current_chunk);
+                current_chunk += 1;
+                current_chunk_files.clear();
+                total_size = 0;
+            }
+
+            // Add file to current chunk
+            current_chunk_files.push((rel_str.to_string(), content));
+            total_size += size;
+        }
 
+        // Write final chunk if any files remain
+        if !current_chunk_files.is_empty() {
             write_chunk(
                 &current_chunk_files,
                 current_chunk,
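Note on the token-mode sizing above: the word loop stops once max_size words have been counted and yields the byte length consumed, which then drives split_at. A hedged worked example, assuming ASCII input where each word plus its trailing space costs word.len() + 1 bytes:

    // max_size = 3 tokens, remaining = "alpha beta gamma delta"
    // chars = (5 + 1) + (4 + 1) + (5 + 1) = 17
    let remaining = "alpha beta gamma delta";
    let (chunk, rest) = remaining.split_at(17);
    assert_eq!(chunk, "alpha beta gamma ");
    assert_eq!(rest.trim_start(), "delta"); // carried into the next part

One caveat: split_at works on byte offsets, so in byte mode (chunk_size = max_size) a multi-byte UTF-8 character sitting on the boundary would make split_at panic; token mode always lands on word boundaries.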
@@ -698,32 +728,23 @@ pub fn serialize_repo(
                 count_tokens,
             )?;
             debug_file!("Written chunk {}", current_chunk);
-        current_chunk += 1;
-        current_chunk_files.clear();
-        total_size = 0;
         }
-
-        // Add file to current chunk
-        current_chunk_files.push((rel_str.to_string(), content));
-        total_size += size;
-    }
-
-    // Write final chunk if any files remain
-    if !current_chunk_files.is_empty() {
-        write_chunk(
-            &current_chunk_files,
-            current_chunk,
-            output_dir.as_deref(),
-            stream,
-            count_tokens,
-        )?;
-        debug_file!("Written chunk {}", current_chunk);
-    }
-
-    if stream {
         Ok(None)
+    } else if let Some(out_dir) = output_dir {
+        // Use parallel processing for non-streaming mode
+        process_files_parallel(
+            base_path,
+            max_size,
+            &out_dir,
+            config.as_ref(),
+            &final_config.ignore_patterns,
+            &final_config.priority_list,
+            recentness_boost.as_ref(),
+        )?;
+        Ok(Some(out_dir))
     } else {
-        Ok(output_dir)
+        Ok(None)
     }
 }
@@ -802,7 +823,7 @@ fn compute_recentness_boost(
         return HashMap::new();
     }
 
-    // Sort by ascending commit time
+    // Sort by ascending commit time => first is oldest
     let mut sorted: Vec<(&String, &u64)> = commit_times.iter().collect();
     sorted.sort_by_key(|(_, t)| **t);
 
@@ -819,8 +840,8 @@ fn compute_recentness_boost(
     let mut result = HashMap::new();
     for (i, (path, _time)) in sorted.iter().enumerate() {
-        let rank = i as f64 / last_index; // 0.0..1.0
-        let boost = (rank * max_boost as f64).round() as i32;
+        let rank = i as f64 / last_index; // 0.0..1.0 (older files get lower rank)
+        let boost = (rank * max_boost as f64).round() as i32; // Newer files get higher boost
         result.insert((*path).clone(), boost);
     }
     result
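Note: a worked instance of the rank formula annotated above, using a stand-in max_boost of 50 (the real value is supplied by the caller):

    // Three files sorted oldest -> newest:
    //   i = 0 -> rank 0.0 -> boost 0   (oldest, no bump)
    //   i = 1 -> rank 0.5 -> boost 25
    //   i = 2 -> rank 1.0 -> boost 50  (newest, full bump)
    let (max_boost, last_index) = (50_f64, 2_f64);
    let boosts: Vec<i32> = (0..3)
        .map(|i| (i as f64 / last_index * max_boost).round() as i32)
        .collect();
    assert_eq!(boosts, vec![0, 25, 50]);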
diff --git a/src/parallel.rs b/src/parallel.rs
new file mode 100644
index 0000000..ac4bbba
--- /dev/null
+++ b/src/parallel.rs
@@ -0,0 +1,325 @@
+use crate::is_text_file;
+use crate::{get_file_priority, PriorityPattern, YekConfig};
+use anyhow::Result;
+use crossbeam::channel::{bounded, Receiver, Sender};
+use ignore::{gitignore::GitignoreBuilder, WalkBuilder};
+use num_cpus::get;
+use regex::Regex;
+use std::collections::HashMap;
+use std::fs;
+use std::io::{BufReader, Read};
+use std::path::{Path, PathBuf};
+use std::thread;
+use tracing::{debug, info};
+
+/// Represents a chunk of text read from one file
+#[derive(Debug)]
+pub struct FileChunk {
+    pub priority: i32,
+    pub file_index: usize,
+    pub part_index: usize,
+    pub rel_path: String,
+    pub content: String,
+}
+
+/// File entry with priority for sorting
+#[derive(Debug, Clone)]
+struct FileEntry {
+    path: PathBuf,
+    priority: i32,
+    file_index: usize,
+}
+
+/// Reads a file and determines if it's likely binary by checking for null bytes
+fn is_likely_binary(path: &Path) -> Result<bool> {
+    let f = fs::File::open(path)?;
+    let mut reader = BufReader::new(f);
+    let mut buf = [0; 4096];
+    let n = reader.read(&mut buf)?;
+    Ok(buf[..n].contains(&0))
+}
+
+/// Reads and chunks a single file, sending chunks through the channel
+fn read_and_send_chunks(
+    file_entry: FileEntry,
+    base_path: &Path,
+    tx: &Sender<FileChunk>,
+    max_size: usize,
+) -> Result<()> {
+    // Skip if binary
+    if is_likely_binary(&file_entry.path)? {
+        return Ok(());
+    }
+
+    // Read file content
+    let content = fs::read_to_string(&file_entry.path)?;
+    if content.is_empty() {
+        return Ok(());
+    }
+
+    // Get relative path for display
+    let rel_path = file_entry
+        .path
+        .strip_prefix(base_path)
+        .unwrap_or(&file_entry.path)
+        .to_string_lossy()
+        .into_owned();
+
+    // If smaller than max_size, send as single chunk
+    if content.len() <= max_size {
+        let chunk = FileChunk {
+            priority: file_entry.priority,
+            file_index: file_entry.file_index,
+            part_index: 0,
+            rel_path,
+            content,
+        };
+        tx.send(chunk).ok();
+        return Ok(());
+    }
+
+    // Otherwise split into chunks
+    let mut start = 0;
+    let mut part_index = 0;
+    let bytes = content.as_bytes();
+
+    while start < bytes.len() {
+        let end = (start + max_size).min(bytes.len());
+        let slice = &bytes[start..end];
+        let chunk_str = String::from_utf8_lossy(slice).into_owned();
+
+        let chunk = FileChunk {
+            priority: file_entry.priority,
+            file_index: file_entry.file_index,
+            part_index,
+            rel_path: rel_path.clone(),
+            content: chunk_str,
+        };
+
+        tx.send(chunk).ok();
+        start = end;
+        part_index += 1;
+    }
+
+    Ok(())
+}
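Note: read_and_send_chunks splits at fixed byte offsets, so a multi-byte UTF-8 sequence can straddle a part boundary; String::from_utf8_lossy then substitutes U+FFFD on both sides instead of panicking. A small demonstration of that edge case:

    // "é" is two bytes (0xC3 0xA9); cutting between them garbles both halves.
    let bytes = "é".as_bytes();
    assert_eq!(String::from_utf8_lossy(&bytes[..1]), "\u{FFFD}");
    assert_eq!(String::from_utf8_lossy(&bytes[1..]), "\u{FFFD}");

This is the opposite trade-off from the streaming path, which splits only at word boundaries in token mode: a little fidelity is lost at part seams, but arbitrary input never fails.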
+
+/// Main parallel processing function that coordinates workers and aggregator
+pub fn process_files_parallel(
+    base_dir: &Path,
+    max_size: usize,
+    output_dir: &Path,
+    config: Option<&YekConfig>,
+    ignore_patterns: &[Regex],
+    priority_list: &[PriorityPattern],
+    recentness_boost: Option<&HashMap<String, i32>>,
+) -> Result<()> {
+    // Create output directory
+    fs::create_dir_all(output_dir)?;
+
+    // Collect and sort files by priority
+    let files = collect_files(
+        base_dir,
+        config,
+        ignore_patterns,
+        priority_list,
+        recentness_boost,
+    )?;
+    if files.is_empty() {
+        return Ok(());
+    }
+
+    // Create channels for worker→aggregator communication
+    let (tx, rx) = bounded(256);
+
+    // Spawn aggregator thread
+    let output_dir = output_dir.to_path_buf();
+    let aggregator_handle = thread::spawn(move || aggregator_loop(rx, output_dir));
+
+    // Spawn worker threads
+    let num_threads = get();
+    let chunk_size = files.len().div_ceil(num_threads);
+    let mut handles = Vec::new();
+
+    for chunk in files.chunks(chunk_size) {
+        let chunk_files = chunk.to_vec();
+        let sender = tx.clone();
+        let base_path = base_dir.to_path_buf();
+
+        let handle = thread::spawn(move || -> Result<()> {
+            for file_entry in chunk_files {
+                read_and_send_chunks(file_entry, &base_path, &sender, max_size)?;
+            }
+            Ok(())
+        });
+        handles.push(handle);
+    }
+
+    // Drop original sender
+    drop(tx);
+
+    // Wait for workers
+    for handle in handles {
+        handle.join().unwrap()?;
+    }
+
+    // Wait for aggregator
+    aggregator_handle.join().unwrap()?;
+
+    Ok(())
+}
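Note: a minimal call-site sketch for the new entry point, mirroring how serialize_repo invokes it in the lib.rs hunk above (the module is private, so this is internal-only). The literal paths, the 10 MB budget, and the empty pattern lists are placeholders, not values from this diff:

    use regex::Regex;
    use std::path::Path;

    let ignore_patterns: Vec<Regex> = Vec::new();
    let priority_list: Vec<PriorityPattern> = Vec::new();
    process_files_parallel(
        Path::new("."),          // base_dir: repository root
        10 * 1024 * 1024,        // max_size: byte budget per chunk part
        Path::new("yek-output"), // output_dir (hypothetical)
        None,                    // config: fall back to default binary extensions
        &ignore_patterns,
        &priority_list,
        None,                    // recentness_boost: skip the git-history bump
    )?;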
+
+/// Collects files from directory respecting .gitignore and sorts by priority
+fn collect_files(
+    base_dir: &Path,
+    config: Option<&YekConfig>,
+    ignore_patterns: &[Regex],
+    priority_list: &[PriorityPattern],
+    recentness_boost: Option<&HashMap<String, i32>>,
+) -> Result<Vec<FileEntry>> {
+    // Build gitignore matcher
+    let mut builder = GitignoreBuilder::new(base_dir);
+    let gitignore_path = base_dir.join(".gitignore");
+    if gitignore_path.exists() {
+        builder.add(&gitignore_path);
+    }
+    let gitignore = builder
+        .build()
+        .unwrap_or_else(|_| GitignoreBuilder::new(base_dir).build().unwrap());
+
+    let mut builder = WalkBuilder::new(base_dir);
+    builder.follow_links(false).standard_filters(true);
+
+    let mut results = Vec::new();
+    let mut file_index = 0;
+
+    for entry in builder.build().flatten() {
+        if entry.file_type().is_some_and(|ft| ft.is_file()) {
+            let path = entry.path().to_path_buf();
+            let rel_path = path.strip_prefix(base_dir).unwrap_or(&path);
+            let rel_str = rel_path.to_string_lossy();
+
+            // Skip if matched by gitignore
+            #[cfg(windows)]
+            let gitignore_path = rel_path
+                .to_str()
+                .map(|s| s.replace('\\', "/"))
+                .map(PathBuf::from)
+                .unwrap_or(rel_path.to_path_buf());
+            #[cfg(not(windows))]
+            let gitignore_path = rel_path.to_path_buf();
+
+            if gitignore.matched(&gitignore_path, false).is_ignore() {
+                debug!("Skipping {} - matched by gitignore", rel_str);
+                continue;
+            }
+
+            // Skip if matched by our ignore patterns
+            let mut skip = false;
+            for pat in ignore_patterns {
+                if pat.is_match(&rel_str) {
+                    debug!("Skipping {} - matched ignore pattern", rel_str);
+                    skip = true;
+                    break;
+                }
+            }
+            if skip {
+                continue;
+            }
+
+            // Skip binary files
+            if let Some(cfg) = config {
+                if !is_text_file(&path, &cfg.binary_extensions) {
+                    debug!("Skipping binary file: {}", rel_str);
+                    continue;
+                }
+            } else if !is_text_file(&path, &[]) {
+                debug!("Skipping binary file: {}", rel_str);
+                continue;
+            }
+
+            // Calculate priority score
+            let mut priority = get_file_priority(&rel_str, ignore_patterns, priority_list);
+
+            // Apply git recentness boost
+            if let Some(boost_map) = recentness_boost {
+                if let Some(boost) = boost_map.get(&rel_str.to_string()) {
+                    priority += *boost;
+                }
+            }
+
+            results.push(FileEntry {
+                path,
+                priority,
+                file_index,
+            });
+            file_index += 1;
+        }
+    }
+
+    // Sort by priority (ascending) so higher priority files come last
+    results.sort_by(|a, b| {
+        // First sort by priority (ascending)
+        let p = a.priority.cmp(&b.priority);
+        if p != std::cmp::Ordering::Equal {
+            return p;
+        }
+        // If priorities are equal, sort by Git boost (ascending)
+        if let Some(boost_map) = recentness_boost {
+            let a_boost = boost_map
+                .get(&a.path.to_string_lossy().to_string())
+                .unwrap_or(&0);
+            let b_boost = boost_map
+                .get(&b.path.to_string_lossy().to_string())
+                .unwrap_or(&0);
+            return a_boost.cmp(b_boost); // Lower boost (older files) come first
+        }
+        std::cmp::Ordering::Equal
+    });
+    Ok(results)
+}
+
+/// Receives chunks from workers and writes them to files
+fn aggregator_loop(rx: Receiver<FileChunk>, output_dir: PathBuf) -> Result<()> {
+    fs::create_dir_all(&output_dir)?;
+
+    let mut all_chunks = Vec::new();
+    while let Ok(chunk) = rx.recv() {
+        all_chunks.push(chunk);
+    }
+
+    all_chunks.sort_by(|a, b| {
+        let p = a.priority.cmp(&b.priority);
+        if p != std::cmp::Ordering::Equal {
+            return p;
+        }
+        let f = a.file_index.cmp(&b.file_index);
+        if f != std::cmp::Ordering::Equal {
+            return f;
+        }
+        a.part_index.cmp(&b.part_index)
+    });
+
+    let mut current_chunk = String::new();
+    let current_chunk_index = 0;
+
+    for chunk in all_chunks {
+        let mut content = String::new();
+        content.push_str(&format!(">>>> {}\n", chunk.rel_path));
+        content.push_str(&chunk.content);
+        content.push_str("\n\n");
+        current_chunk.push_str(&content);
+    }
+
+    if !current_chunk.is_empty() {
+        let out_path = output_dir.join(format!("chunk-{}.txt", current_chunk_index));
+        fs::write(&out_path, &current_chunk)?;
+        info!(
+            "Written chunk {} with {} lines.",
+            current_chunk_index,
+            current_chunk.lines().count()
+        );
+    }
+
+    Ok(())
+}
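Note: aggregator_loop is what makes the parallel output deterministic: workers finish in arbitrary order, so every FileChunk is buffered and sorted by (priority, file_index, part_index) before anything is written. The cascaded comparator above is equivalent to a single tuple key:

    // Same ordering, expressed as one sort key.
    all_chunks.sort_by_key(|c| (c.priority, c.file_index, c.part_index));

Also worth noting: current_chunk_index never advances, so this path writes everything into a single chunk-0.txt; unlike the streaming branch, max_size here only bounds per-file parts, not the aggregate output file.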
diff --git a/tests/test_perf.rs b/tests/test_perf.rs
index 14e4d25..4d5f040 100644
--- a/tests/test_perf.rs
+++ b/tests/test_perf.rs
@@ -1,6 +1,6 @@
 use std::fs;
+use std::path::Path;
 use std::time::{Duration, Instant};
-use tempfile::TempDir;
 use yek::serialize_repo;
 
 struct PerfStats {
@@ -34,9 +34,9 @@ fn test_serialization_performance() {
     const WARMUP_RUNS: usize = 2;
     const BENCH_RUNS: usize = 5;
 
-    // Create temporary test directory that will be automatically cleaned up
-    let test_dir = TempDir::new().unwrap();
-    let output_dir = TempDir::new().unwrap();
+    // Create test data directory
+    let test_dir = "test_perf_data";
+    fs::create_dir_all(test_dir).unwrap();
 
     // Create test files of different sizes
     let sizes = vec![1024, 1024 * 1024, 10 * 1024 * 1024]; // 1KB, 1MB, 10MB
@@ -45,7 +45,7 @@ fn test_serialization_performance() {
     println!("------------------------------");
 
     for size in sizes {
-        let filename = test_dir.path().join(format!("file_{}_bytes.txt", size));
+        let filename = format!("{}/file_{}_bytes.txt", test_dir, size);
         let data = vec![b'a'; size];
         fs::write(&filename, &data).unwrap();
 
@@ -55,16 +55,15 @@ fn test_serialization_performance() {
         for _ in 0..WARMUP_RUNS {
             serialize_repo(
                 size,
-                Some(test_dir.path()),
+                Some(Path::new(test_dir)),
                 false,
                 false,
                 None,
-                Some(output_dir.path()),
+                Some(Path::new("perf_output")),
                 None,
             )
             .unwrap();
-            fs::remove_dir_all(output_dir.path()).unwrap();
-            fs::create_dir_all(output_dir.path()).unwrap();
+            fs::remove_dir_all("perf_output").unwrap();
         }
 
         // Benchmark runs
@@ -75,11 +74,11 @@ fn test_serialization_performance() {
             let start = Instant::now();
             serialize_repo(
                 size,
-                Some(test_dir.path()),
+                Some(Path::new(test_dir)),
                 false,
                 false,
                 None,
-                Some(output_dir.path()),
+                Some(Path::new("perf_output")),
                 None,
             )
            .unwrap();
@@ -87,8 +86,7 @@ fn test_serialization_performance() {
             let duration = start.elapsed();
             stats.update(duration);
             println!("  Run {}: {:?}", run, duration);
-            fs::remove_dir_all(output_dir.path()).unwrap();
-            fs::create_dir_all(output_dir.path()).unwrap();
+            fs::remove_dir_all("perf_output").unwrap();
         }
 
         println!("\nStats for {}B:", size);
@@ -97,5 +95,6 @@ fn test_serialization_performance() {
         println!("  Avg: {:?}", stats.avg);
     }
 
-    // TempDir will automatically clean up when dropped
+    // Final cleanup
+    fs::remove_dir_all(test_dir).unwrap();
 }
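Note: with TempDir removed, the benchmark now writes test_perf_data/ and perf_output/ into the working directory and only deletes them when every run succeeds; a panicking run leaves them behind for inspection. Assuming the default cargo test harness, the timing output prints with cargo test --test test_perf -- --nocapture (adding --release gives more representative numbers).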