Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use SignalWebSocket #92

Merged
merged 5 commits into from
Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ authors = ["Gabriel Féron <g@leirbag.net>"]
edition = "2021"

[dependencies]
libsignal-service = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "456bf6eb2e4454ce35bdb4ca75b8dba7941c2923" }
libsignal-service-hyper = { git = "https://github.com/whisperfish/libsignal-service-rs.git", rev = "456bf6eb2e4454ce35bdb4ca75b8dba7941c2923" }
libsignal-service = { git = "https://github.com/whisperfish/libsignal-service-rs", rev = "ddccf7431d7fe848ef8fbed487fc0d33fe385b23" }
libsignal-service-hyper = { git = "https://github.com/whisperfish/libsignal-service-rs.git", rev = "ddccf7431d7fe848ef8fbed487fc0d33fe385b23" }

async-trait = "0.1"
base64 = "0.12"
Expand Down
4 changes: 2 additions & 2 deletions examples/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ async fn receive<C: Store + MessageStore>(
let group_changes = manager.decrypt_group_context(group_v2)?;
println!("Group v2: {:?}", group.title);
println!("Group change: {:?}", group_changes);
println!("Group master key: {:?}", hex::encode(&master_key_bytes));
println!("Group master key: {:?}", hex::encode(master_key_bytes));
}
}

Expand Down Expand Up @@ -502,7 +502,7 @@ async fn run<C: Store + MessageStore>(subcommand: Cmd, config_store: C) -> anyho
let group = manager.get_group_v2(group_master_key).await?;
println!("{:#?}", DebugGroup(&group));
for member in &group.members {
let profile_key = base64::encode(&member.profile_key.bytes);
let profile_key = base64::encode(member.profile_key.bytes);
println!("{member:#?} => profile_key = {profile_key}",);
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub enum Error {
LinkError,
#[error("missing key {0} in config DB")]
MissingKeyError(Cow<'static, str>),
#[error("message pipe not started, you need to start receiving messages before you can send anything back")]
MessagePipeNotStarted,
#[error("receiving pipe was interrupted")]
MessagePipeInterruptedError,
#[error("failed to parse contact information: {0}")]
Expand Down
84 changes: 49 additions & 35 deletions src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::UNIX_EPOCH;

use futures::{channel::mpsc, channel::oneshot, future, AsyncReadExt, Stream, StreamExt};
use futures::{channel::mpsc, channel::oneshot, future, pin_mut, AsyncReadExt, Stream, StreamExt};
use log::{debug, error, info, trace};
use rand::{distributions::Alphanumeric, prelude::ThreadRng, Rng, RngCore};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -34,6 +34,7 @@ use libsignal_service::{
receiver::MessageReceiver,
sender::{AttachmentSpec, AttachmentUploadError},
utils::{serde_private_key, serde_public_key, serde_signaling_key},
websocket::SignalWebSocket,
AccountManager, Profile, ServiceAddress,
};
use libsignal_service_hyper::push_service::HyperPushService;
Expand Down Expand Up @@ -74,7 +75,10 @@ pub struct Confirmation {
#[derive(Clone, Serialize, Deserialize)]
pub struct Registered {
#[serde(skip)]
cache: CacheCell<HyperPushService>,
push_service_cache: CacheCell<HyperPushService>,
#[serde(skip)]
websocket: Option<SignalWebSocket>,

pub signal_servers: SignalServers,
pub device_name: Option<String>,
pub phone_number: PhoneNumber,
Expand Down Expand Up @@ -304,7 +308,8 @@ impl<C: Store> Manager<C, Linking> {
let mut manager = Manager {
config_store,
state: Registered {
cache: CacheCell::default(),
push_service_cache: CacheCell::default(),
websocket: None,
signal_servers,
device_name: Some(device_name),
phone_number,
Expand Down Expand Up @@ -418,7 +423,8 @@ impl<C: Store> Manager<C, Confirmation> {
let mut manager = Manager {
config_store: self.config_store,
state: Registered {
cache: CacheCell::default(),
push_service_cache: CacheCell::default(),
websocket: None,
signal_servers: self.state.signal_servers,
device_name: None,
phone_number,
Expand Down Expand Up @@ -517,6 +523,28 @@ impl<C: Store> Manager<C, Registered> {
Ok(())
}

async fn wait_for_contacts_sync(
&mut self,
mut messages: impl Stream<Item = Content> + Unpin,
) -> Result<(), Error> {
let mut message_receiver = MessageReceiver::new(self.push_service()?);
while let Some(Content { body, .. }) = messages.next().await {
if let ContentBody::SynchronizeMessage(SyncMessage {
contacts: Some(contacts),
..
}) = body
{
let contacts = message_receiver.retrieve_contacts(&contacts).await?;
let _ = self.config_store.clear_contacts();
self.config_store
.save_contacts(contacts.filter_map(Result::ok))?;
info!("saved contacts");
return Ok(());
}
}
Ok(())
}

/// Request that the primary device to encrypt & send all of its contacts as a message to ourselves
/// which can be then received, decrypted and stored in the message receiving loop.
///
Expand All @@ -536,10 +564,18 @@ impl<C: Store> Manager<C, Registered> {
.expect("Time went backwards")
.as_millis() as u64;

// start waiting for the contacts sync
info!("waiting for contacts sync for 3 minutes");
gferon marked this conversation as resolved.
Show resolved Hide resolved
let messages = self.receive_messages().await?;
pin_mut!(messages);

// first request the sync
self.send_message(self.state.uuid, sync_message, timestamp)
.await?;

trace!("requested contacts sync");
// wait for it to arrive
self.wait_for_contacts_sync(messages).await?;

Ok(())
}

Expand Down Expand Up @@ -587,30 +623,29 @@ impl<C: Store> Manager<C, Registered> {
}

async fn receive_messages_encrypted(
&self,
&mut self,
) -> Result<impl Stream<Item = Result<Envelope, ServiceError>>, Error> {
let credentials = self.credentials()?.ok_or(Error::NotYetRegisteredError)?;
let pipe = MessageReceiver::new(self.push_service()?)
.create_message_pipe(credentials)
.await?;
self.state.websocket.replace(pipe.ws());
Ok(pipe.stream())
}

/// Starts receiving and storing messages.
///
/// Returns a [Stream] of messages to consume. Messages will also be stored by the implementation of the [MessageStore].
pub async fn receive_messages(&self) -> Result<impl Stream<Item = Content>, Error> {
pub async fn receive_messages(&mut self) -> Result<impl Stream<Item = Content>, Error> {
struct StreamState<S, C> {
encrypted_messages: S,
service_cipher: ServiceCipher<C>,
push_service: HyperPushService,
config_store: C,
}

let init = StreamState {
encrypted_messages: Box::pin(self.receive_messages_encrypted().await?),
service_cipher: self.new_service_cipher()?,
push_service: self.push_service()?,
config_store: self.config_store.clone(),
};

Expand All @@ -620,31 +655,6 @@ impl<C: Store> Manager<C, Registered> {
Some(Ok(envelope)) => {
match state.service_cipher.open_envelope(envelope).await {
// contacts synchronization sent from the primary device (happens after linking, or on demand)
Ok(Some(Content {
metadata: Metadata { .. },
body:
ContentBody::SynchronizeMessage(SyncMessage {
contacts: Some(contacts),
..
}),
..
})) => {
let mut message_receiver =
MessageReceiver::new(state.push_service.clone());
match message_receiver.retrieve_contacts(&contacts).await {
Ok(contacts_iter) => {
let _ = state.config_store.clear_contacts();
if let Err(e) = state
.config_store
.save_contacts(contacts_iter.filter_map(Result::ok))
{
error!("failed to save contacts: {}", e)
}
info!("saved contacts");
}
Err(e) => error!("failed to retrieve contacts: {}", e),
}
}
Ok(Some(content)) => {
if let Ok(thread) = Thread::try_from(&content) {
// TODO: handle reactions here, we should update the original message?
Expand Down Expand Up @@ -845,7 +855,7 @@ impl<C: Store> Manager<C, Registered> {
///
/// If no service is yet cached, it will create and cache one.
fn push_service(&self) -> Result<HyperPushService, Error> {
self.state.cache.get(|| {
self.state.push_service_cache.get(|| {
let credentials = self.credentials()?;
let service_configuration: ServiceConfiguration = self.state.signal_servers.into();

Expand All @@ -866,6 +876,10 @@ impl<C: Store> Manager<C, Registered> {
};

Ok(MessageSender::new(
self.state
.websocket
.clone()
.ok_or(Error::MessagePipeNotStarted)?,
self.push_service()?,
self.new_service_cipher()?,
rand::thread_rng(),
Expand Down
6 changes: 3 additions & 3 deletions src/store/sled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ impl SenderKeyStore for SledStore {
sender.device_id(),
distribution_id
);
self.insert(SLED_TREE_SENDER_KEYS, &key, record.serialize()?)
self.insert(SLED_TREE_SENDER_KEYS, key, record.serialize()?)
.map_err(Error::into_signal_error)
}

Expand All @@ -650,7 +650,7 @@ impl SenderKeyStore for SledStore {
sender.device_id(),
distribution_id
);
self.get(SLED_TREE_SENDER_KEYS, &key)
self.get(SLED_TREE_SENDER_KEYS, key)
.map_err(Error::into_signal_error)?
.map(|b: Vec<u8>| SenderKeyRecord::deserialize(&b))
.transpose()
Expand All @@ -670,7 +670,7 @@ impl MessageStore for SledStore {
let proto: ContentProto = message.into();

let tree = self.messages_thread_tree_name(thread);
let key = self.key(&tree, &timestamp_bytes);
let key = self.key(&tree, timestamp_bytes);

let value = proto.encode_to_vec();
let value = self.cipher.as_ref().map_or_else(
Expand Down