Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load receipts for room before acquiring lock #213

Merged
merged 1 commit into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1119,6 +1119,10 @@ impl RoomNeeds {
pub fn insert(&mut self, room_id: OwnedRoomId, need: Need) {
self.needs.entry(room_id).or_default().insert(need);
}

pub fn rooms(&self) -> usize {
self.needs.len()
}
}

impl IntoIterator for RoomNeeds {
Expand Down
4 changes: 1 addition & 3 deletions src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use matrix_sdk::ruma::{
},
redaction::SyncRoomRedactionEvent,
},
AnyMessageLikeEvent,
RedactContent,
RedactedUnsigned,
},
Expand All @@ -57,7 +56,7 @@ use ratatui_image::protocol::Protocol;

use crate::config::ImagePreviewSize;
use crate::{
base::{IambResult, RoomInfo},
base::RoomInfo,
config::ApplicationSettings,
message::html::{parse_matrix_html, StyleTree},
util::{space, space_span, take_width, wrapped_text},
Expand All @@ -66,7 +65,6 @@ use crate::{
mod html;
mod printer;

pub type MessageFetchResult = IambResult<(Option<String>, Vec<AnyMessageLikeEvent>)>;
pub type MessageKey = (MessageTimeStamp, OwnedEventId);
pub type Messages = BTreeMap<MessageKey, Message>;

Expand Down
91 changes: 63 additions & 28 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use std::collections::HashMap;
use std::convert::TryFrom;
use std::fmt::{Debug, Formatter};
use std::ops::Deref;
use std::ops::{Deref, DerefMut};
use std::str::FromStr;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Arc;
Expand All @@ -14,6 +14,7 @@ use std::time::{Duration, Instant};
use futures::{stream::FuturesUnordered, StreamExt};
use gethostname::gethostname;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::Semaphore;
use tokio::task::JoinHandle;
use tracing::{error, warn};
use url::Url;
Expand Down Expand Up @@ -94,11 +95,11 @@ use crate::{
EventLocation,
IambError,
IambResult,
ProgramStore,
RoomFetchStatus,
RoomInfo,
VerifyAction,
},
message::MessageFetchResult,
ApplicationSettings,
};

Expand All @@ -112,6 +113,9 @@ const IAMB_DEVICE_NAME: &str = "iamb";
const IAMB_USER_AGENT: &str = "iamb";
const MIN_MSG_LOAD: u32 = 50;

type MessageFetchResult =
IambResult<(Option<String>, Vec<(AnyMessageLikeEvent, Vec<OwnedUserId>)>)>;

fn initial_devname() -> String {
format!("{} on {}", IAMB_DEVICE_NAME, gethostname().to_string_lossy())
}
Expand Down Expand Up @@ -217,7 +221,7 @@ enum Plan {
async fn load_plans(store: &AsyncProgramStore) -> Vec<Plan> {
let mut locked = store.lock().await;
let ChatStore { need_load, rooms, .. } = &mut locked.application;
let mut plan = vec![];
let mut plan = Vec::with_capacity(need_load.rooms() * 2);

for (room_id, mut need) in std::mem::take(need_load).into_iter() {
if need.contains(Need::MESSAGES) {
Expand Down Expand Up @@ -249,21 +253,25 @@ async fn load_plans(store: &AsyncProgramStore) -> Vec<Plan> {
return plan;
}

async fn run_plan(client: &Client, store: &AsyncProgramStore, plan: Plan) {
async fn run_plan(client: &Client, store: &AsyncProgramStore, plan: Plan, permits: &Semaphore) {
let permit = permits.acquire().await;
match plan {
Plan::Messages(room_id, fetch_id) => {
let limit = MIN_MSG_LOAD;
let client = client.clone();
let store = store.clone();
let store_clone = store.clone();

let res = load_older_one(&client, &room_id, fetch_id, limit).await;
load_insert(room_id, res, store).await;
let mut locked = store.lock().await;
load_insert(room_id, res, locked.deref_mut(), store_clone);
},
Plan::Members(room_id) => {
let res = members_load(client, &room_id).await;
members_insert(room_id, res, store).await
let mut locked = store.lock().await;
members_insert(room_id, res, locked.deref_mut());
},
}
drop(permit);
}

async fn load_older_one(
Expand All @@ -281,35 +289,55 @@ async fn load_older_one(

let Messages { end, chunk, .. } = room.messages(opts).await.map_err(IambError::from)?;

let msgs = chunk.into_iter().filter_map(|ev| {
match ev.event.deserialize() {
Ok(AnyTimelineEvent::MessageLike(msg)) => Some(msg),
Ok(AnyTimelineEvent::State(_)) => None,
Err(_) => None,
}
});
let mut msgs = vec![];

for ev in chunk.into_iter() {
let msg = match ev.event.deserialize() {
Ok(AnyTimelineEvent::MessageLike(msg)) => msg,
Ok(AnyTimelineEvent::State(_)) => continue,
Err(_) => continue,
};

Ok((end, msgs.collect()))
let event_id = msg.event_id();
let receipts = match room
.load_event_receipts(ReceiptType::Read, ReceiptThread::Main, event_id)
.await
{
Ok(receipts) => receipts.into_iter().map(|(u, _)| u).collect(),
Err(e) => {
tracing::warn!(?event_id, "failed to get event receipts: {e}");
vec![]
},
};

msgs.push((msg, receipts));
}

Ok((end, msgs))
} else {
Err(IambError::UnknownRoom(room_id.to_owned()).into())
}
}

async fn load_insert(room_id: OwnedRoomId, res: MessageFetchResult, store: AsyncProgramStore) {
let mut locked = store.lock().await;
fn load_insert(
room_id: OwnedRoomId,
res: MessageFetchResult,
locked: &mut ProgramStore,
store: AsyncProgramStore,
) {
let ChatStore { presences, rooms, worker, picker, settings, .. } = &mut locked.application;
let info = rooms.get_or_default(room_id.clone());
info.fetching = false;
let client = &worker.client;

match res {
Ok((fetch_id, msgs)) => {
for msg in msgs.into_iter() {
for (msg, receipts) in msgs.into_iter() {
let sender = msg.sender().to_owned();
let _ = presences.get_or_default(sender);

if let Some(room) = client.get_room(&room_id) {
update_event_receipts(info, &room, msg.event_id()).await;
for user_id in receipts {
info.set_receipt(user_id, msg.event_id().to_owned());
}

match msg {
Expand Down Expand Up @@ -345,11 +373,19 @@ async fn load_insert(room_id: OwnedRoomId, res: MessageFetchResult, store: Async
}

async fn load_older(client: &Client, store: &AsyncProgramStore) -> usize {
// This is an arbitrary limit on how much work we do in parallel to avoid
// spawning too many tasks at startup and overwhelming the client. We
// should normally only surpass this limit at startup when doing an initial.
// fetch for each room.
const LIMIT: usize = 15;

// Plans are run in parallel. Any room *may* have several plans.
load_plans(store)
.await
let plans = load_plans(store).await;
let permits = Semaphore::new(LIMIT);

plans
.into_iter()
.map(|plan| run_plan(client, store, plan))
.map(|plan| run_plan(client, store, plan, &permits))
.collect::<FuturesUnordered<_>>()
.count()
.await
Expand All @@ -366,14 +402,13 @@ async fn members_load(client: &Client, room_id: &RoomId) -> IambResult<Vec<RoomM
}
}

async fn members_insert(
fn members_insert(
room_id: OwnedRoomId,
res: IambResult<Vec<RoomMember>>,
store: &AsyncProgramStore,
store: &mut ProgramStore,
) {
if let Ok(members) = res {
let mut locked = store.lock().await;
let ChatStore { rooms, .. } = &mut locked.application;
let ChatStore { rooms, .. } = &mut store.application;
let info = rooms.get_or_default(room_id.clone());

for member in members {
Expand Down Expand Up @@ -672,7 +707,7 @@ async fn create_client_inner(
.http_client(http)
.sqlite_store(settings.sqlite_dir.as_path(), None)
.request_config(req_config)
.with_encryption_settings(DEFAULT_ENCRYPTION_SETTINGS.clone());
.with_encryption_settings(DEFAULT_ENCRYPTION_SETTINGS);

let builder = if let Some(url) = homeserver {
// Use the explicitly specified homeserver.
Expand Down
Loading