Skip to content

Commit

Permalink
Add initial MSC4186 (Simplified Sliding Sync) implementation
Browse files Browse the repository at this point in the history
Signed-off-by: morguldir <morguldir@protonmail.com>
  • Loading branch information
morguldir committed Jan 8, 2025
1 parent ef7ef9f commit 28bd3ef
Show file tree
Hide file tree
Showing 8 changed files with 1,198 additions and 90 deletions.
51 changes: 48 additions & 3 deletions src/api/client/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,31 @@
mod v3;
mod v4;
mod v5;

use conduwuit::{
utils::stream::{BroadbandExt, ReadyExt},
utils::{
stream::{BroadbandExt, ReadyExt},
IterStream,
},
PduCount,
};
use futures::StreamExt;
use ruma::{RoomId, UserId};
use ruma::{
directory::RoomTypeFilter,
events::TimelineEventType::{
self, Beacon, CallInvite, PollStart, RoomEncrypted, RoomMessage, Sticker,
},
RoomId, UserId,
};

pub(crate) use self::{v3::sync_events_route, v4::sync_events_v4_route};
pub(crate) use self::{
v3::sync_events_route, v4::sync_events_v4_route, v5::sync_events_v5_route,
};
use crate::{service::Services, Error, PduEvent, Result};

pub(crate) const DEFAULT_BUMP_TYPES: &[TimelineEventType; 6] =
&[CallInvite, PollStart, Beacon, RoomEncrypted, RoomMessage, Sticker];

async fn load_timeline(
services: &Services,
sender_user: &UserId,
Expand Down Expand Up @@ -73,3 +88,33 @@ async fn share_encrypted_room(
})
.await
}

pub(crate) async fn filter_rooms<'a>(
services: &Services,
rooms: &[&'a RoomId],
filter: &[RoomTypeFilter],
negate: bool,
) -> Vec<&'a RoomId> {
rooms
.iter()
.stream()
.filter_map(|r| async move {
let room_type = services.rooms.state_accessor.get_room_type(r).await;

if room_type.as_ref().is_err_and(|e| !e.is_not_found()) {
return None;
}

let room_type_filter = RoomTypeFilter::from(room_type.ok());

let include = if negate {
!filter.contains(&room_type_filter)
} else {
filter.is_empty() || filter.contains(&room_type_filter)
};

include.then_some(r)
})
.collect()
.await
}
82 changes: 26 additions & 56 deletions src/api/client/sync/v4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,23 @@ use ruma::{
DeviceLists, UnreadNotificationsCount,
},
},
directory::RoomTypeFilter,
events::{
room::member::{MembershipState, RoomMemberEventContent},
AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType,
TimelineEventType::{self, *},
TimelineEventType::*,
},
serde::Raw,
uint, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId, UInt,
uint, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UInt,
};
use service::{rooms::read_receipt::pack_receipts, Services};
use service::rooms::read_receipt::pack_receipts;

use super::{load_timeline, share_encrypted_room};
use crate::{client::ignored_filter, Ruma};

const SINGLE_CONNECTION_SYNC: &str = "single_connection_sync";
use crate::{
client::{filter_rooms, ignored_filter, sync::v5::TodoRooms, DEFAULT_BUMP_TYPES},
Ruma,
};

const DEFAULT_BUMP_TYPES: &[TimelineEventType; 6] =
&[CallInvite, PollStart, Beacon, RoomEncrypted, RoomMessage, Sticker];
pub(crate) const SINGLE_CONNECTION_SYNC: &str = "single_connection_sync";

/// POST `/_matrix/client/unstable/org.matrix.msc3575/sync`
///
Expand Down Expand Up @@ -101,7 +100,6 @@ pub(crate) async fn sync_events_v4_route(
.rooms
.state_cache
.rooms_joined(sender_user)
.map(ToOwned::to_owned)
.collect()
.await;

Expand All @@ -113,6 +111,8 @@ pub(crate) async fn sync_events_v4_route(
.collect()
.await;

let all_invited_rooms: Vec<&RoomId> = all_invited_rooms.iter().map(AsRef::as_ref).collect();

let all_rooms = all_joined_rooms
.iter()
.chain(all_invited_rooms.iter())
Expand Down Expand Up @@ -323,7 +323,7 @@ pub(crate) async fn sync_events_v4_route(
}

let mut lists = BTreeMap::new();
let mut todo_rooms = BTreeMap::new(); // and required state
let mut todo_rooms: TodoRooms = BTreeMap::new(); // and required state

for (list_id, list) in &body.lists {
let active_rooms = match list.filters.clone().and_then(|f| f.is_invite) {
Expand All @@ -344,7 +344,7 @@ pub(crate) async fn sync_events_v4_route(
| None => active_rooms,
};

let mut new_known_rooms = BTreeSet::new();
let mut new_known_rooms: BTreeSet<OwnedRoomId> = BTreeSet::new();

let ranges = list.ranges.clone();
lists.insert(list_id.clone(), sync_events::v4::SyncList {
Expand All @@ -366,9 +366,9 @@ pub(crate) async fn sync_events_v4_route(
Vec::new()
};

new_known_rooms.extend(room_ids.iter().cloned());
new_known_rooms.extend(room_ids.clone().into_iter().map(ToOwned::to_owned));
for room_id in &room_ids {
let todo_room = todo_rooms.entry(room_id.clone()).or_insert((
let todo_room = todo_rooms.entry((*room_id).to_owned()).or_insert((
BTreeSet::new(),
0_usize,
u64::MAX,
Expand All @@ -390,7 +390,7 @@ pub(crate) async fn sync_events_v4_route(
todo_room.2 = todo_room.2.min(
known_rooms
.get(list_id.as_str())
.and_then(|k| k.get(room_id))
.and_then(|k| k.get(*room_id))
.copied()
.unwrap_or(0),
);
Expand All @@ -399,7 +399,7 @@ pub(crate) async fn sync_events_v4_route(
op: SlidingOp::Sync,
range: Some(r),
index: None,
room_ids,
room_ids: room_ids.into_iter().map(ToOwned::to_owned).collect(),
room_id: None,
}
})
Expand All @@ -409,8 +409,8 @@ pub(crate) async fn sync_events_v4_route(

if let Some(conn_id) = &body.conn_id {
services.sync.update_sync_known_rooms(
sender_user.clone(),
sender_device.clone(),
sender_user,
&sender_device,
conn_id.clone(),
list_id.clone(),
new_known_rooms,
Expand Down Expand Up @@ -455,8 +455,8 @@ pub(crate) async fn sync_events_v4_route(

if let Some(conn_id) = &body.conn_id {
services.sync.update_sync_known_rooms(
sender_user.clone(),
sender_device.clone(),
sender_user,
&sender_device,
conn_id.clone(),
"subscriptions".to_owned(),
known_subscription_rooms,
Expand All @@ -480,7 +480,8 @@ pub(crate) async fn sync_events_v4_route(
let mut timestamp: Option<_> = None;
let mut invite_state = None;
let (timeline_pdus, limited);
if all_invited_rooms.contains(room_id) {
let new_room_id: &RoomId = (*room_id).as_ref();
if all_invited_rooms.contains(&new_room_id) {
// TODO: figure out a timestamp we can use for remote invites
invite_state = services
.rooms
Expand Down Expand Up @@ -510,7 +511,7 @@ pub(crate) async fn sync_events_v4_route(
}

account_data.rooms.insert(
room_id.clone(),
room_id.to_owned(),
services
.account_data
.changes_since(Some(room_id), sender_user, *roomsince)
Expand Down Expand Up @@ -740,10 +741,9 @@ pub(crate) async fn sync_events_v4_route(
});
}

if rooms
.iter()
.all(|(_, r)| r.timeline.is_empty() && r.required_state.is_empty())
{
if rooms.iter().all(|(id, r)| {
r.timeline.is_empty() && r.required_state.is_empty() && !receipts.rooms.contains_key(id)
}) {
// Hang a few seconds so requests are not spammed
// Stop hanging if new info arrives
let default = Duration::from_secs(30);
Expand Down Expand Up @@ -789,33 +789,3 @@ pub(crate) async fn sync_events_v4_route(
delta_token: None,
})
}

async fn filter_rooms(
services: &Services,
rooms: &[OwnedRoomId],
filter: &[RoomTypeFilter],
negate: bool,
) -> Vec<OwnedRoomId> {
rooms
.iter()
.stream()
.filter_map(|r| async move {
let room_type = services.rooms.state_accessor.get_room_type(r).await;

if room_type.as_ref().is_err_and(|e| !e.is_not_found()) {
return None;
}

let room_type_filter = RoomTypeFilter::from(room_type.ok());

let include = if negate {
!filter.contains(&room_type_filter)
} else {
filter.is_empty() || filter.contains(&room_type_filter)
};

include.then_some(r.to_owned())
})
.collect()
.await
}
Loading

0 comments on commit 28bd3ef

Please sign in to comment.