Skip to content

Commit

Permalink
ui/timeline: allow subscribing to UTDs and late-decrypt events (#3206)
Browse files Browse the repository at this point in the history
This adds a new mechanism in the UI crate (since re-attempts to decrypt happen in the timeline, as of today — later that'll happen in the event cache) to notify whenever we run into a UTD (an event couldn't be decrypted) or a late-decryption event (after some time, a UTD could be decrypted).

This new hook will deduplicate pings for the same event (identifying events on their event id), and also implements an optional grace period. If an event was a UTD:

- if it's still a UTD after the grace period, then it's reported with a `None` `time_to_decrypt`,
- if it's not a UTD anymore (i.e. it's been decrypted in the meanwhile), then it's reported with a `time_to_decrypt` set to the time it took to decrypt the event (approximate, since it starts counting from the time the timeline receives it, not the time the SDK fails to decrypt it at first).

It's configurable as an optional hook on timeline builders. For the FFI, it's configurable at the sync service's level with a "delegate", and then the sync service will forward the hook to the timelines it creates, and the hook will forward the UTD info to the delegate.

Part of element-hq/element-meta#2300.

---

* ui: add a new module and trait for subscribing to unable-to-decrypt notifications

and late decryptions (i.e. the key came in after the event that required it for decryption).

* timeline: hook in (!) the unable-to-decrypt hook

* timeline: prefix some test names with test_

* utd hook: delay reporting a UTDs

* ffi: add support for configuring the utd hook

* utd hook: switch strategy, have a single hook

And have the data structure contain extra information about late-decryption events.

* utd hook: rename `SmartUtdHook` to `UtdHookManager`

* ffi: configure the UTD hook with a grace period of 60 seconds

And ignore UTDs that have been late-decrypted in less than 4 seconds.

* utd hook: update documentation and satisfy the clippy gods

* ffi: introduce another UnableToDecryptInfo FFI struct that exposes simplified fields from the SDK's version

* review: introduce type alias for pending utd reports

* review: address other review comments
  • Loading branch information
bnjbvr authored Mar 14, 2024
1 parent 7718f90 commit 73684ab
Show file tree
Hide file tree
Showing 10 changed files with 593 additions and 26 deletions.
9 changes: 9 additions & 0 deletions bindings/matrix-sdk-ffi/src/room_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use matrix_sdk_ui::{
BoxedFilterFn,
},
timeline::default_event_filter,
unable_to_decrypt_hook::UtdHookManager,
};
use tokio::sync::RwLock;

Expand Down Expand Up @@ -107,6 +108,7 @@ impl From<RoomListInput> for matrix_sdk_ui::room_list_service::Input {
#[derive(uniffi::Object)]
pub struct RoomListService {
pub(crate) inner: Arc<matrix_sdk_ui::RoomListService>,
pub(crate) utd_hook: Option<Arc<UtdHookManager>>,
}

#[uniffi::export(async_runtime = "tokio")]
Expand All @@ -128,6 +130,7 @@ impl RoomListService {

Ok(Arc::new(RoomListItem {
inner: Arc::new(RUNTIME.block_on(async { self.inner.room(room_id).await })?),
utd_hook: self.utd_hook.clone(),
}))
}

Expand Down Expand Up @@ -478,6 +481,7 @@ impl FilterWrapper {
#[derive(uniffi::Object)]
pub struct RoomListItem {
inner: Arc<matrix_sdk_ui::room_list_service::Room>,
utd_hook: Option<Arc<UtdHookManager>>,
}

#[uniffi::export(async_runtime = "tokio")]
Expand Down Expand Up @@ -549,6 +553,11 @@ impl RoomListItem {
default_event_filter(event, room_version_id) && event_type_filter.filter(event)
});
}

if let Some(utd_hook) = self.utd_hook.clone() {
timeline_builder = timeline_builder.with_unable_to_decrypt_hook(utd_hook);
}

self.inner.init_timeline_with_builder(timeline_builder).map_err(RoomListError::from).await
}

Expand Down
100 changes: 91 additions & 9 deletions bindings/matrix-sdk-ffi/src/sync_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,18 @@
// See the License for that specific language governing permissions and
// limitations under the License.

use std::{fmt::Debug, sync::Arc};
use std::{fmt::Debug, sync::Arc, time::Duration};

use futures_util::pin_mut;
use matrix_sdk::Client;
use matrix_sdk_ui::sync_service::{
State as MatrixSyncServiceState, SyncService as MatrixSyncService,
SyncServiceBuilder as MatrixSyncServiceBuilder,
use matrix_sdk_ui::{
sync_service::{
State as MatrixSyncServiceState, SyncService as MatrixSyncService,
SyncServiceBuilder as MatrixSyncServiceBuilder,
},
unable_to_decrypt_hook::{
UnableToDecryptHook, UnableToDecryptInfo as SdkUnableToDecryptInfo, UtdHookManager,
},
};

use crate::{
Expand Down Expand Up @@ -53,12 +58,16 @@ pub trait SyncServiceStateObserver: Send + Sync + Debug {
#[derive(uniffi::Object)]
pub struct SyncService {
pub(crate) inner: Arc<MatrixSyncService>,
utd_hook: Option<Arc<UtdHookManager>>,
}

#[uniffi::export(async_runtime = "tokio")]
impl SyncService {
pub fn room_list_service(&self) -> Arc<RoomListService> {
Arc::new(RoomListService { inner: self.inner.room_list_service() })
Arc::new(RoomListService {
inner: self.inner.room_list_service(),
utd_hook: self.utd_hook.clone(),
})
}

pub async fn start(&self) {
Expand All @@ -85,11 +94,13 @@ impl SyncService {
#[derive(Clone, uniffi::Object)]
pub struct SyncServiceBuilder {
builder: MatrixSyncServiceBuilder,

utd_hook: Option<Arc<UtdHookManager>>,
}

impl SyncServiceBuilder {
pub(crate) fn new(client: Client) -> Arc<Self> {
Arc::new(Self { builder: MatrixSyncService::builder(client) })
Arc::new(Self { builder: MatrixSyncService::builder(client), utd_hook: None })
}
}

Expand All @@ -101,17 +112,88 @@ impl SyncServiceBuilder {
) -> Arc<Self> {
let this = unwrap_or_clone_arc(self);
let builder = this.builder.with_unified_invites_in_room_list(with_unified_invites);
Arc::new(Self { builder })
Arc::new(Self { builder, utd_hook: this.utd_hook })
}

pub fn with_cross_process_lock(self: Arc<Self>, app_identifier: Option<String>) -> Arc<Self> {
let this = unwrap_or_clone_arc(self);
let builder = this.builder.with_cross_process_lock(app_identifier);
Arc::new(Self { builder })
Arc::new(Self { builder, utd_hook: this.utd_hook })
}

pub fn with_utd_hook(self: Arc<Self>, delegate: Box<dyn UnableToDecryptDelegate>) -> Arc<Self> {
// UTDs detected before this duration may be reclassified as "late decryption"
// events (or discarded, if they get decrypted fast enough).
const UTD_HOOK_GRACE_PERIOD: Duration = Duration::from_secs(60);

let this = unwrap_or_clone_arc(self);
let utd_hook = Some(Arc::new(
UtdHookManager::new(Arc::new(UtdHook { delegate }))
.with_max_delay(UTD_HOOK_GRACE_PERIOD),
));
Arc::new(Self { builder: this.builder, utd_hook })
}

pub async fn finish(self: Arc<Self>) -> Result<Arc<SyncService>, ClientError> {
let this = unwrap_or_clone_arc(self);
Ok(Arc::new(SyncService { inner: Arc::new(this.builder.build().await?) }))
Ok(Arc::new(SyncService {
inner: Arc::new(this.builder.build().await?),
utd_hook: this.utd_hook,
}))
}
}

#[uniffi::export(callback_interface)]
pub trait UnableToDecryptDelegate: Sync + Send {
fn on_utd(&self, info: UnableToDecryptInfo);
}

struct UtdHook {
delegate: Box<dyn UnableToDecryptDelegate>,
}

impl std::fmt::Debug for UtdHook {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("UtdHook").finish_non_exhaustive()
}
}

impl UnableToDecryptHook for UtdHook {
fn on_utd(&self, info: SdkUnableToDecryptInfo) {
const IGNORE_UTD_PERIOD: Duration = Duration::from_secs(4);

// UTDs that have been decrypted in the `IGNORE_UTD_PERIOD` are just ignored and
// not considered UTDs.
if let Some(duration) = &info.time_to_decrypt {
if *duration < IGNORE_UTD_PERIOD {
return;
}
}

// Report the UTD to the client.
self.delegate.on_utd(info.into());
}
}

#[derive(uniffi::Record)]
pub struct UnableToDecryptInfo {
/// The identifier of the event that couldn't get decrypted.
event_id: String,

/// If the event could be decrypted late (that is, the event was encrypted
/// at first, but could be decrypted later on), then this indicates the
/// time it took to decrypt the event. If it is not set, this is
/// considered a definite UTD.
///
/// If set, this is in milliseconds.
pub time_to_decrypt_ms: Option<u64>,
}

impl From<SdkUnableToDecryptInfo> for UnableToDecryptInfo {
fn from(value: SdkUnableToDecryptInfo) -> Self {
Self {
event_id: value.event_id.to_string(),
time_to_decrypt_ms: value.time_to_decrypt.map(|ttd| ttd.as_millis() as u64),
}
}
}
1 change: 1 addition & 0 deletions crates/matrix-sdk-ui/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod notification_client;
pub mod room_list_service;
pub mod sync_service;
pub mod timeline;
pub mod unable_to_decrypt_hook;

pub use self::{room_list_service::RoomListService, timeline::Timeline};

Expand Down
25 changes: 22 additions & 3 deletions crates/matrix-sdk-ui/src/timeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use super::{
queue::send_queued_messages,
BackPaginationStatus, Timeline, TimelineDropHandle,
};
use crate::unable_to_decrypt_hook::UtdHookManager;

/// Builder that allows creating and configuring various parts of a
/// [`Timeline`].
Expand All @@ -45,11 +46,29 @@ pub struct TimelineBuilder {
room: Room,
prev_token: Option<String>,
settings: TimelineInnerSettings,

/// An optional hook to call whenever we run into an unable-to-decrypt or a
/// late-decryption event.
unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
}

impl TimelineBuilder {
pub(super) fn new(room: &Room) -> Self {
Self { room: room.clone(), prev_token: None, settings: TimelineInnerSettings::default() }
Self {
room: room.clone(),
prev_token: None,
settings: TimelineInnerSettings::default(),
unable_to_decrypt_hook: None,
}
}

/// Sets up a hook to catch unable-to-decrypt (UTD) events for the timeline
/// we're building.
///
/// If it was previously set before, will overwrite the previous one.
pub fn with_unable_to_decrypt_hook(mut self, hook: Arc<UtdHookManager>) -> Self {
self.unable_to_decrypt_hook = Some(hook);
self
}

/// Add initial events to the timeline.
Expand Down Expand Up @@ -119,7 +138,7 @@ impl TimelineBuilder {
)
)]
pub async fn build(self) -> event_cache::Result<Timeline> {
let Self { room, prev_token, settings } = self;
let Self { room, prev_token, settings, unable_to_decrypt_hook } = self;

let client = room.client();
let event_cache = client.event_cache();
Expand All @@ -133,7 +152,7 @@ impl TimelineBuilder {
let has_events = !events.is_empty();
let track_read_marker_and_receipts = settings.track_read_receipts;

let mut inner = TimelineInner::new(room).with_settings(settings);
let mut inner = TimelineInner::new(room, unable_to_decrypt_hook).with_settings(settings);

if track_read_marker_and_receipts {
inner.populate_initial_user_receipt(ReceiptType::Read).await;
Expand Down
8 changes: 8 additions & 0 deletions crates/matrix-sdk-ui/src/timeline/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,14 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> {
AnyMessageLikeEventContent::RoomEncrypted(c) => {
// TODO: Handle replacements if the replaced event is also UTD
self.add(true, TimelineItemContent::unable_to_decrypt(c));

// Let the hook know that we ran into an unable-to-decrypt that is added to the
// timeline.
if let Some(hook) = self.meta.unable_to_decrypt_hook.as_ref() {
if let Flow::Remote { event_id, .. } = &self.ctx.flow {
hook.on_utd(event_id);
}
}
}
AnyMessageLikeEventContent::Sticker(content) => {
self.add(should_add, TimelineItemContent::Sticker(Sticker { content }));
Expand Down
18 changes: 15 additions & 3 deletions crates/matrix-sdk-ui/src/timeline/inner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ use super::{
AnnotationKey, EventSendState, EventTimelineItem, InReplyToDetails, Message, Profile,
RepliedToEvent, TimelineDetails, TimelineItem, TimelineItemContent, TimelineItemKind,
};
use crate::timeline::TimelineEventFilterFn;
use crate::{timeline::TimelineEventFilterFn, unable_to_decrypt_hook::UtdHookManager};

mod state;

Expand Down Expand Up @@ -210,8 +210,12 @@ pub fn default_event_filter(event: &AnySyncTimelineEvent, room_version: &RoomVer
}

impl<P: RoomDataProvider> TimelineInner<P> {
pub(super) fn new(room_data_provider: P) -> Self {
let state = TimelineInnerState::new(room_data_provider.room_version());
pub(super) fn new(
room_data_provider: P,
unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
) -> Self {
let state =
TimelineInnerState::new(room_data_provider.room_version(), unable_to_decrypt_hook);
Self {
state: Arc::new(RwLock::new(state)),
room_data_provider,
Expand Down Expand Up @@ -786,11 +790,13 @@ impl<P: RoomDataProvider> TimelineInner<P> {
let settings = self.settings.clone();
let room_data_provider = self.room_data_provider.clone();
let push_rules_context = room_data_provider.push_rules_and_context().await;
let unable_to_decrypt_hook = state.unable_to_decrypt_hook.clone();

matrix_sdk::executor::spawn(async move {
let retry_one = |item: Arc<TimelineItem>| {
let decryptor = decryptor.clone();
let should_retry = &should_retry;
let unable_to_decrypt_hook = unable_to_decrypt_hook.clone();
async move {
let event_item = item.as_event()?;

Expand Down Expand Up @@ -824,6 +830,12 @@ impl<P: RoomDataProvider> TimelineInner<P> {
trace!(
"Successfully decrypted event that previously failed to decrypt"
);

// Notify observers that we managed to eventually decrypt an event.
if let Some(hook) = unable_to_decrypt_hook {
hook.on_late_decrypt(&remote_event.event_id);
}

Some(event)
}
Err(e) => {
Expand Down
17 changes: 14 additions & 3 deletions crates/matrix-sdk-ui/src/timeline/inner/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use crate::{
AnnotationKey, Error as TimelineError, Profile, ReactionSenderData, TimelineItem,
TimelineItemKind, VirtualTimelineItem,
},
unable_to_decrypt_hook::UtdHookManager,
};

#[derive(Debug)]
Expand All @@ -63,13 +64,16 @@ pub(in crate::timeline) struct TimelineInnerState {
}

impl TimelineInnerState {
pub(super) fn new(room_version: RoomVersionId) -> Self {
pub(super) fn new(
room_version: RoomVersionId,
unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
) -> Self {
Self {
// Upstream default capacity is currently 16, which is making
// sliding-sync tests with 20 events lag. This should still be
// small enough.
items: ObservableVector::with_capacity(32),
meta: TimelineInnerMetadata::new(room_version),
meta: TimelineInnerMetadata::new(room_version, unable_to_decrypt_hook),
}
}

Expand Down Expand Up @@ -806,10 +810,16 @@ pub(in crate::timeline) struct TimelineInnerMetadata {
///
/// Private because it's not needed by `TimelineEventHandler`.
back_pagination_tokens: VecDeque<(OwnedEventId, String)>,

/// The hook to call whenever we run into a unable-to-decrypt event.
pub(crate) unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
}

impl TimelineInnerMetadata {
fn new(room_version: RoomVersionId) -> TimelineInnerMetadata {
fn new(
room_version: RoomVersionId,
unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
) -> Self {
Self {
all_events: Default::default(),
next_internal_id: Default::default(),
Expand All @@ -824,6 +834,7 @@ impl TimelineInnerMetadata {
in_flight_reaction: Default::default(),
room_version,
back_pagination_tokens: VecDeque::new(),
unable_to_decrypt_hook,
}
}

Expand Down
Loading

0 comments on commit 73684ab

Please sign in to comment.