Skip to content

Commit

Permalink
feat: Unpublish device if session cannot be started
Browse files Browse the repository at this point in the history
Refs #19
  • Loading branch information
nesium committed Apr 29, 2024
1 parent f156c79 commit d68bdad
Showing 1 changed file with 52 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use anyhow::{anyhow, bail, Context, Result};
use async_trait::async_trait;
use futures::future::join_all;
use rand::prelude::SliceRandom;
use tracing::{error, info};
use tracing::{error, info, warn};

use prose_proc_macros::DependenciesStruct;
use prose_xmpp::TimeProvider;
Expand Down Expand Up @@ -631,7 +631,16 @@ impl EncryptionDomainService {
.await?;

join_all(device_ids.into_iter().map(|device_id| async move {
self.start_session_with_device_if_needed(user_id, device_id.clone())
if self
.session_repo
.get_session(user_id, &device_id)
.await?
.is_some()
{
return Ok(());
}

self.start_session_with_device(user_id, device_id.clone())
.await
.with_context(|| format!("Failed to start session with {user_id} ({})", device_id))
}))
Expand All @@ -642,20 +651,7 @@ impl EncryptionDomainService {
Ok(())
}

async fn start_session_with_device_if_needed(
&self,
user_id: &UserId,
device_id: DeviceId,
) -> Result<()> {
if self
.session_repo
.get_session(user_id, &device_id)
.await?
.is_some()
{
return Ok(());
}

async fn start_session_with_device(&self, user_id: &UserId, device_id: DeviceId) -> Result<()> {
info!("Starting OMEMO session with {user_id} ({device_id})…");

let Some(bundle) = self
Expand All @@ -665,6 +661,11 @@ impl EncryptionDomainService {
.with_context(|| format!("Failed to load device bundle for {user_id} ({device_id})"))?
else {
info!("No device bundle found for {user_id} ({device_id}).");

if user_id == &self.ctx.connected_id()?.into_user_id() {
_ = self.unpublish_device(&device_id).await
}

return Ok(());
};

Expand All @@ -679,12 +680,20 @@ impl EncryptionDomainService {
.clone(),
};

self.encryption_service
match self
.encryption_service
.process_pre_key_bundle(&user_id, pre_key_bundle)
.await
.with_context(|| {
format!("Failed to process PreKey bundle for {user_id} ({device_id})")
})?;
.with_context(|| format!("Failed to process PreKey bundle for {user_id} ({device_id})"))
{
Ok(_) => (),
Err(err) => {
if user_id == &self.ctx.connected_id()?.into_user_id() {
_ = self.unpublish_device(&device_id).await
}
return Err(err);
}
}

Ok(())
}
Expand All @@ -701,4 +710,27 @@ impl EncryptionDomainService {
})
.collect())
}

async fn unpublish_device(&self, device_id: &DeviceId) -> Result<()> {
let mut devices = self
.user_device_repo
.get_all(&self.ctx.connected_id()?.into_user_id())
.await?;
let num_devices = devices.len();

devices.retain(|device| &device.id != device_id);

if devices.len() == num_devices {
warn!("Could not find device {device_id} for removal.");
return Ok(());
}

info!("Removing device {device_id} from our list of devices…");
self.user_device_service
.publish_device_list(DeviceList { devices })
.await
.context("Failed to publish our device list")?;

Ok(())
}
}

0 comments on commit d68bdad

Please sign in to comment.