From 6c4e0e4176b769114387df9c1e9467740607889f Mon Sep 17 00:00:00 2001 From: Luc Georges Date: Fri, 17 Nov 2023 18:05:45 +0100 Subject: [PATCH] feat: parallelise at hole level (#44) * feat: parallelise at hole level * fix(ci): move strategy to testbed job * feat: output json results file * fix(ci): install jq * fix(ci): add missing `runs-on` * fix(ci): add dependency to testbed job * fix(ci): invalid artifact key name * fix(ci): add missing i in fastapi key * feat(ci): make CI run different # of threads per repo * fix(ci): results.json not in markdown * feat: round output values * fix: avoid creating zombie processes * fix: check on word instead of line * feat: recreate holes for long CI --- .github/workflows/test.yml | 121 +++++- .gitignore | 2 +- crates/llm-ls/src/document.rs | 1 - crates/testbed/README.md | 11 +- crates/testbed/holes/async-executor.json | 2 +- crates/testbed/holes/cached.json | 2 +- crates/testbed/holes/constrandom.json | 2 +- crates/testbed/holes/fastapi.json | 2 +- crates/testbed/holes/helix.json | 2 +- crates/testbed/holes/huggingface_hub.json | 2 +- crates/testbed/holes/io-ts.json | 2 +- crates/testbed/holes/lance.json | 2 +- crates/testbed/holes/lancedb.json | 2 +- crates/testbed/holes/picklescan.json | 2 +- crates/testbed/holes/simple.json | 2 +- crates/testbed/holes/starlette.json | 2 +- crates/testbed/holes/zod.json | 2 +- crates/testbed/src/main.rs | 433 ++++++++++++---------- crates/testbed/src/runner.rs | 8 +- 19 files changed, 381 insertions(+), 221 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b576125..33b4220 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -11,6 +11,22 @@ concurrency: jobs: testbed: + strategy: + matrix: + repo: + - { name: simple, key: simple, parallel: 8 } + - { name: mmaitre314/picklescan, key: picklescan, parallel: 8 } + - { name: huggingface/huggingface_hub, key: huggingface_hub, parallel: 8 } + - { name: tiangolo/fastapi, key: fastapi, parallel: 8 } + - { name: encode/starlette, key: starlette, parallel: 8 } + - { name: lancedb/lancedb, key: lancedb, parallel: 2 } + - { name: lancedb/lance, key: lance, parallel: 2 } + - { name: tkaitchuck/constrandom, key: constrandom, parallel: 8 } + - { name: jaemk/cached, key: cached, parallel: 4 } + - { name: smol-rs/async-executor, key: async-executor, parallel: 4 } + - { name: gcanti/io-ts, key: io-ts, parallel: 8 } + - { name: colinhacks/zod, key: zod, parallel: 8 } + - { name: helix-editor/helix, key: helix, parallel: 2 } runs-on: [self-hosted, intel-cpu, 8-cpu, ci] container: image: ubuntu:22.04 @@ -60,17 +76,118 @@ jobs: run: cargo build -r - name: Run testbed - run: cargo run --bin testbed -r -- --api-token $API_TOKEN -r `pwd`/crates/testbed/repositories-ci.yaml + run: 'cargo run --bin testbed -r -- --api-token $API_TOKEN -r `pwd`/crates/testbed/repositories-ci.yaml -f ${{ matrix.repo.name }} -p ${{ matrix.repo.parallel }}' if: github.event_name == 'push' || github.event_name == 'pull_request' env: API_TOKEN: ${{ secrets.API_TOKEN }} - name: Run testbed - run: cargo run --bin testbed -r -- --api-token $API_TOKEN + run: 'cargo run --bin testbed -r -- --api-token $API_TOKEN -f ${{ matrix.repo.name }} -p ${{ matrix.repo.parallel }}' if: github.event_name == 'workflow_dispatch' env: API_TOKEN: ${{ secrets.API_TOKEN }} + - name: Upload artifacts + uses: actions/upload-artifact@v1 + with: + name: results-${{ matrix.repo.key }} + path: ./results.json + + comment_results: + needs: [testbed] + runs-on: [self-hosted, intel-cpu, 8-cpu, ci] + container: + image: ubuntu:22.04 + steps: + - name: Install dependencies + run: | + apt update + apt install -y jq + + - uses: actions/download-artifact@v1 + with: + name: results-simple + path: results-simple + + - uses: actions/download-artifact@v1 + with: + name: results-picklescan + path: results-picklescan + + - uses: actions/download-artifact@v1 + with: + name: results-huggingface_hub + path: results-huggingface_hub + + - uses: actions/download-artifact@v1 + with: + name: results-fastapi + path: results-fastapi + + - uses: actions/download-artifact@v1 + with: + name: results-starlette + path: results-starlette + + - uses: actions/download-artifact@v1 + with: + name: results-lancedb + path: results-lancedb + + - uses: actions/download-artifact@v1 + with: + name: results-lance + path: results-lance + + - uses: actions/download-artifact@v1 + with: + name: results-constrandom + path: results-constrandom + + - uses: actions/download-artifact@v1 + with: + name: results-cached + path: results-cached + + - uses: actions/download-artifact@v1 + with: + name: results-async-executor + path: results-async-executor + + - uses: actions/download-artifact@v1 + with: + name: results-io-ts + path: results-io-ts + + - uses: actions/download-artifact@v1 + with: + name: results-zod + path: results-zod + + - uses: actions/download-artifact@v1 + with: + name: results-helix + path: results-helix + + - name: Display structure of downloaded files + run: ls -R + + - name: output to markdown + run: | + cat > results.md <> results.md + cat >> results.md <, + /// Concurrent hole completions number + #[arg(short, long, default_value_t = 8)] + parallel_hole_completions: usize, + /// Path to the local repositories/ directory #[arg(short = 'R', long)] repos_dir_path: Option, @@ -227,6 +231,52 @@ impl HoleCompletionResult { } } +struct SetupCache { + cache: HashMap>, +} + +impl SetupCache { + fn new(repositories: &Vec) -> Self { + let mut cache = HashMap::new(); + for repo in repositories { + cache.insert(repo.name(), OnceCell::new()); + } + Self { cache } + } + + async fn get_setup_cache( + &self, + repos_dir_path: PathBuf, + repo: Repository, + ) -> anyhow::Result<&(TempDir, PathBuf)> { + self.cache + .get(&repo.name()) + .ok_or(anyhow!( + "failed to find setup cache for repo {}", + repo.name() + ))? + .get_or_try_init(|| async move { + let (temp_dir, repo_path) = setup_repo_dir(&repos_dir_path, &repo.source).await?; + if let Some(commands) = &repo.setup_commands { + run_setup(commands, &repo.env, &repo_path).await?; + } + Ok((temp_dir, repo_path)) + }) + .await + } + + async fn create_cache_copy( + &self, + repos_dir_path: PathBuf, + repo: Repository, + ) -> anyhow::Result { + let (_cached_dir, path_in_dir) = self.get_setup_cache(repos_dir_path, repo).await?; + let temp_dir = TempDir::new()?; + copy_dir_contents(path_in_dir, temp_dir.path()).await?; + Ok(temp_dir) + } +} + async fn get_api_token(args_token: Option) -> anyhow::Result> { if args_token.is_some() { Ok(args_token) @@ -271,6 +321,11 @@ async fn download_repo_from_github( } async fn copy_dir_contents(source: &Path, dest: &Path) -> anyhow::Result<()> { + debug!( + "copying files from {} to {}", + source.to_str().unwrap(), + dest.to_str().unwrap() + ); let mut stack = VecDeque::new(); stack.push_back((source.to_path_buf(), dest.to_path_buf())); while let Some((src, dst)) = stack.pop_back() { @@ -286,6 +341,9 @@ async fn copy_dir_contents(source: &Path, dest: &Path) -> anyhow::Result<()> { stack.push_back((src_path, dst_path)); } else if entry_type.is_file() { fs::copy(&src_path, &dst_path).await?; + } else if entry_type.is_symlink() { + let link_target = fs::read_link(&src_path).await?; + fs::symlink(link_target, dst_path.clone()).await?; } } } @@ -339,7 +397,11 @@ async fn run_setup( for (name, value) in &parsed_env { status_cmd.env(name, value); } - debug!("running setup command: {} {:?}", command.0, command.1); + debug!( + "running setup command: {} {}", + command.0, + command.1.join(" ") + ); let status = status_cmd .args(&command.1) .current_dir(&repo_path) @@ -385,155 +447,130 @@ async fn build( #[allow(clippy::too_many_arguments)] async fn complete_holes( + hole: Hole, repo: Repository, client: Arc, file_cache: Arc>>, - holes_dir_path: PathBuf, repos_dir_path: PathBuf, repos_config: RepositoriesConfig, api_token: Option, semaphore: Arc, -) -> anyhow::Result> { + setup_cache: Arc, +) -> anyhow::Result { let permit = semaphore.acquire_owned().await?; let span = info_span!("complete_hole", repo_name = repo.name()); + let RepositoriesConfig { + context_window, + fim, + model, + request_params, + tls_skip_verify_insecure, + tokenizer_config, + tokens_to_clear, + .. + } = repos_config; async move { - let holes_file_path = holes_dir_path.join(&repo.holes_file); - let mut holes = String::new(); - File::open(holes_file_path) - .await? - .read_to_string(&mut holes) + let tmp_dir = setup_cache + .create_cache_copy(repos_dir_path, repo.clone()) .await?; - let holes: Vec = serde_json::from_str(&holes)?; - let ten_percent = if holes.len() >= 10 { - holes.len() / 10 + let repo_path = tmp_dir.path(); + let hole_instant = Instant::now(); + let file_path = repo_path.join(&hole.file); + let file_path_str = file_path + .to_str() + .ok_or(anyhow!("failed to convert file to str"))?; + let mut file_content = if file_cache.read().await.contains_key(&file_path) { + file_cache + .read() + .await + .get(&file_path) + .ok_or(anyhow!("failed to find {} in file cache", file_path_str))? + .to_owned() } else { - 1 + let file_content = Rope::from_str(&read_to_string(&file_path).await?); + file_cache + .write() + .await + .insert(file_path.clone(), file_content.clone()); + file_content }; - info!("running {} hole completions", holes.len()); - let RepositoriesConfig { - context_window, - fim, - model, - request_params, - tls_skip_verify_insecure, - tokenizer_config, - tokens_to_clear, - .. - } = repos_config; - let (_temp_dir, repo_path) = setup_repo_dir(&repos_dir_path, &repo.source).await?; - if let Some(commands) = &repo.setup_commands { - run_setup(commands, &repo.env, &repo_path).await?; - } - let mut hole_completions_result = Vec::with_capacity(holes.len()); - for (idx, hole) in holes.iter().enumerate() { - let hole_instant = Instant::now(); - let file_path = repo_path.join(&hole.file); - let file_path_str = file_path - .to_str() - .ok_or(anyhow!("failed to convert file to str"))?; - let mut file_content = if file_cache.read().await.contains_key(&file_path) { - file_cache - .read() - .await - .get(&file_path) - .ok_or(anyhow!("failed to find {} in file cache", file_path_str))? - .to_owned() - } else { - let file_content = Rope::from_str(&read_to_string(&file_path).await?); - file_cache - .write() - .await - .insert(file_path.clone(), file_content.clone()); - file_content - }; - let original_content = file_content.clone(); - let hole_start = file_content.line_to_char(hole.cursor.line as usize) - + hole.cursor.character as usize; - let hole_end = hole_start - + file_content - .line(hole.cursor.line as usize) - .slice(hole.cursor.character as usize..) - .len_chars() - - 1; // NOTE: -1 to preserve the trailing `\n` - file_content.remove(hole_start..hole_end); - - let uri = Url::parse(&format!("file:/{file_path_str}"))?; - client.send_notification::( - DidOpenTextDocumentParams { - text_document: TextDocumentItem { - uri: uri.clone(), - language_id: repo.language.to_string(), - version: 0, - text: file_content.to_string(), - }, + let original_content = file_content.clone(); + let hole_start = + file_content.line_to_char(hole.cursor.line as usize) + hole.cursor.character as usize; + let hole_end = hole_start + + file_content + .line(hole.cursor.line as usize) + .slice(hole.cursor.character as usize..) + .len_chars() + - 1; + file_content.remove(hole_start..hole_end); + + let uri = Url::parse(&format!("file:/{file_path_str}"))?; + client.send_notification::( + DidOpenTextDocumentParams { + text_document: TextDocumentItem { + uri: uri.clone(), + language_id: repo.language.to_string(), + version: 0, + text: file_content.to_string(), }, - ); - let response = client - .send_request::(GetCompletionsParams { - api_token: api_token.clone(), - context_window, - fim: fim.clone(), - ide: Ide::default(), - model: model.clone(), - request_params: request_params.clone(), - text_document_position: TextDocumentPositionParams { - position: hole.cursor, - text_document: TextDocumentIdentifier { uri }, - }, - tls_skip_verify_insecure, - tokens_to_clear: tokens_to_clear.clone(), - tokenizer_config: tokenizer_config.clone(), - }) - .await?; - let (_, result): (RequestId, GetCompletionsResult) = match response.extract() { - Ok(res) => res, - Err(err) => { - error!("llm-ls response error: {err}"); - continue; - } - }; + }, + ); + let response = client + .send_request::(GetCompletionsParams { + api_token: api_token.clone(), + context_window, + fim: fim.clone(), + ide: Ide::default(), + model: model.clone(), + request_params: request_params.clone(), + text_document_position: TextDocumentPositionParams { + position: hole.cursor, + text_document: TextDocumentIdentifier { uri }, + }, + tls_skip_verify_insecure, + tokens_to_clear: tokens_to_clear.clone(), + tokenizer_config: tokenizer_config.clone(), + }) + .await?; + let (_, result): (RequestId, GetCompletionsResult) = response.extract()?; - file_content.insert(hole_start, &result.completions[0].generated_text); - let mut file = OpenOptions::new() - .write(true) - .truncate(true) - .open(&file_path) - .await?; - file.write_all(file_content.to_string().as_bytes()).await?; - let test_percentage = - if build(&repo.build_command, &repo.build_args, &repo.env, &repo_path).await? { - run_test( - repo.runner, - &repo.runner_command, - &repo.runner_args, - &mut repo.runner_extra_args.clone(), - &repo.env, - &repo_path, - ) - .await? - } else { - 0f32 - }; - debug!("{} passed {}%", hole.to_string(), test_percentage * 100f32); - hole_completions_result.push(HoleCompletionResult::new( - repo.name(), - repo.source.source_type(), - test_percentage, - hole_instant.elapsed().as_millis(), - )); - let mut file = OpenOptions::new() - .write(true) - .truncate(true) - .open(&file_path) - .await?; - file.write_all(original_content.to_string().as_bytes()) - .await?; - if (idx + 1) % ten_percent == 0 { - info!("completed {}%", (idx + 1) / ten_percent * 10); - } - } + file_content.insert(hole_start, &result.completions[0].generated_text); + let mut file = OpenOptions::new() + .write(true) + .truncate(true) + .open(&file_path) + .await?; + file.write_all(file_content.to_string().as_bytes()).await?; + let test_percentage = + if build(&repo.build_command, &repo.build_args, &repo.env, &repo_path).await? { + run_test( + repo.runner, + &repo.runner_command, + &repo.runner_args, + &mut repo.runner_extra_args.clone(), + &repo.env, + repo_path, + ) + .await? + } else { + 0f32 + }; + debug!("{} passed {}%", hole.to_string(), test_percentage * 100f32); + let hole_completions_result = HoleCompletionResult::new( + repo.name(), + repo.source.source_type(), + test_percentage, + hole_instant.elapsed().as_millis(), + ); + let mut file = OpenOptions::new() + .write(true) + .truncate(true) + .open(&file_path) + .await?; + file.write_all(original_content.to_string().as_bytes()) + .await?; drop(permit); - info!("finished running hole completions"); Ok(hole_completions_result) } .instrument(span) @@ -616,39 +653,57 @@ async fn main() -> anyhow::Result<()> { let mut passing_tests_percentage = vec![]; let repositories = repos_config.repositories.clone(); + let setup_cache = Arc::new(SetupCache::new(&repositories)); let mut handles = FuturesUnordered::new(); - // Query the model by batches of 64 - let semaphore = Arc::new(Semaphore::new(8)); + let semaphore = Arc::new(Semaphore::new(args.parallel_hole_completions)); for repo in repositories { if filter_repos && !filter_list.contains(&repo.name()) { continue; } - let client = client.clone(); - let file_cache = file_cache.clone(); - let holes_dir_path = holes_dir_path.clone(); - let repos_dir_path = repos_dir_path.clone(); - let repos_config = repos_config.clone(); - let api_token = api_token.clone(); - let semaphore = semaphore.clone(); - handles.push(tokio::spawn(async move { - complete_holes( - repo, - client, - file_cache, - holes_dir_path, - repos_dir_path, - repos_config, - api_token, - semaphore, - ) - .await - })); + let holes_file_path = holes_dir_path.join(&repo.holes_file); + let mut holes = String::new(); + File::open(holes_file_path) + .await? + .read_to_string(&mut holes) + .await?; + let holes: Vec = serde_json::from_str(&holes)?; + info!("running {} hole completions", holes.len()); + for hole in holes { + let repo = repo.clone(); + let client = client.clone(); + let file_cache = file_cache.clone(); + let repos_dir_path = repos_dir_path.clone(); + let repos_config = repos_config.clone(); + let api_token = api_token.clone(); + let semaphore = semaphore.clone(); + let setup_cache = setup_cache.clone(); + handles.push(tokio::spawn(async move { + complete_holes( + hole, + repo, + client, + file_cache, + repos_dir_path, + repos_config, + api_token, + semaphore, + setup_cache, + ) + .await + })); + } } while let Some(res) = handles.next().await { match res { - Ok(Ok(res)) => passing_tests_percentage.extend(res), - Ok(Err(err)) => return Err(err), + Ok(Ok(res)) => passing_tests_percentage.push(res), + Ok(Err(err)) => { + if let Some(extract_err) = err.downcast_ref::() { + error!("llm-ls response error: {extract_err}"); + } else { + return Err(err); + } + } Err(err) => return Err(err.into()), } } @@ -663,51 +718,33 @@ async fn main() -> anyhow::Result<()> { }) .or_insert((res.completion_time_ms, res.pass_percentage, 1f32)); } - let mut results_table = - "| Repository name | Source type | Average hole completion time (s) | Pass percentage |\n| :-------------- | :---------- | -------------------------------: | --------------: |\n".to_owned(); - let mut total_time = 0; - let mut total_percentage = 0f32; - let mut total_count = 0f32; - for (k, v) in results_map.iter() { - let avg = v.1 / v.2; - let avg_time = v.0 as f32 / v.2; - results_table.push_str(&format!( - "| {} | {} | {} | {}% |\n", - k.0, - k.1, - avg_time / 1_000f32, - avg * 100f32 - )); - total_percentage += v.1; - total_count += v.2; - total_time += v.0; - } - let total_avg = total_percentage / total_count; - let total_time_avg = total_time as f32 / total_count; - results_table.push_str(&format!( - "| **Total** | -- | {} | {}% |\n\n", - total_time_avg / 1_000f32, - total_avg * 100f32 - )); - results_table.push_str( - &[ - "**Note:** The \"hole completion time\" represents the full process of:", - " - replacing the code from the file with a completion from the model", - " - building the project", - " - running the tests", - ] - .join("\n"), - ); - info!("llm-ls results:\n{}", results_table); + let json_result = results_map + .iter() + .map(|(k, v)| { + let avg_hole_completion_time_ms = v.0 as f32 / v.2 / 1_000f32; + let pass_percentage = v.1 / v.2 * 100f32; + info!( + "{} from {} obtained {:.2}% in {:.3}s", + k.0, k.1, pass_percentage, avg_hole_completion_time_ms + ); + serde_json::json!({ + "repo_name": k.0, + "source_type": k.1, + "avg_hole_completion_time_ms": format!("{:.3}", avg_hole_completion_time_ms), + "pass_percentage": format!("{:.2}", pass_percentage), + }) + }) + .collect::>(); OpenOptions::new() .create(true) .write(true) .truncate(true) - .open("results.md") + .open("results.json") .await? - .write_all(results_table.as_bytes()) + .write_all(serde_json::to_string(&json_result)?.as_bytes()) .await?; + info!("all tests were run, exiting"); client.shutdown().await?; match Arc::into_inner(client) { Some(client) => client.exit().await, diff --git a/crates/testbed/src/runner.rs b/crates/testbed/src/runner.rs index aabee32..226c20a 100644 --- a/crates/testbed/src/runner.rs +++ b/crates/testbed/src/runner.rs @@ -52,6 +52,7 @@ async fn pytest_runner( .ok_or(anyhow!("failed to take stdout"))? .read_to_string(&mut stdout) .await?; + child.wait().await?; // XXX: the pytest command can still fail even after the compilation check // the above check should prevent an error, but better safe than sorry @@ -124,6 +125,7 @@ async fn cargo_runner( .ok_or(anyhow!("failed to take stdout"))? .read_to_string(&mut stdout) .await?; + child.wait().await?; let lines = stdout.split_terminator('\n'); let mut passed = 0; let mut failed = 0; @@ -173,6 +175,7 @@ async fn jest_runner( .ok_or(anyhow!("failed to take stderr"))? .read_to_string(&mut stderr) .await?; + child.wait().await?; let lines = stderr.split_terminator('\n'); let mut passed = 0f32; let mut failed = 0f32; @@ -183,7 +186,7 @@ async fn jest_runner( for word in words { if word.contains("passed") { passed = prev.parse::()? as f32; - } else if line.contains("failed") { + } else if word.contains("failed") { failed = prev.parse::()? as f32; } prev = word; @@ -228,6 +231,7 @@ async fn vitest_runner( .ok_or(anyhow!("failed to take stdout"))? .read_to_string(&mut stdout) .await?; + child.wait().await?; let lines = stdout.split_terminator('\n'); let mut passed = 0f32; let mut failed = 0f32; @@ -238,7 +242,7 @@ async fn vitest_runner( for word in words { if word.contains("passed") { passed = prev.parse::()? as f32; - } else if line.contains("failed") { + } else if word.contains("failed") { failed = prev.parse::()? as f32; } prev = word;