From 8c0a1b867509bd663ca3c71e8714b08af6780a3f Mon Sep 17 00:00:00 2001 From: Ulyssa Date: Wed, 6 Mar 2024 23:27:04 -0800 Subject: [PATCH] Load receipts for room before acquiring lock --- src/base.rs | 4 ++ src/message/mod.rs | 4 +- src/worker.rs | 91 ++++++++++++++++++++++++++++++++-------------- 3 files changed, 68 insertions(+), 31 deletions(-) diff --git a/src/base.rs b/src/base.rs index d4ca0f6..c5850b7 100644 --- a/src/base.rs +++ b/src/base.rs @@ -1119,6 +1119,10 @@ impl RoomNeeds { pub fn insert(&mut self, room_id: OwnedRoomId, need: Need) { self.needs.entry(room_id).or_default().insert(need); } + + pub fn rooms(&self) -> usize { + self.needs.len() + } } impl IntoIterator for RoomNeeds { diff --git a/src/message/mod.rs b/src/message/mod.rs index 0717ef7..504071b 100644 --- a/src/message/mod.rs +++ b/src/message/mod.rs @@ -33,7 +33,6 @@ use matrix_sdk::ruma::{ }, redaction::SyncRoomRedactionEvent, }, - AnyMessageLikeEvent, RedactContent, RedactedUnsigned, }, @@ -57,7 +56,7 @@ use ratatui_image::protocol::Protocol; use crate::config::ImagePreviewSize; use crate::{ - base::{IambResult, RoomInfo}, + base::RoomInfo, config::ApplicationSettings, message::html::{parse_matrix_html, StyleTree}, util::{space, space_span, take_width, wrapped_text}, @@ -66,7 +65,6 @@ use crate::{ mod html; mod printer; -pub type MessageFetchResult = IambResult<(Option, Vec)>; pub type MessageKey = (MessageTimeStamp, OwnedEventId); pub type Messages = BTreeMap; diff --git a/src/worker.rs b/src/worker.rs index 2d0833c..f71d673 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -5,7 +5,7 @@ use std::collections::HashMap; use std::convert::TryFrom; use std::fmt::{Debug, Formatter}; -use std::ops::Deref; +use std::ops::{Deref, DerefMut}; use std::str::FromStr; use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::sync::Arc; @@ -14,6 +14,7 @@ use std::time::{Duration, Instant}; use futures::{stream::FuturesUnordered, StreamExt}; use gethostname::gethostname; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::Semaphore; use tokio::task::JoinHandle; use tracing::{error, warn}; use url::Url; @@ -94,11 +95,11 @@ use crate::{ EventLocation, IambError, IambResult, + ProgramStore, RoomFetchStatus, RoomInfo, VerifyAction, }, - message::MessageFetchResult, ApplicationSettings, }; @@ -112,6 +113,9 @@ const IAMB_DEVICE_NAME: &str = "iamb"; const IAMB_USER_AGENT: &str = "iamb"; const MIN_MSG_LOAD: u32 = 50; +type MessageFetchResult = + IambResult<(Option, Vec<(AnyMessageLikeEvent, Vec)>)>; + fn initial_devname() -> String { format!("{} on {}", IAMB_DEVICE_NAME, gethostname().to_string_lossy()) } @@ -217,7 +221,7 @@ enum Plan { async fn load_plans(store: &AsyncProgramStore) -> Vec { let mut locked = store.lock().await; let ChatStore { need_load, rooms, .. } = &mut locked.application; - let mut plan = vec![]; + let mut plan = Vec::with_capacity(need_load.rooms() * 2); for (room_id, mut need) in std::mem::take(need_load).into_iter() { if need.contains(Need::MESSAGES) { @@ -249,21 +253,25 @@ async fn load_plans(store: &AsyncProgramStore) -> Vec { return plan; } -async fn run_plan(client: &Client, store: &AsyncProgramStore, plan: Plan) { +async fn run_plan(client: &Client, store: &AsyncProgramStore, plan: Plan, permits: &Semaphore) { + let permit = permits.acquire().await; match plan { Plan::Messages(room_id, fetch_id) => { let limit = MIN_MSG_LOAD; let client = client.clone(); - let store = store.clone(); + let store_clone = store.clone(); let res = load_older_one(&client, &room_id, fetch_id, limit).await; - load_insert(room_id, res, store).await; + let mut locked = store.lock().await; + load_insert(room_id, res, locked.deref_mut(), store_clone); }, Plan::Members(room_id) => { let res = members_load(client, &room_id).await; - members_insert(room_id, res, store).await + let mut locked = store.lock().await; + members_insert(room_id, res, locked.deref_mut()); }, } + drop(permit); } async fn load_older_one( @@ -281,22 +289,42 @@ async fn load_older_one( let Messages { end, chunk, .. } = room.messages(opts).await.map_err(IambError::from)?; - let msgs = chunk.into_iter().filter_map(|ev| { - match ev.event.deserialize() { - Ok(AnyTimelineEvent::MessageLike(msg)) => Some(msg), - Ok(AnyTimelineEvent::State(_)) => None, - Err(_) => None, - } - }); + let mut msgs = vec![]; + + for ev in chunk.into_iter() { + let msg = match ev.event.deserialize() { + Ok(AnyTimelineEvent::MessageLike(msg)) => msg, + Ok(AnyTimelineEvent::State(_)) => continue, + Err(_) => continue, + }; - Ok((end, msgs.collect())) + let event_id = msg.event_id(); + let receipts = match room + .load_event_receipts(ReceiptType::Read, ReceiptThread::Main, event_id) + .await + { + Ok(receipts) => receipts.into_iter().map(|(u, _)| u).collect(), + Err(e) => { + tracing::warn!(?event_id, "failed to get event receipts: {e}"); + vec![] + }, + }; + + msgs.push((msg, receipts)); + } + + Ok((end, msgs)) } else { Err(IambError::UnknownRoom(room_id.to_owned()).into()) } } -async fn load_insert(room_id: OwnedRoomId, res: MessageFetchResult, store: AsyncProgramStore) { - let mut locked = store.lock().await; +fn load_insert( + room_id: OwnedRoomId, + res: MessageFetchResult, + locked: &mut ProgramStore, + store: AsyncProgramStore, +) { let ChatStore { presences, rooms, worker, picker, settings, .. } = &mut locked.application; let info = rooms.get_or_default(room_id.clone()); info.fetching = false; @@ -304,12 +332,12 @@ async fn load_insert(room_id: OwnedRoomId, res: MessageFetchResult, store: Async match res { Ok((fetch_id, msgs)) => { - for msg in msgs.into_iter() { + for (msg, receipts) in msgs.into_iter() { let sender = msg.sender().to_owned(); let _ = presences.get_or_default(sender); - if let Some(room) = client.get_room(&room_id) { - update_event_receipts(info, &room, msg.event_id()).await; + for user_id in receipts { + info.set_receipt(user_id, msg.event_id().to_owned()); } match msg { @@ -345,11 +373,19 @@ async fn load_insert(room_id: OwnedRoomId, res: MessageFetchResult, store: Async } async fn load_older(client: &Client, store: &AsyncProgramStore) -> usize { + // This is an arbitrary limit on how much work we do in parallel to avoid + // spawning too many tasks at startup and overwhelming the client. We + // should normally only surpass this limit at startup when doing an initial. + // fetch for each room. + const LIMIT: usize = 15; + // Plans are run in parallel. Any room *may* have several plans. - load_plans(store) - .await + let plans = load_plans(store).await; + let permits = Semaphore::new(LIMIT); + + plans .into_iter() - .map(|plan| run_plan(client, store, plan)) + .map(|plan| run_plan(client, store, plan, &permits)) .collect::>() .count() .await @@ -366,14 +402,13 @@ async fn members_load(client: &Client, room_id: &RoomId) -> IambResult>, - store: &AsyncProgramStore, + store: &mut ProgramStore, ) { if let Ok(members) = res { - let mut locked = store.lock().await; - let ChatStore { rooms, .. } = &mut locked.application; + let ChatStore { rooms, .. } = &mut store.application; let info = rooms.get_or_default(room_id.clone()); for member in members { @@ -672,7 +707,7 @@ async fn create_client_inner( .http_client(http) .sqlite_store(settings.sqlite_dir.as_path(), None) .request_config(req_config) - .with_encryption_settings(DEFAULT_ENCRYPTION_SETTINGS.clone()); + .with_encryption_settings(DEFAULT_ENCRYPTION_SETTINGS); let builder = if let Some(url) = homeserver { // Use the explicitly specified homeserver.