diff --git a/rust/noosphere-core/src/api/client.rs b/rust/noosphere-core/src/api/client.rs
index fdf208d0c..91ac5c780 100644
--- a/rust/noosphere-core/src/api/client.rs
+++ b/rust/noosphere-core/src/api/client.rs
@@ -254,7 +254,10 @@ where
                 .stream()
                 .map(|block| match block {
                     Ok(block) => Ok(block),
-                    Err(error) => Err(anyhow!(error)),
+                    Err(error) => {
+                        warn!("Replication stream ended prematurely");
+                        Err(anyhow!(error))
+                    }
                 }),
         )
     }
diff --git a/rust/noosphere-gateway/src/gateway.rs b/rust/noosphere-gateway/src/gateway.rs
index 0b1f2665a..57e7f8145 100644
--- a/rust/noosphere-gateway/src/gateway.rs
+++ b/rust/noosphere-gateway/src/gateway.rs
@@ -78,7 +78,8 @@ where
 
     let ipfs_client = KuboClient::new(&ipfs_api)?;
 
-    let (syndication_tx, syndication_task) = start_ipfs_syndication::<C, S>(ipfs_api.clone());
+    let (syndication_tx, syndication_task) =
+        start_ipfs_syndication::<C, S>(ipfs_api.clone(), vec![sphere_context.clone()]);
     let (name_system_tx, name_system_task) = start_name_system::<C, S>(
         NameSystemConfiguration {
             connection_type: NameSystemConnectionType::Remote(name_resolver_api),
diff --git a/rust/noosphere-gateway/src/handlers/v0alpha2/push.rs b/rust/noosphere-gateway/src/handlers/v0alpha2/push.rs
index 06fdc442d..00a1df668 100644
--- a/rust/noosphere-gateway/src/handlers/v0alpha2/push.rs
+++ b/rust/noosphere-gateway/src/handlers/v0alpha2/push.rs
@@ -105,7 +105,7 @@ where
         self.incorporate_history(&push_body).await?;
         self.synchronize_names(&push_body).await?;
 
-        let (next_version, new_blocks) = self.update_gateway_sphere().await?;
+        let (next_version, new_blocks) = self.update_gateway_sphere(&push_body).await?;
 
         // These steps are order-independent
         let _ = tokio::join!(
@@ -320,19 +320,17 @@ where
     /// synchronize the pusher with the latest local history.
     async fn update_gateway_sphere(
         &mut self,
+        push_body: &PushBody,
     ) -> Result<(Link<MemoIpld>, impl Stream<Item = Result<(Cid, Vec<u8>)>>), PushError> {
         debug!("Updating the gateway's sphere...");
 
-        // NOTE CDATA: "Previous version" doesn't cover all cases; this needs to be a version given
-        // in the push body, or else we don't know how far back we actually have to go (e.g., the name
-        // system may have created a new version in the mean time.
-        let previous_version = self.sphere_context.version().await?;
+        let previous_version = push_body.counterpart_tip.as_ref();
         let next_version = SphereCursor::latest(self.sphere_context.clone())
             .save(None)
             .await?;
 
         let db = self.sphere_context.sphere_context().await?.db().clone();
-        let block_stream = memo_history_stream(db, &next_version, Some(&previous_version), false);
+        let block_stream = memo_history_stream(db, &next_version, previous_version, false);
 
         Ok((next_version, block_stream))
     }
diff --git a/rust/noosphere-gateway/src/worker/name_system.rs b/rust/noosphere-gateway/src/worker/name_system.rs
index a64c4a11b..09589022a 100644
--- a/rust/noosphere-gateway/src/worker/name_system.rs
+++ b/rust/noosphere-gateway/src/worker/name_system.rs
@@ -130,7 +130,7 @@ where
     loop {
         for local_sphere in local_spheres.iter() {
             if let Err(error) = periodic_publish_record(&tx, local_sphere).await {
-                error!("Could not publish record: {}", error);
+                error!("Periodic re-publish of link record failed: {}", error);
             };
         }
         tokio::time::sleep(Duration::from_secs(PERIODIC_PUBLISH_INTERVAL_SECONDS)).await;
@@ -153,7 +153,7 @@ where
                     record,
                     republish: true,
                 }) {
-                    warn!("Failed to request name record publish: {}", error);
+                    warn!("Failed to request link record publish: {}", error);
                 }
             }
             _ => {
@@ -393,7 +393,7 @@ where
             // TODO(#260): What if the resolved value is None?
             Some(record) if last_known_record != next_record => {
                 debug!(
-                    "Gateway adopting petname record for '{}' ({}): {}",
+                    "Gateway adopting petname link record for '{}' ({}): {}",
                     name, identity.did, record
                 );
 
@@ -404,7 +404,7 @@ where
                 }
 
                 if let Err(e) = context.set_petname_record(&name, record).await {
-                    warn!("Could not set petname record: {}", e);
+                    warn!("Could not set petname link record: {}", e);
                     continue;
                 }
             }
@@ -419,7 +419,7 @@ where
     Ok(())
 }
 
-/// Attempts to fetch a single name record from the name system.
+/// Attempts to fetch a single link record from the name system.
 async fn fetch_record(
     client: Arc,
     name: String,
diff --git a/rust/noosphere-gateway/src/worker/syndication.rs b/rust/noosphere-gateway/src/worker/syndication.rs
index 66c138caf..9bbfe01af 100644
--- a/rust/noosphere-gateway/src/worker/syndication.rs
+++ b/rust/noosphere-gateway/src/worker/syndication.rs
@@ -39,19 +39,17 @@ pub struct SyndicationJob {
 }
 
 /// A [SyndicationCheckpoint] represents the last spot in the history of a
-/// sphere that was successfully syndicated to an IPFS node. It records a Bloom
-/// filter populated by the CIDs of all blocks that have been syndicated, which
-/// gives us a short-cut to determine if a block should be added.
+/// sphere that was successfully syndicated to an IPFS node.
 #[derive(Serialize, Deserialize)]
 pub struct SyndicationCheckpoint {
-    pub last_syndicated_version: Option<Link<MemoIpld>>,
+    pub last_syndicated_counterpart_version: Option<Link<MemoIpld>>,
     pub syndication_epoch: u64,
 }
 
 impl SyndicationCheckpoint {
     pub fn new() -> Result<Self> {
         Ok(Self {
-            last_syndicated_version: None,
+            last_syndicated_counterpart_version: None,
             syndication_epoch: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(),
         })
     }
@@ -70,14 +68,18 @@ impl SyndicationCheckpoint {
     }
 }
 
-// Re-syndicate every 90 days
+// Force full re-syndicate every 90 days
 const MAX_SYNDICATION_CHECKPOINT_LIFETIME: Duration = Duration::from_secs(60 * 60 * 24 * 90);
 
+// Periodic syndication check every 5 minutes
+const PERIODIC_SYNDICATION_INTERVAL_SECONDS: Duration = Duration::from_secs(5 * 60);
+
 /// Start a Tokio task that waits for [SyndicationJob] messages and then
 /// attempts to syndicate to the configured IPFS RPC. Currently only Kubo IPFS
 /// backends are supported.
 pub fn start_ipfs_syndication<C, S>(
     ipfs_api: Url,
+    local_spheres: Vec<C>,
 ) -> (UnboundedSender<SyndicationJob<C>>, JoinHandle<Result<()>>)
 where
     C: HasMutableSphereContext<S> + 'static,
@@ -85,7 +87,57 @@ where
 {
     let (tx, rx) = unbounded_channel();
 
-    (tx, tokio::task::spawn(ipfs_syndication_task(ipfs_api, rx)))
+    let task = {
+        let tx = tx.clone();
+        tokio::task::spawn(async move {
+            let (_, syndication_result) = tokio::join!(
+                periodic_syndication_task(tx, local_spheres),
+                ipfs_syndication_task(ipfs_api, rx)
+            );
+            syndication_result?;
+            Ok(())
+        })
+    };
+
+    (tx, task)
+}
+
+async fn periodic_syndication_task<C, S>(
+    tx: UnboundedSender<SyndicationJob<C>>,
+    local_spheres: Vec<C>,
+) where
+    C: HasMutableSphereContext<S>,
+    S: Storage + 'static,
+{
+    loop {
+        for local_sphere in &local_spheres {
+            if let Err(error) = periodic_syndication(&tx, local_sphere).await {
+                error!("Periodic syndication of sphere history failed: {}", error);
+            };
+            tokio::time::sleep(Duration::from_secs(5)).await;
+        }
+        tokio::time::sleep(PERIODIC_SYNDICATION_INTERVAL_SECONDS).await;
+    }
+}
+
+async fn periodic_syndication<C, S>(
+    tx: &UnboundedSender<SyndicationJob<C>>,
+    local_sphere: &C,
+) -> Result<()>
+where
+    C: HasMutableSphereContext<S>,
+    S: Storage + 'static,
+{
+    let latest_version = local_sphere.version().await?;
+
+    if let Err(error) = tx.send(SyndicationJob {
+        revision: latest_version,
+        context: local_sphere.clone(),
+    }) {
+        warn!("Failed to request periodic syndication: {}", error);
+    };
+
+    Ok(())
 }
 
 async fn ipfs_syndication_task<C, S>(
@@ -165,14 +217,23 @@ where
             None => SyndicationCheckpoint::new()?,
         };
 
+        if Some(counterpart_revision) == syndication_checkpoint.last_syndicated_counterpart_version
+        {
+            warn!("Counterpart version hasn't changed; skipping syndication");
+            return Ok(());
+        }
+
         (counterpart_revision, syndication_checkpoint, db)
     };
 
     let timeline = Timeline::new(&db)
         .slice(
             &sphere_revision,
-            syndication_checkpoint.last_syndicated_version.as_ref(),
+            syndication_checkpoint
+                .last_syndicated_counterpart_version
+                .as_ref(),
         )
+        .exclude_past()
         .to_chronological()
         .await?;
 
@@ -204,7 +265,7 @@ where
         {
             Ok(_) => {
                 debug!("Syndicated sphere revision {} to IPFS", cid);
-                syndication_checkpoint.last_syndicated_version = Some(cid);
+                syndication_checkpoint.last_syndicated_counterpart_version = Some(cid);
             }
             Err(error) => warn!(
                 "Failed to pin orphans for revision {} to IPFS: {:?}",
@@ -281,7 +342,8 @@ mod tests {
         let ipfs_url = Url::parse("http://127.0.0.1:5001")?;
         let local_kubo_client = KuboClient::new(&ipfs_url.clone())?;
 
-        let (syndication_tx, _syndication_join_handle) = start_ipfs_syndication::<_, _>(ipfs_url);
+        let (syndication_tx, _syndication_join_handle) =
+            start_ipfs_syndication::<_, _>(ipfs_url, vec![user_sphere_context.clone()]);
 
         user_sphere_context
             .write("foo", &ContentType::Text, b"bar".as_ref(), None)
diff --git a/rust/noosphere-storage/examples/bench/main.rs b/rust/noosphere-storage/examples/bench/main.rs
index 36484568e..bacdd922c 100644
--- a/rust/noosphere-storage/examples/bench/main.rs
+++ b/rust/noosphere-storage/examples/bench/main.rs
@@ -132,7 +132,7 @@ impl BenchmarkStorage {
         ))]
         let (storage, storage_name) = {
             (
-                noosphere_storage::SledStorage::new(&storage_path)?,
+                noosphere_storage::SledStorage::new(storage_path)?,
                 "SledDbStorage",
             )
         };
diff --git a/rust/noosphere-storage/src/retry.rs b/rust/noosphere-storage/src/retry.rs
index c83a41d26..697ff04ba 100644
--- a/rust/noosphere-storage/src/retry.rs
+++ b/rust/noosphere-storage/src/retry.rs
@@ -7,11 +7,11 @@ use tokio::select;
 use crate::BlockStore;
 
 const DEFAULT_MAX_RETRIES: u32 = 2u32;
-const DEFAULT_TIMEOUT: Duration = Duration::from_millis(1500);
-const DEFAULT_MINIMUM_DELAY: Duration = Duration::from_secs(1);
-const DEFAULT_BACKOFF: Backoff = Backoff::Exponential {
-    exponent: 2f32,
-    ceiling: Duration::from_secs(6),
+const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);
+const DEFAULT_MINIMUM_DELAY: Duration = Duration::from_millis(100);
+const DEFAULT_BACKOFF: Backoff = Backoff::Linear {
+    increment: Duration::from_secs(1),
+    ceiling: Duration::from_secs(3),
 };
 
 /// Backoff configuration used to define how [BlockStoreRetry] should time
@@ -127,17 +127,28 @@ where
                 };
             },
             _ = tokio::time::sleep(next_timeout) => {
-                warn!("Timed out trying to get {} after {} seconds...", cid, next_timeout.as_secs());
+                warn!("Timed out trying to get {} after {} seconds...", cid, next_timeout.as_secs_f32());
             }
         }
 
         let spent_window_time = Instant::now() - window_start;
-        let remaining_window_time = spent_window_time.max(self.minimum_delay);
+
+        // NOTE: Be careful here; `Duration` will overflow when dealing with
+        // negative values so these operations are effectively fallible.
+        // https://doc.rust-lang.org/std/time/struct.Duration.html#panics-7
+        let remaining_window_time = self.attempt_window
+            - spent_window_time
+                .max(self.minimum_delay)
+                .min(self.attempt_window);
 
         retry_count += 1;
 
         if let Some(backoff) = &self.backoff {
             next_timeout = backoff.step(next_timeout);
+            trace!(
+                "Next timeout will be {} seconds",
+                next_timeout.as_secs_f32()
+            );
        }
 
         tokio::time::sleep(remaining_window_time).await;
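
Two illustrative sketches follow. They are not part of the patch above; every type, value, and name in them (`FakeSphere`, `Job`, `start_syndication`, `remaining_window`, the intervals and durations) is invented for illustration and does not come from the noosphere codebase.

First, the reworked `start_ipfs_syndication` keeps handing back the job sender while internally joining the existing queue-draining task with a new periodic producer that enqueues one job per known local sphere. A minimal sketch of that wiring using plain tokio primitives:

```rust
use std::time::Duration;
use tokio::{
    sync::mpsc::{unbounded_channel, UnboundedSender},
    task::JoinHandle,
};

// Stand-ins for the sphere context and syndication job in the real worker.
#[derive(Clone, Debug)]
struct FakeSphere(&'static str);

#[derive(Debug)]
struct Job {
    sphere: FakeSphere,
}

// Mirrors the new shape: return the sender, and spawn one task that joins a
// periodic producer with the channel-draining consumer.
fn start_syndication(spheres: Vec<FakeSphere>) -> (UnboundedSender<Job>, JoinHandle<()>) {
    let (tx, mut rx) = unbounded_channel::<Job>();

    let task = {
        let tx = tx.clone();
        tokio::task::spawn(async move {
            let producer = async move {
                // The real worker loops forever; three rounds keep the sketch finite.
                for _ in 0..3 {
                    for sphere in &spheres {
                        let _ = tx.send(Job {
                            sphere: sphere.clone(),
                        });
                    }
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            };

            let consumer = async move {
                // Ends once every sender (periodic and ad hoc) has been dropped.
                while let Some(job) = rx.recv().await {
                    println!("syndicating {:?}", job.sphere);
                }
            };

            tokio::join!(producer, consumer);
        })
    };

    (tx, task)
}

#[tokio::main]
async fn main() {
    let (tx, task) = start_syndication(vec![FakeSphere("local")]);

    // Ad hoc jobs (e.g. triggered by a push) go through the same sender.
    let _ = tx.send(Job {
        sphere: FakeSphere("pushed"),
    });

    drop(tx);
    let _ = task.await;
}
```

Second, the retry-window arithmetic added to `retry.rs` clamps the elapsed time before subtracting, because `std::time::Duration` subtraction panics on underflow. A standalone check of that invariant with made-up values:

```rust
use std::time::Duration;

// Clamp the elapsed time into [minimum_delay, attempt_window] before
// subtracting, so the subtraction can never underflow (and therefore panic).
fn remaining_window(attempt_window: Duration, spent: Duration, minimum_delay: Duration) -> Duration {
    attempt_window - spent.max(minimum_delay).min(attempt_window)
}

fn main() {
    let window = Duration::from_secs(3);
    let min_delay = Duration::from_millis(100);

    // A fast failure still waits out most of the window before the next attempt.
    assert_eq!(
        remaining_window(window, Duration::from_millis(50), min_delay),
        Duration::from_millis(2900)
    );

    // An attempt that overran the window adds no extra delay and does not panic.
    assert_eq!(
        remaining_window(window, Duration::from_secs(10), min_delay),
        Duration::ZERO
    );
}
```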