Skip to content

Commit

Permalink
automatically retry returning data in syncv3 (#652)
Browse files Browse the repository at this point in the history
* automatically retry returning data in syncv3

* reference service

* clippy fixes
  • Loading branch information
JadedBlueEyes authored Jan 4, 2025
1 parent e5049ca commit 8c74e35
Showing 1 changed file with 31 additions and 19 deletions.
50 changes: 31 additions & 19 deletions src/api/client/sync/v3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,33 @@ pub(crate) async fn sync_events_route(
// Setup watchers, so if there's no response, we can wait for them
let watcher = services.sync.watch(sender_user, sender_device);

let response = build_sync_events(&services, &body).await?;
if body.body.full_state
|| !(response.rooms.is_empty()

This comment has been minimized.

Copy link
@sqlerrorthing

sqlerrorthing Jan 4, 2025

good

&& response.presence.is_empty()
&& response.account_data.is_empty()
&& response.device_lists.is_empty()
&& response.to_device.is_empty())
{
return Ok(response);
}

// Hang a few seconds so requests are not spammed
// Stop hanging if new info arrives
let default = Duration::from_secs(30);
let duration = cmp::min(body.body.timeout.unwrap_or(default), default);
_ = tokio::time::timeout(duration, watcher).await;

// Retry returning data
build_sync_events(&services, &body).await
}

pub(crate) async fn build_sync_events(
services: &Services,
body: &Ruma<sync_events::v3::Request>,
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
let (sender_user, sender_device) = body.sender();

let next_batch = services.globals.current_count()?;
let next_batch_string = next_batch.to_string();

Expand Down Expand Up @@ -163,7 +190,7 @@ pub(crate) async fn sync_events_route(
.map(ToOwned::to_owned)
.broad_filter_map(|room_id| {
load_joined_room(
&services,
services,
sender_user,
sender_device,
room_id.clone(),
Expand Down Expand Up @@ -196,7 +223,7 @@ pub(crate) async fn sync_events_route(
.rooms_left(sender_user)
.broad_filter_map(|(room_id, _)| {
handle_left_room(
&services,
services,
since,
room_id.clone(),
sender_user,
Expand Down Expand Up @@ -242,7 +269,7 @@ pub(crate) async fn sync_events_route(
let presence_updates: OptionFuture<_> = services
.globals
.allow_local_presence()
.then(|| process_presence_updates(&services, since, sender_user))
.then(|| process_presence_updates(services, since, sender_user))
.into();

let account_data = services
Expand Down Expand Up @@ -292,7 +319,7 @@ pub(crate) async fn sync_events_route(
.stream()
.broad_filter_map(|user_id| async move {
let no_shared_encrypted_room =
!share_encrypted_room(&services, sender_user, &user_id, None).await;
!share_encrypted_room(services, sender_user, &user_id, None).await;
no_shared_encrypted_room.then_some(user_id)
})
.ready_fold(HashSet::new(), |mut device_list_left, user_id| {
Expand Down Expand Up @@ -327,21 +354,6 @@ pub(crate) async fn sync_events_route(
to_device: ToDevice { events: to_device_events },
};

// TODO: Retry the endpoint instead of returning
if !full_state
&& response.rooms.is_empty()
&& response.presence.is_empty()
&& response.account_data.is_empty()
&& response.device_lists.is_empty()
&& response.to_device.is_empty()
{
// Hang a few seconds so requests are not spammed
// Stop hanging if new info arrives
let default = Duration::from_secs(30);
let duration = cmp::min(body.body.timeout.unwrap_or(default), default);
_ = tokio::time::timeout(duration, watcher).await;
}

Ok(response)
}

Expand Down

0 comments on commit 8c74e35

Please sign in to comment.