Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: Add Docs<S> which wraps the Engine<S> #18

Merged
merged 8 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ futures-util = { version = "0.3.25" }
hex = "0.4"
iroh-base = { version = "0.29" }
iroh-blobs = { version = "0.29.0", optional = true, features = ["downloader"] }
iroh-gossip = { version = "0.29.0", optional = true }
iroh-gossip = { version = "0.29.0", optional = true, features = ["net"] }
iroh-metrics = { version = "0.29.0", default-features = false }
iroh = { version = "0.29", optional = true }
num_enum = "0.7"
Expand Down
22 changes: 9 additions & 13 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,21 @@ const SUBSCRIBE_CHANNEL_CAP: usize = 256;

/// The sync engine coordinates actors that manage open documents, set-reconciliation syncs with
/// peers and a gossip swarm for each syncing document.
#[derive(derive_more::Debug, Clone)]
#[derive(derive_more::Debug)]
pub struct Engine<D> {
/// [`Endpoint`] used by the engine.
pub endpoint: Endpoint,
/// Handle to the actor thread.
pub sync: SyncHandle,
/// The persistent default author for this engine.
pub default_author: Arc<DefaultAuthor>,
pub default_author: DefaultAuthor,
to_live_actor: mpsc::Sender<ToLiveActor>,
#[allow(dead_code)]
actor_handle: Arc<AbortOnDropHandle<()>>,
actor_handle: AbortOnDropHandle<()>,
#[debug("ContentStatusCallback")]
content_status_cb: ContentStatusCallback,
local_pool_handle: LocalPoolHandle,
blob_store: D,
#[cfg(feature = "rpc")]
pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
}

impl<D: iroh_blobs::store::Store> Engine<D> {
Expand Down Expand Up @@ -116,24 +114,22 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
endpoint,
sync,
to_live_actor: live_actor_tx,
actor_handle: Arc::new(AbortOnDropHandle::new(actor_handle)),
actor_handle: AbortOnDropHandle::new(actor_handle),
content_status_cb,
default_author: Arc::new(default_author),
default_author,
local_pool_handle,
blob_store: bao_store,
#[cfg(feature = "rpc")]
rpc_handler: Default::default(),
})
}

/// Return a callback that can be added to blobs to protect the content of
/// all docs from garbage collection.
pub fn protect_cb(&self) -> ProtectCb {
let this = self.clone();
let sync = self.sync.clone();
Box::new(move |live| {
let this = this.clone();
let sync = sync.clone();
Box::pin(async move {
let doc_hashes = match this.sync.content_hashes().await {
let doc_hashes = match sync.content_hashes().await {
Ok(hashes) => hashes,
Err(err) => {
tracing::warn!("Error getting doc hashes: {}", err);
Expand Down Expand Up @@ -202,7 +198,7 @@ impl<D: iroh_blobs::store::Store> Engine<D> {

// Create a future that sends channel senders to the respective actors.
// We clone `self` so that the future does not capture any lifetimes.
let this = self.clone();
let this = self;

// Subscribe to insert events from the replica.
let a = {
Expand Down
112 changes: 108 additions & 4 deletions src/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,127 @@
//! [`ProtocolHandler`] implementation for the docs [`Engine`].

use std::{path::PathBuf, sync::Arc};

use anyhow::Result;
use futures_lite::future::Boxed as BoxedFuture;
use iroh::{endpoint::Connecting, protocol::ProtocolHandler};
use iroh_blobs::net_protocol::{Blobs, ProtectCb};
use iroh_gossip::net::Gossip;

use crate::engine::Engine;
use crate::{
engine::{DefaultAuthorStorage, Engine},
store::Store,
};

impl<D: iroh_blobs::store::Store> ProtocolHandler for Engine<D> {
impl<S: iroh_blobs::store::Store> ProtocolHandler for Docs<S> {
fn accept(&self, conn: Connecting) -> BoxedFuture<Result<()>> {
let this = self.clone();
let this = self.engine.clone();
Box::pin(async move { this.handle_connection(conn).await })
}

fn shutdown(&self) -> BoxedFuture<()> {
let this = self.clone();
let this = self.engine.clone();
Box::pin(async move {
if let Err(err) = this.shutdown().await {
tracing::warn!("shutdown error: {:?}", err);
}
})
}
}

/// Docs protocol.
#[derive(Debug, Clone)]
pub struct Docs<S> {
engine: Arc<Engine<S>>,
rklaehn marked this conversation as resolved.
Show resolved Hide resolved
#[cfg(feature = "rpc")]
pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
}

impl Docs<()> {
/// Create a new [`Builder`] for the docs protocol, using in memory replica and author storage.
pub fn memory() -> Builder {
Builder::default()
}

/// Create a new [`Builder`] for the docs protocol, using a persistent replica and author storage
/// in the given directory.
pub fn persistent(path: PathBuf) -> Builder {
Builder { path: Some(path) }
}
}

impl<S: iroh_blobs::store::Store> Docs<S> {
/// Get an in memory client to interact with the docs engine.
#[cfg(feature = "rpc")]
pub fn client(&self) -> &crate::rpc::client::docs::MemClient {
&self
.rpc_handler
.get_or_init(|| crate::rpc::RpcHandler::new(self.engine.clone()))
.client
}

/// Create a new docs protocol with the given engine.
///
/// Note that usually you would use the [`Builder`] to create a new docs protocol.
pub fn new(engine: Engine<S>) -> Self {
Self {
engine: Arc::new(engine),
#[cfg(feature = "rpc")]
rpc_handler: Default::default(),
}
}

/// Handle a docs request from the RPC server.
#[cfg(feature = "rpc")]
pub async fn handle_rpc_request<
C: quic_rpc::server::ChannelTypes<crate::rpc::proto::RpcService>,
>(
self,
msg: crate::rpc::proto::Request,
chan: quic_rpc::server::RpcChannel<crate::rpc::proto::RpcService, C>,
) -> Result<(), quic_rpc::server::RpcServerError<C>> {
crate::rpc::Handler(self.engine.clone())
.handle_rpc_request(msg, chan)
.await
}

/// Get the protect callback for the docs engine.
pub fn protect_cb(&self) -> ProtectCb {
self.engine.protect_cb()
}
}

/// Builder for the docs protocol.
#[derive(Debug, Default)]
pub struct Builder {
path: Option<PathBuf>,
}

impl Builder {
/// Build a [`Docs`] protocol given a [`Blobs`] and [`Gossip`] protocol.
pub async fn spawn<S: iroh_blobs::store::Store>(
self,
blobs: &Blobs<S>,
gossip: &Gossip,
) -> anyhow::Result<Docs<S>> {
let replica_store = match self.path {
Some(ref path) => Store::persistent(path.join("docs.redb"))?,
None => Store::memory(),
};
let author_store = match self.path {
Some(ref path) => DefaultAuthorStorage::Persistent(path.join("default-author")),
None => DefaultAuthorStorage::Mem,
};
let engine = Engine::spawn(
blobs.endpoint().clone(),
gossip.clone(),
replica_store,
blobs.store().clone(),
blobs.downloader().clone(),
author_store,
blobs.rt().clone(),
)
.await?;
Ok(Docs::new(engine))
}
}
25 changes: 15 additions & 10 deletions src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Quic RPC implementation for docs.

use std::{ops::Deref, sync::Arc};

use proto::{Request, RpcService};
use quic_rpc::{
server::{ChannelTypes, RpcChannel},
Expand All @@ -17,15 +19,18 @@ mod docs_handle_request;
type RpcError = serde_error::Error;
type RpcResult<T> = std::result::Result<T, RpcError>;

impl<D: iroh_blobs::store::Store> Engine<D> {
/// Get an in memory client to interact with the docs engine.
pub fn client(&self) -> &client::docs::MemClient {
&self
.rpc_handler
.get_or_init(|| RpcHandler::new(self))
.client
#[derive(Debug, Clone)]
pub(crate) struct Handler<S>(pub(crate) Arc<Engine<S>>);

impl<S> Deref for Handler<S> {
type Target = Engine<S>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<D: iroh_blobs::store::Store> Handler<D> {
/// Handle a docs request from the RPC server.
pub async fn handle_rpc_request<C: ChannelTypes<RpcService>>(
self,
Expand Down Expand Up @@ -80,14 +85,14 @@ impl<D: iroh_blobs::store::Store> Engine<D> {
#[derive(Debug)]
pub(crate) struct RpcHandler {
/// Client to hand out
client: client::docs::MemClient,
pub(crate) client: client::docs::MemClient,
/// Handler task
_handler: AbortOnDropHandle<()>,
}

impl RpcHandler {
fn new<D: iroh_blobs::store::Store>(engine: &Engine<D>) -> Self {
let engine = engine.clone();
pub fn new<D: iroh_blobs::store::Store>(engine: Arc<Engine<D>>) -> Self {
let engine = Handler(engine);
let (listener, connector) = quic_rpc::transport::flume::channel(1);
let listener = RpcServer::new(listener);
let client = client::docs::MemClient::new(RpcClient::new(connector));
Expand Down
6 changes: 3 additions & 3 deletions src/rpc/docs_handle_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ use super::{
SetRequest, SetResponse, ShareRequest, ShareResponse, StartSyncRequest, StartSyncResponse,
StatusRequest, StatusResponse,
},
RpcError, RpcResult,
Handler, RpcError, RpcResult,
};
use crate::{engine::Engine, Author, DocTicket, NamespaceSecret};
use crate::{Author, DocTicket, NamespaceSecret};

/// Capacity for the flume channels to forward sync store iterators to async RPC streams.
const ITER_CHANNEL_CAP: usize = 64;

impl<D: iroh_blobs::store::Store> Engine<D> {
impl<D: iroh_blobs::store::Store> Handler<D> {
pub(super) async fn author_create(
self,
_req: AuthorCreateRequest,
Expand Down
54 changes: 11 additions & 43 deletions tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ use std::{
net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6},
ops::Deref,
path::{Path, PathBuf},
sync::Arc,
};

use iroh::{discovery::Discovery, dns::DnsResolver, key::SecretKey, NodeId, RelayMode};
use iroh_blobs::{
net_protocol::Blobs,
store::{GcConfig, Store as BlobStore},
util::local_pool::LocalPool,
};
use iroh_docs::protocol::Docs;
use iroh_gossip::net::Gossip;
use nested_enum_utils::enum_conversions;
use quic_rpc::transport::{Connector, Listener};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -134,58 +136,24 @@ impl<S: BlobStore> Builder<S> {
builder = builder.dns_resolver(dns_resolver);
}
let endpoint = builder.bind().await?;
let addr = endpoint.node_addr().await?;
let local_pool = LocalPool::single();
let mut router = iroh::protocol::Router::builder(endpoint.clone());

// Setup blobs
let downloader = iroh_blobs::downloader::Downloader::new(
store.clone(),
endpoint.clone(),
local_pool.handle().clone(),
);
let blobs = iroh_blobs::net_protocol::Blobs::new(
store.clone(),
local_pool.handle().clone(),
Default::default(),
downloader.clone(),
endpoint.clone(),
);
let gossip = iroh_gossip::net::Gossip::from_endpoint(
endpoint.clone(),
Default::default(),
&addr.info,
);
let replica_store = match self.path {
Some(ref path) => iroh_docs::store::Store::persistent(path.join("docs.redb"))?,
None => iroh_docs::store::Store::memory(),
};
let author_store = match self.path {
Some(ref path) => {
iroh_docs::engine::DefaultAuthorStorage::Persistent(path.join("default-author"))
}
None => iroh_docs::engine::DefaultAuthorStorage::Mem,
let blobs = Blobs::builder(store.clone()).build(&local_pool, &endpoint);
let gossip = Gossip::builder().spawn(endpoint.clone()).await?;
let builder = match self.path {
Some(ref path) => Docs::persistent(path.to_path_buf()),
None => Docs::memory(),
};
let docs = match iroh_docs::engine::Engine::spawn(
endpoint,
gossip.clone(),
replica_store,
store.clone(),
downloader,
author_store,
local_pool.handle().clone(),
)
.await
{
let docs = match builder.spawn(&blobs, &gossip).await {
Ok(docs) => docs,
Err(err) => {
store.shutdown().await;
return Err(err);
}
};
router = router.accept(iroh_blobs::ALPN, blobs.clone());
router = router.accept(iroh_docs::ALPN, Arc::new(docs.clone()));
router = router.accept(iroh_gossip::ALPN, Arc::new(gossip.clone()));
router = router.accept(iroh_docs::ALPN, docs.clone());
router = router.accept(iroh_gossip::ALPN, gossip.clone());

// Build the router
let router = router.spawn().await?;
Expand Down
Loading