This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

Commit

feat: Publish to Kubo at intervals
cdata committed Oct 23, 2023
1 parent 9e5c83b commit f7dd38d
Showing 7 changed files with 106 additions and 31 deletions.
5 changes: 4 additions & 1 deletion rust/noosphere-core/src/api/client.rs
@@ -254,7 +254,10 @@ where
.stream()
.map(|block| match block {
Ok(block) => Ok(block),
Err(error) => Err(anyhow!(error)),
Err(error) => {
warn!("Replication stream ended prematurely");
Err(anyhow!(error))
}
}),
)
}
3 changes: 2 additions & 1 deletion rust/noosphere-gateway/src/gateway.rs
@@ -78,7 +78,8 @@ where

let ipfs_client = KuboClient::new(&ipfs_api)?;

let (syndication_tx, syndication_task) = start_ipfs_syndication::<C, S>(ipfs_api.clone());
let (syndication_tx, syndication_task) =
start_ipfs_syndication::<C, S>(ipfs_api.clone(), vec![sphere_context.clone()]);
let (name_system_tx, name_system_task) = start_name_system::<C, S>(
NameSystemConfiguration {
connection_type: NameSystemConnectionType::Remote(name_resolver_api),
10 changes: 4 additions & 6 deletions rust/noosphere-gateway/src/handlers/v0alpha2/push.rs
@@ -105,7 +105,7 @@ where
self.incorporate_history(&push_body).await?;
self.synchronize_names(&push_body).await?;

let (next_version, new_blocks) = self.update_gateway_sphere().await?;
let (next_version, new_blocks) = self.update_gateway_sphere(&push_body).await?;

// These steps are order-independent
let _ = tokio::join!(
@@ -320,19 +320,17 @@ where
/// synchronize the pusher with the latest local history.
async fn update_gateway_sphere(
&mut self,
push_body: &PushBody,
) -> Result<(Link<MemoIpld>, impl Stream<Item = Result<(Cid, Vec<u8>)>>), PushError> {
debug!("Updating the gateway's sphere...");

// NOTE CDATA: "Previous version" doesn't cover all cases; this needs to be a version given
// in the push body, or else we don't know how far back we actually have to go (e.g., the name
// system may have created a new version in the meantime).
let previous_version = self.sphere_context.version().await?;
let previous_version = push_body.counterpart_tip.as_ref();
let next_version = SphereCursor::latest(self.sphere_context.clone())
.save(None)
.await?;

let db = self.sphere_context.sphere_context().await?.db().clone();
let block_stream = memo_history_stream(db, &next_version, Some(&previous_version), false);
let block_stream = memo_history_stream(db, &next_version, previous_version, false);

Ok((next_version, block_stream))
}
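The removed NOTE explains why the lower bound of the history stream now comes from the push body: the gateway's own "previous version" can already be ahead of what the pusher has seen, for example when the name system worker created revisions in the meantime. A toy sketch of that difference, using plain integers in place of `Link<MemoIpld>` versions and a hypothetical `history_since` helper standing in for `memo_history_stream` (illustration only, not the project's real types):

// Toy model: versions are plain integers and `history_since(from)` plays the
// role of `memo_history_stream`, returning every revision newer than `from`.
fn history_since(history: &[u32], from: Option<u32>) -> Vec<u32> {
    match from {
        Some(from) => history.iter().copied().filter(|v| *v > from).collect(),
        None => history.to_vec(),
    }
}

fn main() {
    // The pusher last saw counterpart version 3; the name system worker then
    // produced versions 4 and 5, and handling this push yields version 6.
    let gateway_history = [1u32, 2, 3, 4, 5, 6];
    let counterpart_tip = Some(3); // reported in the push body
    let gateway_previous = Some(5); // the gateway's own "previous version"

    // Bounding the stream by the gateway's previous version misses 4 and 5...
    assert_eq!(history_since(&gateway_history, gateway_previous), vec![6]);
    // ...while bounding it by the pusher's counterpart tip sends everything
    // the pusher is actually missing.
    assert_eq!(history_since(&gateway_history, counterpart_tip), vec![4, 5, 6]);
}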
10 changes: 5 additions & 5 deletions rust/noosphere-gateway/src/worker/name_system.rs
@@ -130,7 +130,7 @@ where
loop {
for local_sphere in local_spheres.iter() {
if let Err(error) = periodic_publish_record(&tx, local_sphere).await {
error!("Could not publish record: {}", error);
error!("Periodic re-publish of link record failed: {}", error);
};
}
tokio::time::sleep(Duration::from_secs(PERIODIC_PUBLISH_INTERVAL_SECONDS)).await;
@@ -153,7 +153,7 @@ where
record,
republish: true,
}) {
warn!("Failed to request name record publish: {}", error);
warn!("Failed to request link record publish: {}", error);
}
}
_ => {
@@ -393,7 +393,7 @@ where
// TODO(#260): What if the resolved value is None?
Some(record) if last_known_record != next_record => {
debug!(
"Gateway adopting petname record for '{}' ({}): {}",
"Gateway adopting petname link record for '{}' ({}): {}",
name, identity.did, record
);

@@ -404,7 +404,7 @@
}

if let Err(e) = context.set_petname_record(&name, record).await {
warn!("Could not set petname record: {}", e);
warn!("Could not set petname link record: {}", e);
continue;
}
}
@@ -419,7 +419,7 @@
Ok(())
}

/// Attempts to fetch a single name record from the name system.
/// Attempts to fetch a single link record from the name system.
async fn fetch_record(
client: Arc<dyn NameResolver>,
name: String,
82 changes: 72 additions & 10 deletions rust/noosphere-gateway/src/worker/syndication.rs
@@ -39,19 +39,17 @@ pub struct SyndicationJob<C> {
}

/// A [SyndicationCheckpoint] represents the last spot in the history of a
/// sphere that was successfully syndicated to an IPFS node. It records a Bloom
/// filter populated by the CIDs of all blocks that have been syndicated, which
/// gives us a short-cut to determine if a block should be added.
/// sphere that was successfully syndicated to an IPFS node.
#[derive(Serialize, Deserialize)]
pub struct SyndicationCheckpoint {
pub last_syndicated_version: Option<Link<MemoIpld>>,
pub last_syndicated_counterpart_version: Option<Link<MemoIpld>>,
pub syndication_epoch: u64,
}

impl SyndicationCheckpoint {
pub fn new() -> Result<Self> {
Ok(Self {
last_syndicated_version: None,
last_syndicated_counterpart_version: None,
syndication_epoch: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(),
})
}
@@ -70,22 +68,76 @@ impl SyndicationCheckpoint {
}
}

// Re-syndicate every 90 days
// Force full re-syndicate every 90 days
const MAX_SYNDICATION_CHECKPOINT_LIFETIME: Duration = Duration::from_secs(60 * 60 * 24 * 90);

// Periodic syndication check every 5 minutes
const PERIODIC_SYNDICATION_INTERVAL_SECONDS: Duration = Duration::from_secs(5 * 60);

/// Start a Tokio task that waits for [SyndicationJob] messages and then
/// attempts to syndicate to the configured IPFS RPC. Currently only Kubo IPFS
/// backends are supported.
pub fn start_ipfs_syndication<C, S>(
ipfs_api: Url,
local_spheres: Vec<C>,
) -> (UnboundedSender<SyndicationJob<C>>, JoinHandle<Result<()>>)
where
C: HasMutableSphereContext<S> + 'static,
S: Storage + 'static,
{
let (tx, rx) = unbounded_channel();

(tx, tokio::task::spawn(ipfs_syndication_task(ipfs_api, rx)))
let task = {
let tx = tx.clone();
tokio::task::spawn(async move {
let (_, syndication_result) = tokio::join!(
periodic_syndication_task(tx, local_spheres),
ipfs_syndication_task(ipfs_api, rx)
);
syndication_result?;
Ok(())
})
};

(tx, task)
}

async fn periodic_syndication_task<C, S>(
tx: UnboundedSender<SyndicationJob<C>>,
local_spheres: Vec<C>,
) where
C: HasMutableSphereContext<S>,
S: Storage + 'static,
{
loop {
for local_sphere in &local_spheres {
if let Err(error) = periodic_syndication(&tx, local_sphere).await {
error!("Periodic syndication of sphere history failed: {}", error);
};
tokio::time::sleep(Duration::from_secs(5)).await;
}
tokio::time::sleep(PERIODIC_SYNDICATION_INTERVAL_SECONDS).await;
}
}

async fn periodic_syndication<C, S>(
tx: &UnboundedSender<SyndicationJob<C>>,
local_sphere: &C,
) -> Result<()>
where
C: HasMutableSphereContext<S>,
S: Storage + 'static,
{
let latest_version = local_sphere.version().await?;

if let Err(error) = tx.send(SyndicationJob {
revision: latest_version,
context: local_sphere.clone(),
}) {
warn!("Failed to request periodic syndication: {}", error);
};

Ok(())
}

async fn ipfs_syndication_task<C, S>(
@@ -165,14 +217,23 @@
None => SyndicationCheckpoint::new()?,
};

if Some(counterpart_revision) == syndication_checkpoint.last_syndicated_counterpart_version
{
warn!("Counterpart version hasn't changed; skipping syndication");
return Ok(());
}

(counterpart_revision, syndication_checkpoint, db)
};

let timeline = Timeline::new(&db)
.slice(
&sphere_revision,
syndication_checkpoint.last_syndicated_version.as_ref(),
syndication_checkpoint
.last_syndicated_counterpart_version
.as_ref(),
)
.exclude_past()
.to_chronological()
.await?;

@@ -204,7 +265,7 @@
{
Ok(_) => {
debug!("Syndicated sphere revision {} to IPFS", cid);
syndication_checkpoint.last_syndicated_version = Some(cid);
syndication_checkpoint.last_syndicated_counterpart_version = Some(cid);
}
Err(error) => warn!(
"Failed to pin orphans for revision {} to IPFS: {:?}",
@@ -281,7 +342,8 @@ mod tests {
let ipfs_url = Url::parse("http://127.0.0.1:5001")?;
let local_kubo_client = KuboClient::new(&ipfs_url.clone())?;

let (syndication_tx, _syndication_join_handle) = start_ipfs_syndication::<_, _>(ipfs_url);
let (syndication_tx, _syndication_join_handle) =
start_ipfs_syndication::<_, _>(ipfs_url, vec![user_sphere_context.clone()]);

user_sphere_context
.write("foo", &ContentType::Text, b"bar".as_ref(), None)
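Before this commit, `start_ipfs_syndication` only consumed jobs enqueued by the push handler; it now also joins a periodic producer that enqueues every known local sphere at a fixed interval, while the caller keeps a clone of the sender for on-demand jobs. A minimal, self-contained sketch of that channel-plus-periodic-producer pattern (Tokio with the `rt`, `macros`, and `time` features assumed; a plain `String` stands in for `SyndicationJob`):

// Minimal sketch of the pattern: one task periodically enqueues work for every
// known sphere, a second task (here, the loop in main) drains the queue, and
// the caller can also enqueue jobs on demand through a cloned sender.
use std::time::Duration;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

async fn periodic_producer(tx: UnboundedSender<String>, spheres: Vec<String>) {
    loop {
        for sphere in &spheres {
            if tx.send(sphere.clone()).is_err() {
                return; // Receiver dropped; stop producing.
            }
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = unbounded_channel();
    let producer = tokio::spawn(periodic_producer(
        tx.clone(),
        vec!["sphere-a".to_string(), "sphere-b".to_string()],
    ));

    // An on-demand job, alongside the periodic ones.
    tx.send("sphere-c".to_string()).unwrap();

    for _ in 0..5 {
        if let Some(job) = rx.recv().await {
            println!("would syndicate {job}");
        }
    }
    producer.abort();
}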
2 changes: 1 addition & 1 deletion rust/noosphere-storage/examples/bench/main.rs
@@ -132,7 +132,7 @@ impl BenchmarkStorage {
))]
let (storage, storage_name) = {
(
noosphere_storage::SledStorage::new(&storage_path)?,
noosphere_storage::SledStorage::new(storage_path)?,
"SledDbStorage",
)
};
25 changes: 18 additions & 7 deletions rust/noosphere-storage/src/retry.rs
@@ -7,11 +7,11 @@ use tokio::select;
use crate::BlockStore;

const DEFAULT_MAX_RETRIES: u32 = 2u32;
const DEFAULT_TIMEOUT: Duration = Duration::from_millis(1500);
const DEFAULT_MINIMUM_DELAY: Duration = Duration::from_secs(1);
const DEFAULT_BACKOFF: Backoff = Backoff::Exponential {
exponent: 2f32,
ceiling: Duration::from_secs(6),
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);
const DEFAULT_MINIMUM_DELAY: Duration = Duration::from_millis(100);
const DEFAULT_BACKOFF: Backoff = Backoff::Linear {
increment: Duration::from_secs(1),
ceiling: Duration::from_secs(3),
};

/// Backoff configuration used to define how [BlockStoreRetry] should time
@@ -127,17 +127,28 @@
};
},
_ = tokio::time::sleep(next_timeout) => {
warn!("Timed out trying to get {} after {} seconds...", cid, next_timeout.as_secs());
warn!("Timed out trying to get {} after {} seconds...", cid, next_timeout.as_secs_f32());
}
}

let spent_window_time = Instant::now() - window_start;
let remaining_window_time = spent_window_time.max(self.minimum_delay);

// NOTE: Be careful here; `Duration` will overflow when dealing with
// negative values so these operations are effectively fallible.
// https://doc.rust-lang.org/std/time/struct.Duration.html#panics-7
let remaining_window_time = self.attempt_window
- spent_window_time
.max(self.minimum_delay)
.min(self.attempt_window);

retry_count += 1;

if let Some(backoff) = &self.backoff {
next_timeout = backoff.step(next_timeout);
trace!(
"Next timeout will be {} seconds",
next_timeout.as_secs_f32()
);
}

tokio::time::sleep(remaining_window_time).await;
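The NOTE added above points at a real footgun: `Duration - Duration` panics when the result would be negative, which is why the remaining window time is now computed from values clamped to the attempt window. A small sketch of the panic-free alternatives (illustrative only, not code from this crate):

// Illustration of the panic the NOTE warns about: subtracting a larger
// Duration from a smaller one panics, while `checked_sub` and
// `saturating_sub` never do.
use std::time::Duration;

fn main() {
    let window = Duration::from_secs(1);
    let spent = Duration::from_secs(2);

    // `window - spent` would panic here ("overflow when subtracting durations").
    assert_eq!(window.checked_sub(spent), None);
    assert_eq!(window.saturating_sub(spent), Duration::ZERO);

    // The change above avoids the same panic by clamping the subtrahend:
    let remaining = window - spent.min(window);
    assert_eq!(remaining, Duration::ZERO);
}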
