Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
cdata committed Oct 18, 2023
1 parent 9e5c83b commit 17c9a65
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 5 deletions.
3 changes: 2 additions & 1 deletion rust/noosphere-gateway/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ where

let ipfs_client = KuboClient::new(&ipfs_api)?;

let (syndication_tx, syndication_task) = start_ipfs_syndication::<C, S>(ipfs_api.clone());
let (syndication_tx, syndication_task) =
start_ipfs_syndication::<C, S>(ipfs_api.clone(), vec![sphere_context.clone()]);
let (name_system_tx, name_system_task) = start_name_system::<C, S>(
NameSystemConfiguration {
connection_type: NameSystemConnectionType::Remote(name_resolver_api),
Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-gateway/src/worker/name_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ where
loop {
for local_sphere in local_spheres.iter() {
if let Err(error) = periodic_publish_record(&tx, local_sphere).await {
error!("Could not publish record: {}", error);
error!("Periodic re-publish of name record failed: {}", error);
};
}
tokio::time::sleep(Duration::from_secs(PERIODIC_PUBLISH_INTERVAL_SECONDS)).await;
Expand Down
62 changes: 59 additions & 3 deletions rust/noosphere-gateway/src/worker/syndication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,22 +70,76 @@ impl SyndicationCheckpoint {
}
}

// Re-syndicate every 90 days
// Force full re-syndicate every 90 days
const MAX_SYNDICATION_CHECKPOINT_LIFETIME: Duration = Duration::from_secs(60 * 60 * 24 * 90);

// Periodic syndication check every 5 minutes
const PERIODIC_SYNDICATION_INTERVAL_SECONDS: Duration = Duration::from_secs(5 * 60);

/// Start a Tokio task that waits for [SyndicationJob] messages and then
/// attempts to syndicate to the configured IPFS RPC. Currently only Kubo IPFS
/// backends are supported.
pub fn start_ipfs_syndication<C, S>(
ipfs_api: Url,
local_spheres: Vec<C>,
) -> (UnboundedSender<SyndicationJob<C>>, JoinHandle<Result<()>>)
where
C: HasMutableSphereContext<S> + 'static,
S: Storage + 'static,
{
let (tx, rx) = unbounded_channel();

(tx, tokio::task::spawn(ipfs_syndication_task(ipfs_api, rx)))
let task = {
let tx = tx.clone();
tokio::task::spawn(async move {
let (_, syndication_result) = tokio::join!(
periodic_syndication_task(tx, local_spheres),
ipfs_syndication_task(ipfs_api, rx)
);
syndication_result?;
Ok(())
})
};

(tx, task)
}

async fn periodic_syndication_task<C, S>(
tx: UnboundedSender<SyndicationJob<C>>,
local_spheres: Vec<C>,
) where
C: HasMutableSphereContext<S>,
S: Storage + 'static,
{
loop {
for local_sphere in &local_spheres {
if let Err(error) = periodic_syndication(&tx, local_sphere).await {
error!("Periodic syndication of sphere history failed: {}", error);
};
tokio::time::sleep(Duration::from_secs(5)).await;
}
tokio::time::sleep(PERIODIC_SYNDICATION_INTERVAL_SECONDS).await;
}
}

async fn periodic_syndication<C, S>(
tx: &UnboundedSender<SyndicationJob<C>>,
local_sphere: &C,
) -> Result<()>
where
C: HasMutableSphereContext<S>,
S: Storage + 'static,
{
let latest_version = local_sphere.version().await?;

if let Err(error) = tx.send(SyndicationJob {
revision: latest_version,
context: local_sphere.clone(),
}) {
warn!("Failed to request periodic syndication: {}", error);
};

Ok(())
}

async fn ipfs_syndication_task<C, S>(
Expand Down Expand Up @@ -173,6 +227,7 @@ where
&sphere_revision,
syndication_checkpoint.last_syndicated_version.as_ref(),
)
.exclude_past()
.to_chronological()
.await?;

Expand Down Expand Up @@ -281,7 +336,8 @@ mod tests {
let ipfs_url = Url::parse("http://127.0.0.1:5001")?;
let local_kubo_client = KuboClient::new(&ipfs_url.clone())?;

let (syndication_tx, _syndication_join_handle) = start_ipfs_syndication::<_, _>(ipfs_url);
let (syndication_tx, _syndication_join_handle) =
start_ipfs_syndication::<_, _>(ipfs_url, vec![user_sphere_context.clone()]);

user_sphere_context
.write("foo", &ContentType::Text, b"bar".as_ref(), None)
Expand Down

0 comments on commit 17c9a65

Please sign in to comment.