Skip to content

Commit

Permalink
changes to make searching in bulk.
Browse files Browse the repository at this point in the history
some leftover stuff to remove
  • Loading branch information
zawedcvg committed Jul 9, 2024
1 parent 1aefecc commit 89494e6
Showing 1 changed file with 50 additions and 16 deletions.
66 changes: 50 additions & 16 deletions ranked/parser/src/inserting_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,34 @@ pub async fn inserting_info(game: Game) -> Result<(), Box<dyn std::error::Error>
println!("{}", database_url);
// Create a connection pool
let pool = PgPoolOptions::new()
.max_connections(20)
.max_connections(10)
.connect(&database_url)
.await?;

let mut all_player_id_search: Vec<_> = Vec::new();

let match_id_future = insert_into_match(&game, pool.clone());

for player in game.get_player_vec() {
//all_player_id_search.spawn(search_for_player(player, pool.clone()));
all_player_id_search.push(search_for_player(player, pool.clone()));
}
use std::time::Instant;
let now = Instant::now();

let bulk_search_future = bulk_search_player_ids(game.get_player_vec(), pool.clone());

let match_id_future = insert_into_match(&game, pool.clone());

let mut already_added_steam_ids: HashMap<i64, i32> = HashMap::new();

let match_id = match_id_future.await;
let player_id_search_output = join_all(all_player_id_search).await;
let mut fps_bulk_insert: Vec<_> = Vec::new();
let mut commander_bulk_insert: Vec<_> = Vec::new();
let mut commander_details_futures: Vec<_> = Vec::new();

println!("Done waiting for search");
let match_id = match_id_future.await;

for (&player, query_output) in game.get_player_vec().iter().zip(player_id_search_output) {
let bulk_search_players = bulk_search_future.await;
println!("done searching at {:?}", now.elapsed());

for (&player, query_output) in game.get_player_vec().iter().zip(bulk_search_players) {
let db_player_id: i32;
if let Some(id) = query_output {
db_player_id = id
if query_output != -1 {
db_player_id = query_output
} else {
match already_added_steam_ids.get(&player.player_id) {
Some(&id) => db_player_id = id,
Expand All @@ -76,14 +77,16 @@ pub async fn inserting_info(game: Game) -> Result<(), Box<dyn std::error::Error>
}
}

println!("waiting for the inserts");

let returned_elos = join_all(commander_details_futures).await;
println!("done sending stuff for inserts at {:?}", now.elapsed());

let mut insert_new_commander: Vec<_> = Vec::new();

let mut win_list: Vec<_> = Vec::new();

let returned_elos = join_all(commander_details_futures).await;

println!("{:?} done waiting for elos ", now.elapsed());

let mut elo_list = Vec::new();
for ((db_player_id, _, player), returned_elo) in zip!(commander_bulk_insert, returned_elos) {
match returned_elo {
Expand Down Expand Up @@ -113,9 +116,13 @@ pub async fn inserting_info(game: Game) -> Result<(), Box<dyn std::error::Error>
bulk_insert_into_matches_players_commander(&commander_bulk_insert, pool.clone());

update_commander_elo(&new_elos, &commander_bulk_insert, &win_list, pool.clone()).await;
println!("{:?} updating the elos", now.elapsed());

bulk_fps_insert_future.await;
bulk_commander_insert_future.await;
join_all(insert_new_commander).await;

println!("{:?} at the end", now.elapsed());

Ok(())
}
Expand Down Expand Up @@ -302,6 +309,7 @@ async fn search_for_commander(
.fetch_one(&pool)
.await;


match id_future {
Ok(id) => Some(id.ELO),
Err(_) => None,
Expand Down Expand Up @@ -478,3 +486,29 @@ async fn update_commander_elo(
Err(e) => panic!("Could not add commanders due to {e}"),
};
}

async fn bulk_search_player_ids(players: Vec<&Player>, pool: Pool<Postgres>) -> Vec<i32> {
let all_search_ids = sqlx::query!(
r#"
SELECT COALESCE(u.id, -1) AS id
FROM UNNEST($1::BigInt[]) WITH ORDINALITY as p(id, ord)
LEFT JOIN players u ON u.steam_id = p.id
ORDER BY p.ord
"#,
&players
.iter()
.map(|player| player.player_id)
.collect::<Vec<i64>>()
)
.fetch_all(&pool)
.await;


match all_search_ids {
Ok(all_searched) => all_searched
.iter()
.map(|x| x.id.unwrap_or_else(|| panic!("Could not unwrap id")))
.collect::<Vec<_>>(),
Err(e) => panic!("could not update due to {e}"),
}
}

0 comments on commit 89494e6

Please sign in to comment.