From 900d1698a14724d5888c23bb44e70bafd464f85e Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 10 Mar 2025 14:24:12 +0200 Subject: [PATCH 01/15] Add API to get the content of a single tag --- src/rpc.rs | 10 ++++++++++ src/rpc/client/tags.rs | 6 ++++++ src/rpc/proto/tags.rs | 28 +++++++++++++++++++++++++++- src/util.rs | 13 +++++++++++++ 4 files changed, 56 insertions(+), 1 deletion(-) diff --git a/src/rpc.rs b/src/rpc.rs index a17cbb77c..10bb54879 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -317,6 +317,16 @@ impl Handler { #[allow(clippy::manual_flatten)] for item in tags { if let Ok((name, HashAndFormat { hash, format })) = item { + if let Some(from) = msg.from.as_ref() { + if &name < from { + continue; + } + } + if let Some(to) = msg.to.as_ref() { + if &name >= to { + break; + } + } if (format.is_raw() && msg.raw) || (format.is_hash_seq() && msg.hash_seq) { co.yield_(TagInfo { name, hash, format }).await; } diff --git a/src/rpc/client/tags.rs b/src/rpc/client/tags.rs index 103ecc618..5976fc8ae 100644 --- a/src/rpc/client/tags.rs +++ b/src/rpc/client/tags.rs @@ -42,6 +42,12 @@ where Self { rpc } } + /// Get the value of a single tag + pub async fn get(&self, name: Tag) -> Result> { + let mut stream = self.rpc.server_streaming(ListRequest::single(name)).await?; + Ok(stream.next().await.transpose()?) + } + /// Lists all tags. pub async fn list(&self) -> Result>> { let stream = self.rpc.server_streaming(ListRequest::all()).await?; diff --git a/src/rpc/proto/tags.rs b/src/rpc/proto/tags.rs index 54d35f625..8bb42e30b 100644 --- a/src/rpc/proto/tags.rs +++ b/src/rpc/proto/tags.rs @@ -1,10 +1,13 @@ //! Tags RPC protocol +use bytes::Bytes; use nested_enum_utils::enum_conversions; use quic_rpc_derive::rpc_requests; use serde::{Deserialize, Serialize}; use super::{RpcResult, RpcService}; -use crate::{net_protocol::BatchId, rpc::client::tags::TagInfo, HashAndFormat, Tag}; +use crate::{ + net_protocol::BatchId, rpc::client::tags::TagInfo, util::increment_vec, HashAndFormat, Tag, +}; #[allow(missing_docs)] #[derive(strum::Display, Debug, Serialize, Deserialize)] @@ -73,14 +76,33 @@ pub struct ListRequest { pub raw: bool, /// List hash seq tags pub hash_seq: bool, + /// From tag + pub from: Option, + /// To tag (exclusive) + pub to: Option, } impl ListRequest { + /// List a single tag + pub fn single(name: Tag) -> Self { + let mut next = name.0.to_vec(); + increment_vec(&mut next); + let next = Bytes::from(next).into(); + Self { + raw: true, + hash_seq: true, + from: Some(name), + to: Some(next), + } + } + /// List all tags pub fn all() -> Self { Self { raw: true, hash_seq: true, + from: None, + to: None, } } @@ -89,6 +111,8 @@ impl ListRequest { Self { raw: true, hash_seq: false, + from: None, + to: None, } } @@ -97,6 +121,8 @@ impl ListRequest { Self { raw: false, hash_seq: true, + from: None, + to: None, } } } diff --git a/src/util.rs b/src/util.rs index b2c0d76a8..4b12d6ffd 100644 --- a/src/util.rs +++ b/src/util.rs @@ -302,6 +302,19 @@ pub(crate) fn raw_outboard_size(size: u64) -> u64 { BaoTree::new(size, IROH_BLOCK_SIZE).outboard_size() } +/// Increment a byte vector, lexographically. +pub(crate) fn increment_vec(bytes: &mut Vec) { + for byte in bytes.iter_mut().rev() { + if *byte < 255 { + *byte += 1; + return; + } + *byte = 0; + } + + bytes.push(0); +} + /// Synchronously compute the outboard of a file, and return hash and outboard. /// /// It is assumed that the file is not modified while this is running. From a9ca5059a8dbd8973656f106ac8c094fcbe3860b Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 10 Mar 2025 14:41:36 +0200 Subject: [PATCH 02/15] Add a fn to list all tags with a certain prefix --- src/rpc/client/tags.rs | 10 ++++++++++ src/rpc/proto/tags.rs | 21 ++++++++++++++++++++- src/util.rs | 15 +++++++++++++++ 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/src/rpc/client/tags.rs b/src/rpc/client/tags.rs index 5976fc8ae..9854706da 100644 --- a/src/rpc/client/tags.rs +++ b/src/rpc/client/tags.rs @@ -11,6 +11,7 @@ //! //! [`Client::delete`] can be used to delete a tag. use anyhow::Result; +use bytes::Bytes; use futures_lite::{Stream, StreamExt}; use quic_rpc::{client::BoxedConnector, Connector, RpcClient}; use serde::{Deserialize, Serialize}; @@ -48,6 +49,15 @@ where Ok(stream.next().await.transpose()?) } + /// Lists all tags. + pub async fn list_prefix(&self, prefix: &[u8]) -> Result>> { + let stream = self + .rpc + .server_streaming(ListRequest::prefix(Bytes::copy_from_slice(prefix).into())) + .await?; + Ok(stream.map(|res| res.map_err(anyhow::Error::from))) + } + /// Lists all tags. pub async fn list(&self) -> Result>> { let stream = self.rpc.server_streaming(ListRequest::all()).await?; diff --git a/src/rpc/proto/tags.rs b/src/rpc/proto/tags.rs index 8bb42e30b..7ddb53139 100644 --- a/src/rpc/proto/tags.rs +++ b/src/rpc/proto/tags.rs @@ -6,7 +6,10 @@ use serde::{Deserialize, Serialize}; use super::{RpcResult, RpcService}; use crate::{ - net_protocol::BatchId, rpc::client::tags::TagInfo, util::increment_vec, HashAndFormat, Tag, + net_protocol::BatchId, + rpc::client::tags::TagInfo, + util::{increment_vec, next_prefix}, + HashAndFormat, Tag, }; #[allow(missing_docs)] @@ -83,6 +86,22 @@ pub struct ListRequest { } impl ListRequest { + /// List tags with a prefix + pub fn prefix(prefix: Tag) -> Self { + let mut to = prefix.0.to_vec(); + let to = if next_prefix(&mut to) { + Some(Bytes::from(to).into()) + } else { + None + }; + Self { + raw: true, + hash_seq: true, + from: Some(prefix), + to, + } + } + /// List a single tag pub fn single(name: Tag) -> Self { let mut next = name.0.to_vec(); diff --git a/src/util.rs b/src/util.rs index 4b12d6ffd..14105923c 100644 --- a/src/util.rs +++ b/src/util.rs @@ -302,6 +302,21 @@ pub(crate) fn raw_outboard_size(size: u64) -> u64 { BaoTree::new(size, IROH_BLOCK_SIZE).outboard_size() } +/// Given a prefix, increment it lexographically. +/// +/// If the prefix is all FF, this will return false because there is no +/// higher prefix than that. +pub(crate) fn next_prefix(bytes: &mut [u8]) -> bool { + for byte in bytes.iter_mut().rev() { + if *byte < 255 { + *byte += 1; + return true; + } + *byte = 0; + } + false +} + /// Increment a byte vector, lexographically. pub(crate) fn increment_vec(bytes: &mut Vec) { for byte in bytes.iter_mut().rev() { From daf4a88490c1c181f7b28683c1a657c54de5c189 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 10 Mar 2025 14:54:49 +0200 Subject: [PATCH 03/15] fix warnings --- src/util.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/util.rs b/src/util.rs index 14105923c..677cd03e9 100644 --- a/src/util.rs +++ b/src/util.rs @@ -306,6 +306,7 @@ pub(crate) fn raw_outboard_size(size: u64) -> u64 { /// /// If the prefix is all FF, this will return false because there is no /// higher prefix than that. +#[allow(dead_code)] pub(crate) fn next_prefix(bytes: &mut [u8]) -> bool { for byte in bytes.iter_mut().rev() { if *byte < 255 { @@ -318,6 +319,7 @@ pub(crate) fn next_prefix(bytes: &mut [u8]) -> bool { } /// Increment a byte vector, lexographically. +#[allow(dead_code)] pub(crate) fn increment_vec(bytes: &mut Vec) { for byte in bytes.iter_mut().rev() { if *byte < 255 { From b47accac03b485279590838a4220c4634ecd8e71 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 11 Mar 2025 11:25:17 +0200 Subject: [PATCH 04/15] WIP --- src/rpc/client/tags.rs | 15 ++++++++++----- src/rpc/proto/tags.rs | 20 ++++++++++++-------- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/rpc/client/tags.rs b/src/rpc/client/tags.rs index 9854706da..e0793235c 100644 --- a/src/rpc/client/tags.rs +++ b/src/rpc/client/tags.rs @@ -11,7 +11,6 @@ //! //! [`Client::delete`] can be used to delete a tag. use anyhow::Result; -use bytes::Bytes; use futures_lite::{Stream, StreamExt}; use quic_rpc::{client::BoxedConnector, Connector, RpcClient}; use serde::{Deserialize, Serialize}; @@ -44,16 +43,22 @@ where } /// Get the value of a single tag - pub async fn get(&self, name: Tag) -> Result> { - let mut stream = self.rpc.server_streaming(ListRequest::single(name)).await?; + pub async fn get(&self, name: impl AsRef<[u8]>) -> Result> { + let mut stream = self + .rpc + .server_streaming(ListRequest::single(name.as_ref())) + .await?; Ok(stream.next().await.transpose()?) } /// Lists all tags. - pub async fn list_prefix(&self, prefix: &[u8]) -> Result>> { + pub async fn list_prefix( + &self, + prefix: impl AsRef<[u8]>, + ) -> Result>> { let stream = self .rpc - .server_streaming(ListRequest::prefix(Bytes::copy_from_slice(prefix).into())) + .server_streaming(ListRequest::prefix(prefix.as_ref())) .await?; Ok(stream.map(|res| res.map_err(anyhow::Error::from))) } diff --git a/src/rpc/proto/tags.rs b/src/rpc/proto/tags.rs index 7ddb53139..e0f80a2f6 100644 --- a/src/rpc/proto/tags.rs +++ b/src/rpc/proto/tags.rs @@ -87,8 +87,10 @@ pub struct ListRequest { impl ListRequest { /// List tags with a prefix - pub fn prefix(prefix: Tag) -> Self { - let mut to = prefix.0.to_vec(); + pub fn prefix(prefix: &[u8]) -> Self { + let from = prefix.to_vec(); + let mut to = from.clone(); + let from = Bytes::from(from).into(); let to = if next_prefix(&mut to) { Some(Bytes::from(to).into()) } else { @@ -97,21 +99,23 @@ impl ListRequest { Self { raw: true, hash_seq: true, - from: Some(prefix), + from: Some(from), to, } } /// List a single tag - pub fn single(name: Tag) -> Self { - let mut next = name.0.to_vec(); + pub fn single(name: &[u8]) -> Self { + let from = name.to_vec(); + let mut next = from.clone(); increment_vec(&mut next); - let next = Bytes::from(next).into(); + let from = Bytes::from(from).into(); + let to = Bytes::from(next).into(); Self { raw: true, hash_seq: true, - from: Some(name), - to: Some(next), + from: Some(from), + to: Some(to), } } From 26790abb530e5b0d05b4e6017d6beb23ada07174 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 11 Mar 2025 12:10:38 +0200 Subject: [PATCH 05/15] Make tag more rich and add a richer API for tag deletion as well todo: make it compile and add tests --- src/rpc/client/tags.rs | 170 +++++++++++++++++++++++++++++++++++++---- src/rpc/proto/tags.rs | 87 +++++---------------- src/util.rs | 31 ++++++++ 3 files changed, 209 insertions(+), 79 deletions(-) diff --git a/src/rpc/client/tags.rs b/src/rpc/client/tags.rs index e0793235c..caa3a7514 100644 --- a/src/rpc/client/tags.rs +++ b/src/rpc/client/tags.rs @@ -10,6 +10,8 @@ //! [`Client::list_hash_seq`] can be used to list all tags with a hash_seq format. //! //! [`Client::delete`] can be used to delete a tag. +use std::ops::{Bound, RangeBounds}; + use anyhow::Result; use futures_lite::{Stream, StreamExt}; use quic_rpc::{client::BoxedConnector, Connector, RpcClient}; @@ -30,6 +32,125 @@ pub struct Client> { pub(super) rpc: RpcClient, } +/// Options for a list operation. +#[derive(Debug, Clone)] +pub struct ListOptions { + /// List tags to hash seqs + pub hash_seq: bool, + /// List tags to raw blobs + pub raw: bool, + /// Optional from tag (inclusive) + pub from: Option, + /// Optional to tag (exclusive) + pub to: Option, +} + +fn tags_from_range(range: R) -> (Option, Option) +where + R: RangeBounds, + E: AsRef<[u8]>, +{ + let from = match range.start_bound() { + Bound::Included(start) => Some(Tag::from(start.as_ref())), + Bound::Excluded(start) => Some(Tag::from(start.as_ref()).successor()), + Bound::Unbounded => None, + }; + let to = match range.end_bound() { + Bound::Included(end) => Some(Tag::from(end.as_ref()).successor()), + Bound::Excluded(end) => Some(Tag::from(end.as_ref())), + Bound::Unbounded => None, + }; + (from, to) +} + +impl ListOptions { + /// List a range of tags + pub fn range(range: R) -> Self + where + R: RangeBounds, + E: AsRef<[u8]>, + { + let (from, to) = tags_from_range(range); + Self { + from, + to, + raw: true, + hash_seq: true, + } + } + + /// List tags with a prefix + pub fn prefix(prefix: &[u8]) -> Self { + let from = Tag::from(prefix); + let to = from.next_prefix(); + Self { + raw: true, + hash_seq: true, + from: Some(from), + to, + } + } + + /// List a single tag + pub fn single(name: &[u8]) -> Self { + let from = Tag::from(name); + Self { + to: Some(from.successor()), + from: Some(from), + raw: true, + hash_seq: true, + } + } + + /// List all tags + pub fn all() -> Self { + Self { + raw: true, + hash_seq: true, + from: None, + to: None, + } + } + + /// List raw tags + pub fn raw() -> Self { + Self { + raw: true, + hash_seq: false, + from: None, + to: None, + } + } + + /// List hash seq tags + pub fn hash_seq() -> Self { + Self { + raw: false, + hash_seq: true, + from: None, + to: None, + } + } +} + +/// Options for a delete operation. +#[derive(Debug, Clone)] +pub struct DeleteOptions { + /// Optional from tag (inclusive) + pub from: Option, + /// Optional to tag (exclusive) + pub to: Option, +} + +impl DeleteOptions { + pub fn single(name: Tag) -> Self { + Self { + to: Some(name.successor()), + from: Some(name), + } + } +} + /// A client that uses the memory connector. pub type MemClient = Client; @@ -42,44 +163,67 @@ where Self { rpc } } + /// List all tags with options. + /// + /// This is the most flexible way to list tags. All the other list methods are just convenience + /// methods that call this one with the appropriate options. + pub async fn list_with_opts( + &self, + options: ListOptions, + ) -> Result>> { + let stream = self + .rpc + .server_streaming(ListRequest::from(options)) + .await?; + Ok(stream.map(|res| res.map_err(anyhow::Error::from))) + } + /// Get the value of a single tag pub async fn get(&self, name: impl AsRef<[u8]>) -> Result> { let mut stream = self - .rpc - .server_streaming(ListRequest::single(name.as_ref())) + .list_with_opts(ListOptions::single(name.as_ref())) .await?; Ok(stream.next().await.transpose()?) } + /// List a range of tags + pub async fn list_range(&self, range: R) -> Result>> + where + R: RangeBounds, + E: AsRef<[u8]>, + { + self.list_with_opts(ListOptions::range(range)).await + } + /// Lists all tags. pub async fn list_prefix( &self, prefix: impl AsRef<[u8]>, ) -> Result>> { - let stream = self - .rpc - .server_streaming(ListRequest::prefix(prefix.as_ref())) - .await?; - Ok(stream.map(|res| res.map_err(anyhow::Error::from))) + self.list_with_opts(ListOptions::prefix(prefix.as_ref())) + .await } /// Lists all tags. pub async fn list(&self) -> Result>> { - let stream = self.rpc.server_streaming(ListRequest::all()).await?; - Ok(stream.map(|res| res.map_err(anyhow::Error::from))) + self.list_with_opts(ListOptions::all()).await } /// Lists all tags with a hash_seq format. pub async fn list_hash_seq(&self) -> Result>> { - let stream = self.rpc.server_streaming(ListRequest::hash_seq()).await?; - Ok(stream.map(|res| res.map_err(anyhow::Error::from))) + self.list_with_opts(ListOptions::hash_seq()).await } /// Deletes a tag. - pub async fn delete(&self, name: Tag) -> Result<()> { - self.rpc.rpc(DeleteRequest { name }).await??; + pub async fn delete_with_opts(&self, options: DeleteOptions) -> Result<()> { + self.rpc.rpc(DeleteRequest::from(options)).await??; Ok(()) } + + /// Deletes a tag. + pub async fn delete(&self, name: Tag) -> Result<()> { + self.delete_with_opts(DeleteOptions::single(name)).await + } } /// Information about a tag. diff --git a/src/rpc/proto/tags.rs b/src/rpc/proto/tags.rs index e0f80a2f6..853efb8c6 100644 --- a/src/rpc/proto/tags.rs +++ b/src/rpc/proto/tags.rs @@ -1,5 +1,4 @@ //! Tags RPC protocol -use bytes::Bytes; use nested_enum_utils::enum_conversions; use quic_rpc_derive::rpc_requests; use serde::{Deserialize, Serialize}; @@ -7,8 +6,7 @@ use serde::{Deserialize, Serialize}; use super::{RpcResult, RpcService}; use crate::{ net_protocol::BatchId, - rpc::client::tags::TagInfo, - util::{increment_vec, next_prefix}, + rpc::client::tags::{DeleteOptions, ListOptions, TagInfo}, HashAndFormat, Tag, }; @@ -79,73 +77,19 @@ pub struct ListRequest { pub raw: bool, /// List hash seq tags pub hash_seq: bool, - /// From tag + /// From tag (inclusive) pub from: Option, /// To tag (exclusive) pub to: Option, } -impl ListRequest { - /// List tags with a prefix - pub fn prefix(prefix: &[u8]) -> Self { - let from = prefix.to_vec(); - let mut to = from.clone(); - let from = Bytes::from(from).into(); - let to = if next_prefix(&mut to) { - Some(Bytes::from(to).into()) - } else { - None - }; +impl From for ListRequest { + fn from(options: ListOptions) -> Self { Self { - raw: true, - hash_seq: true, - from: Some(from), - to, - } - } - - /// List a single tag - pub fn single(name: &[u8]) -> Self { - let from = name.to_vec(); - let mut next = from.clone(); - increment_vec(&mut next); - let from = Bytes::from(from).into(); - let to = Bytes::from(next).into(); - Self { - raw: true, - hash_seq: true, - from: Some(from), - to: Some(to), - } - } - - /// List all tags - pub fn all() -> Self { - Self { - raw: true, - hash_seq: true, - from: None, - to: None, - } - } - - /// List raw tags - pub fn raw() -> Self { - Self { - raw: true, - hash_seq: false, - from: None, - to: None, - } - } - - /// List hash seq tags - pub fn hash_seq() -> Self { - Self { - raw: false, - hash_seq: true, - from: None, - to: None, + raw: options.raw, + hash_seq: options.hash_seq, + from: options.from, + to: options.to, } } } @@ -153,6 +97,17 @@ impl ListRequest { /// Delete a tag #[derive(Debug, Serialize, Deserialize)] pub struct DeleteRequest { - /// Name of the tag - pub name: Tag, + /// From tag (inclusive) + pub from: Option, + /// To tag (exclusive) + pub to: Option, +} + +impl From for DeleteRequest { + fn from(options: DeleteOptions) -> Self { + Self { + from: options.from, + to: options.to, + } + } } diff --git a/src/util.rs b/src/util.rs index 677cd03e9..7ddcb4bc3 100644 --- a/src/util.rs +++ b/src/util.rs @@ -74,6 +74,18 @@ mod redb_support { } } +impl From<&[u8]> for Tag { + fn from(value: &[u8]) -> Self { + Self(Bytes::copy_from_slice(value)) + } +} + +impl AsRef<[u8]> for Tag { + fn as_ref(&self) -> &[u8] { + self.0.as_ref() + } +} + impl Borrow<[u8]> for Tag { fn borrow(&self) -> &[u8] { self.0.as_ref() @@ -132,6 +144,25 @@ impl Tag { i += 1; } } + + /// The successor of this tag in lexicographic order. + pub fn successor(&self) -> Self { + let mut bytes = self.0.to_vec(); + increment_vec(&mut bytes); + Self(bytes.into()) + } + + /// If this is a prefix, get the next prefix. + /// + /// This is like successor, except that it will return None if the prefix is all 0xFF instead of appending a 0 byte. + pub fn next_prefix(&self) -> Option { + let mut bytes = self.0.to_vec(); + if next_prefix(&mut bytes) { + Some(Self(bytes.into())) + } else { + None + } + } } /// Option for commands that allow setting a tag From 2791f794d1de918a005e7c4c2eced14fd5f08d7a Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 11 Mar 2025 12:41:08 +0200 Subject: [PATCH 06/15] implement delete range --- src/rpc.rs | 30 +++++++------------ src/rpc/client/blobs/batch.rs | 2 +- src/rpc/client/tags.rs | 1 + src/rpc/proto/tags.rs | 4 +-- src/store/fs.rs | 56 ++++++++++++++++++++++++++--------- src/store/mem.rs | 27 +++++++++++++---- src/store/readonly_mem.rs | 6 +++- src/store/traits.rs | 14 ++++++++- tests/gc.rs | 18 +++++------ 9 files changed, 105 insertions(+), 53 deletions(-) diff --git a/src/rpc.rs b/src/rpc.rs index 10bb54879..8b73f69d5 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -295,7 +295,7 @@ impl Handler { async fn blob_delete_tag(self, msg: TagDeleteRequest) -> RpcResult<()> { self.store() - .set_tag(msg.name, None) + .delete_tags(msg.from, msg.to) .await .map_err(|e| RpcError::new(&e))?; Ok(()) @@ -403,13 +403,11 @@ impl Handler { blobs.store().sync().await.map_err(|e| RpcError::new(&e))?; } if let Some(batch) = msg.batch { - if let Some(content) = msg.value.as_ref() { - blobs - .batches() - .await - .remove_one(batch, content) - .map_err(|e| RpcError::new(&*e))?; - } + blobs + .batches() + .await + .remove_one(batch, &msg.value) + .map_err(|e| RpcError::new(&*e))?; } Ok(()) } @@ -582,10 +580,7 @@ impl Handler { let HashAndFormat { hash, format } = *hash_and_format; let tag = match tag { SetTagOption::Named(tag) => { - blobs - .store() - .set_tag(tag.clone(), Some(*hash_and_format)) - .await?; + blobs.store().set_tag(tag.clone(), *hash_and_format).await?; tag } SetTagOption::Auto => blobs.store().create_tag(*hash_and_format).await?, @@ -774,10 +769,7 @@ impl Handler { let HashAndFormat { hash, format } = hash_and_format; let tag = match msg.tag { SetTagOption::Named(tag) => { - blobs - .store() - .set_tag(tag.clone(), Some(hash_and_format)) - .await?; + blobs.store().set_tag(tag.clone(), hash_and_format).await?; tag } SetTagOption::Auto => blobs.store().create_tag(hash_and_format).await?, @@ -917,7 +909,7 @@ impl Handler { SetTagOption::Named(tag) => { blobs .store() - .set_tag(tag.clone(), Some(*hash_and_format)) + .set_tag(tag.clone(), *hash_and_format) .await .map_err(|e| RpcError::new(&e))?; tag @@ -932,7 +924,7 @@ impl Handler { for tag in tags_to_delete { blobs .store() - .set_tag(tag, None) + .delete_tags(Some(tag.clone()), Some(tag.successor())) .await .map_err(|e| RpcError::new(&e))?; } @@ -969,7 +961,7 @@ impl Handler { progress.send(DownloadProgress::AllDone(stats)).await.ok(); match tag { SetTagOption::Named(tag) => { - self.store().set_tag(tag, Some(hash_and_format)).await?; + self.store().set_tag(tag, hash_and_format).await?; } SetTagOption::Auto => { self.store().create_tag(hash_and_format).await?; diff --git a/src/rpc/client/blobs/batch.rs b/src/rpc/client/blobs/batch.rs index b82f17837..5a964441e 100644 --- a/src/rpc/client/blobs/batch.rs +++ b/src/rpc/client/blobs/batch.rs @@ -441,7 +441,7 @@ where .rpc .rpc(tags::SetRequest { name: tag, - value: Some(tt.hash_and_format()), + value: tt.hash_and_format(), batch: Some(self.0.batch), sync: SyncMode::Full, }) diff --git a/src/rpc/client/tags.rs b/src/rpc/client/tags.rs index caa3a7514..ecb4cf0a2 100644 --- a/src/rpc/client/tags.rs +++ b/src/rpc/client/tags.rs @@ -143,6 +143,7 @@ pub struct DeleteOptions { } impl DeleteOptions { + /// Delete a single tag pub fn single(name: Tag) -> Self { Self { to: Some(name.successor()), diff --git a/src/rpc/proto/tags.rs b/src/rpc/proto/tags.rs index 853efb8c6..61d2cff01 100644 --- a/src/rpc/proto/tags.rs +++ b/src/rpc/proto/tags.rs @@ -60,8 +60,8 @@ pub struct CreateRequest { pub struct SetRequest { /// Name of the tag pub name: Tag, - /// Value of the tag, None to delete - pub value: Option, + /// Value of the tag + pub value: HashAndFormat, /// Batch to use, none for global pub batch: Option, /// Sync mode diff --git a/src/store/fs.rs b/src/store/fs.rs index 8f6adb37f..c71b466b9 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -67,6 +67,7 @@ use std::{ collections::{BTreeMap, BTreeSet}, future::Future, io, + ops::Bound, path::{Path, PathBuf}, sync::{Arc, RwLock}, time::{Duration, SystemTime}, @@ -608,10 +609,16 @@ pub(crate) enum ActorMessage { ActorResult>>, >, }, - /// Modification method: set a tag to a value, or remove it. + /// Modification method: set a tag to a value. SetTag { tag: Tag, - value: Option, + value: HashAndFormat, + tx: oneshot::Sender>, + }, + /// Modification method: set a tag to a value. + DeleteTags { + from: Option, + to: Option, tx: oneshot::Sender>, }, /// Modification method: create a new unique tag and set it to a value. @@ -673,6 +680,7 @@ impl ActorMessage { | Self::CreateTag { .. } | Self::SetFullEntryState { .. } | Self::Delete { .. } + | Self::DeleteTags { .. } | Self::GcDelete { .. } => MessageCategory::ReadWrite, Self::UpdateInlineOptions { .. } | Self::Sync { .. } @@ -870,7 +878,7 @@ impl StoreInner { Ok(tags) } - async fn set_tag(&self, tag: Tag, value: Option) -> OuterResult<()> { + async fn set_tag(&self, tag: Tag, value: HashAndFormat) -> OuterResult<()> { let (tx, rx) = oneshot::channel(); self.tx .send(ActorMessage::SetTag { tag, value, tx }) @@ -878,6 +886,14 @@ impl StoreInner { Ok(rx.await??) } + async fn delete_tags(&self, from: Option, to: Option) -> OuterResult<()> { + let (tx, rx) = oneshot::channel(); + self.tx + .send(ActorMessage::DeleteTags { from, to, tx }) + .await?; + Ok(rx.await??) + } + async fn create_tag(&self, hash: HashAndFormat) -> OuterResult { let (tx, rx) = oneshot::channel(); self.tx.send(ActorMessage::CreateTag { hash, tx }).await?; @@ -1371,10 +1387,14 @@ impl super::Store for Store { .await??) } - async fn set_tag(&self, name: Tag, hash: Option) -> io::Result<()> { + async fn set_tag(&self, name: Tag, hash: HashAndFormat) -> io::Result<()> { Ok(self.0.set_tag(name, hash).await?) } + async fn delete_tags(&self, from: Option, to: Option) -> io::Result<()> { + Ok(self.0.delete_tags(from, to).await?) + } + async fn create_tag(&self, hash: HashAndFormat) -> io::Result { Ok(self.0.create_tag(hash).await?) } @@ -1998,19 +2018,23 @@ impl ActorState { Ok(tag) } - fn set_tag( + fn set_tag(&self, tables: &mut Tables, tag: Tag, value: HashAndFormat) -> ActorResult<()> { + tables.tags.insert(tag, value)?; + Ok(()) + } + + fn delete_tags( &self, tables: &mut Tables, - tag: Tag, - value: Option, + from: Option, + to: Option, ) -> ActorResult<()> { - match value { - Some(value) => { - tables.tags.insert(tag, value)?; - } - None => { - tables.tags.remove(tag)?; - } + let from = from.map(Bound::Included).unwrap_or(Bound::Unbounded); + let to = to.map(Bound::Excluded).unwrap_or(Bound::Unbounded); + let removing = tables.tags.extract_from_if((from, to), |_, _| true)?; + // drain the iterator to actually remove the tags + for res in removing { + res?; } Ok(()) } @@ -2358,6 +2382,10 @@ impl ActorState { let res = self.set_tag(tables, tag, value); tx.send(res).ok(); } + ActorMessage::DeleteTags { from, to, tx } => { + let res = self.delete_tags(tables, from, to); + tx.send(res).ok(); + } ActorMessage::CreateTag { hash, tx } => { let res = self.create_tag(tables, hash); tx.send(res).ok(); diff --git a/src/store/mem.rs b/src/store/mem.rs index 89d8fffd7..5a51633c3 100644 --- a/src/store/mem.rs +++ b/src/store/mem.rs @@ -207,13 +207,28 @@ impl super::Store for Store { .await? } - async fn set_tag(&self, name: Tag, value: Option) -> io::Result<()> { + async fn set_tag(&self, name: Tag, value: HashAndFormat) -> io::Result<()> { let mut state = self.write_lock(); - if let Some(value) = value { - state.tags.insert(name, value); - } else { - state.tags.remove(&name); - } + state.tags.insert(name, value); + Ok(()) + } + + async fn delete_tags(&self, from: Option, to: Option) -> io::Result<()> { + let mut state = self.write_lock(); + // todo: more efficient impl + state.tags.retain(|tag, _| { + if let Some(from) = &from { + if tag < from { + return true; + } + } + if let Some(to) = &to { + if tag >= to { + return true; + } + } + false + }); Ok(()) } diff --git a/src/store/readonly_mem.rs b/src/store/readonly_mem.rs index a04161554..e5d649ae9 100644 --- a/src/store/readonly_mem.rs +++ b/src/store/readonly_mem.rs @@ -313,7 +313,11 @@ impl super::Store for Store { Err(io::Error::new(io::ErrorKind::Other, "not implemented")) } - async fn set_tag(&self, _name: Tag, _hash: Option) -> io::Result<()> { + async fn set_tag(&self, _name: Tag, _hash: HashAndFormat) -> io::Result<()> { + Err(io::Error::new(io::ErrorKind::Other, "not implemented")) + } + + async fn delete_tags(&self, _from: Option, _to: Option) -> io::Result<()> { Err(io::Error::new(io::ErrorKind::Other, "not implemented")) } diff --git a/src/store/traits.rs b/src/store/traits.rs index 01c48229d..b9be04816 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -345,7 +345,19 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug { fn set_tag( &self, name: Tag, - hash: Option, + hash: HashAndFormat, + ) -> impl Future> + Send; + + /// Delete a single tag + fn delete_tag(&self, name: Tag) -> impl Future> + Send { + self.delete_tags(Some(name.clone()), Some(name.successor())) + } + + /// Bulk delete tags + fn delete_tags( + &self, + from: Option, + to: Option, ) -> impl Future> + Send; /// Create a new tag diff --git a/tests/gc.rs b/tests/gc.rs index ea4526027..dcf76b4ef 100644 --- a/tests/gc.rs +++ b/tests/gc.rs @@ -188,7 +188,7 @@ async fn gc_basics() -> Result<()> { // create an explicit tag for h1 (as raw) and then delete the temp tag. Entry should still be there. let tag = Tag::from("test"); bao_store - .set_tag(tag.clone(), Some(HashAndFormat::raw(h2))) + .set_tag(tag.clone(), HashAndFormat::raw(h2)) .await?; drop(tt2); tracing::info!("dropped tt2"); @@ -196,7 +196,7 @@ async fn gc_basics() -> Result<()> { assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::Complete); // delete the explicit tag, entry should be gone - bao_store.set_tag(tag, None).await?; + bao_store.delete_tag(tag).await?; step(&evs).await; assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::NotFound); @@ -234,7 +234,7 @@ async fn gc_hashseq_impl() -> Result<()> { // make a permanent tag for the link seq, then delete the temp tag. Entries should still be there. let tag = Tag::from("test"); bao_store - .set_tag(tag.clone(), Some(HashAndFormat::hash_seq(hr))) + .set_tag(tag.clone(), HashAndFormat::hash_seq(hr)) .await?; drop(ttr); step(&evs).await; @@ -244,7 +244,7 @@ async fn gc_hashseq_impl() -> Result<()> { // change the permanent tag to be just for the linkseq itself as a blob. Only the linkseq should be there, not the entries. bao_store - .set_tag(tag.clone(), Some(HashAndFormat::raw(hr))) + .set_tag(tag.clone(), HashAndFormat::raw(hr)) .await?; step(&evs).await; assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::NotFound); @@ -252,7 +252,7 @@ async fn gc_hashseq_impl() -> Result<()> { assert_eq!(bao_store.entry_status(&hr).await?, EntryStatus::Complete); // delete the permanent tag, everything should be gone - bao_store.set_tag(tag, None).await?; + bao_store.delete_tag(tag).await?; step(&evs).await; assert_eq!(bao_store.entry_status(&h1).await?, EntryStatus::NotFound); assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::NotFound); @@ -339,7 +339,7 @@ async fn gc_file_basics() -> Result<()> { drop(tt2); let tag = Tag::from("test"); bao_store - .set_tag(tag.clone(), Some(HashAndFormat::hash_seq(*ttr.hash()))) + .set_tag(tag.clone(), HashAndFormat::hash_seq(*ttr.hash())) .await?; drop(ttr); @@ -359,7 +359,7 @@ async fn gc_file_basics() -> Result<()> { tracing::info!("changing tag from hashseq to raw, this should orphan the children"); bao_store - .set_tag(tag.clone(), Some(HashAndFormat::raw(hr))) + .set_tag(tag.clone(), HashAndFormat::raw(hr)) .await?; // now only hr itself should be protected, but not its children @@ -376,7 +376,7 @@ async fn gc_file_basics() -> Result<()> { assert!(!path(&hr).exists()); assert!(!outboard_path(&hr).exists()); - bao_store.set_tag(tag, None).await?; + bao_store.delete_tag(tag).await?; step(&evs).await; bao_store.sync().await?; assert!(check_consistency(&bao_store).await? <= ReportLevel::Info); @@ -504,7 +504,7 @@ async fn gc_file_stress() -> Result<()> { if i % 100 == 0 { let tag = Tag::from(format!("test{}", i)); bao_store - .set_tag(tag.clone(), Some(HashAndFormat::raw(*tt.hash()))) + .set_tag(tag.clone(), HashAndFormat::raw(*tt.hash())) .await?; live.push(*tt.hash()); } else { From 34f915a6cb9f3999491676b90c954a8090f6d250 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 11 Mar 2025 12:55:41 +0200 Subject: [PATCH 07/15] fix successor --- src/store/fs.rs | 1 - src/store/mem.rs | 4 ++++ src/util.rs | 17 ++--------------- 3 files changed, 6 insertions(+), 16 deletions(-) diff --git a/src/store/fs.rs b/src/store/fs.rs index c71b466b9..6a63a624c 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -86,7 +86,6 @@ use serde::{Deserialize, Serialize}; use smallvec::SmallVec; use tokio::io::AsyncWriteExt; use tracing::trace_span; - mod tables; #[doc(hidden)] pub mod test_support; diff --git a/src/store/mem.rs b/src/store/mem.rs index 5a51633c3..6d3da9df7 100644 --- a/src/store/mem.rs +++ b/src/store/mem.rs @@ -17,6 +17,7 @@ use bao_tree::{ use bytes::{Bytes, BytesMut}; use futures_lite::{Stream, StreamExt}; use iroh_io::AsyncSliceReader; +use tracing::info; use super::{ temp_name, BaoBatchWriter, ConsistencyCheckProgress, ExportMode, ExportProgressCb, ImportMode, @@ -215,6 +216,8 @@ impl super::Store for Store { async fn delete_tags(&self, from: Option, to: Option) -> io::Result<()> { let mut state = self.write_lock(); + info!("deleting tags from {:?} to {:?}", from, to); + // state.tags.remove(&from.unwrap()); // todo: more efficient impl state.tags.retain(|tag, _| { if let Some(from) = &from { @@ -227,6 +230,7 @@ impl super::Store for Store { return true; } } + info!(" removing {:?}", tag); false }); Ok(()) diff --git a/src/util.rs b/src/util.rs index 7ddcb4bc3..fcf3115bf 100644 --- a/src/util.rs +++ b/src/util.rs @@ -148,7 +148,8 @@ impl Tag { /// The successor of this tag in lexicographic order. pub fn successor(&self) -> Self { let mut bytes = self.0.to_vec(); - increment_vec(&mut bytes); + // increment_vec(&mut bytes); + bytes.push(0); Self(bytes.into()) } @@ -349,20 +350,6 @@ pub(crate) fn next_prefix(bytes: &mut [u8]) -> bool { false } -/// Increment a byte vector, lexographically. -#[allow(dead_code)] -pub(crate) fn increment_vec(bytes: &mut Vec) { - for byte in bytes.iter_mut().rev() { - if *byte < 255 { - *byte += 1; - return; - } - *byte = 0; - } - - bytes.push(0); -} - /// Synchronously compute the outboard of a file, and return hash and outboard. /// /// It is assumed that the file is not modified while this is running. From 83998d770664537489734e1e1f7564e17213b278 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 11 Mar 2025 13:07:16 +0200 Subject: [PATCH 08/15] clippy --- src/rpc/client/tags.rs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/src/rpc/client/tags.rs b/src/rpc/client/tags.rs index ecb4cf0a2..40f571efa 100644 --- a/src/rpc/client/tags.rs +++ b/src/rpc/client/tags.rs @@ -19,10 +19,10 @@ use serde::{Deserialize, Serialize}; use crate::{ rpc::proto::{ - tags::{DeleteRequest, ListRequest}, + tags::{DeleteRequest, ListRequest, SetRequest, SyncMode}, RpcService, }, - BlobFormat, Hash, Tag, + BlobFormat, Hash, HashAndFormat, Tag, }; /// Iroh tags client. @@ -179,12 +179,25 @@ where Ok(stream.map(|res| res.map_err(anyhow::Error::from))) } + /// Set the value for a single tag + pub async fn set(&self, name: impl AsRef<[u8]>, value: impl Into) -> Result<()> { + self.rpc + .rpc(SetRequest { + name: Tag::from(name.as_ref()), + value: value.into(), + batch: None, + sync: SyncMode::Full, + }) + .await??; + Ok(()) + } + /// Get the value of a single tag pub async fn get(&self, name: impl AsRef<[u8]>) -> Result> { let mut stream = self .list_with_opts(ListOptions::single(name.as_ref())) .await?; - Ok(stream.next().await.transpose()?) + stream.next().await.transpose() } /// List a range of tags From 2d3a90139c4048c2acc27f3fc8a8fc013cd835a1 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 11 Mar 2025 14:30:13 +0200 Subject: [PATCH 09/15] Add some simple smoke tests for tags --- src/hash.rs | 6 ++ src/rpc.rs | 12 +--- src/rpc/client/tags.rs | 47 ++++++++++++++-- src/store/fs.rs | 40 +++++++------ src/store/mem.rs | 24 +++++++- src/store/readonly_mem.rs | 6 +- src/store/traits.rs | 8 ++- tests/tags.rs | 115 ++++++++++++++++++++++++++++++++++++++ 8 files changed, 219 insertions(+), 39 deletions(-) create mode 100644 tests/tags.rs diff --git a/src/hash.rs b/src/hash.rs index ce87a06e8..dda748348 100644 --- a/src/hash.rs +++ b/src/hash.rs @@ -256,6 +256,12 @@ pub struct HashAndFormat { pub format: BlobFormat, } +impl From for HashAndFormat { + fn from(hash: Hash) -> Self { + Self::raw(hash) + } +} + #[cfg(feature = "redb")] mod redb_support { use postcard::experimental::max_size::MaxSize; diff --git a/src/rpc.rs b/src/rpc.rs index 8b73f69d5..b53e4971f 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -313,20 +313,10 @@ impl Handler { tracing::info!("blob_list_tags"); let blobs = self; Gen::new(|co| async move { - let tags = blobs.store().tags().await.unwrap(); + let tags = blobs.store().tags(msg.from, msg.to).await.unwrap(); #[allow(clippy::manual_flatten)] for item in tags { if let Ok((name, HashAndFormat { hash, format })) = item { - if let Some(from) = msg.from.as_ref() { - if &name < from { - continue; - } - } - if let Some(to) = msg.to.as_ref() { - if &name >= to { - break; - } - } if (format.is_raw() && msg.raw) || (format.is_hash_seq() && msg.hash_seq) { co.yield_(TagInfo { name, hash, format }).await; } diff --git a/src/rpc/client/tags.rs b/src/rpc/client/tags.rs index 40f571efa..8bb84990e 100644 --- a/src/rpc/client/tags.rs +++ b/src/rpc/client/tags.rs @@ -144,12 +144,33 @@ pub struct DeleteOptions { impl DeleteOptions { /// Delete a single tag - pub fn single(name: Tag) -> Self { + pub fn single(name: &[u8]) -> Self { + let name = Tag::from(name); Self { to: Some(name.successor()), from: Some(name), } } + + /// Delete a range of tags + pub fn range(range: R) -> Self + where + R: RangeBounds, + E: AsRef<[u8]>, + { + let (from, to) = tags_from_range(range); + Self { from, to } + } + + /// Delete tags with a prefix + pub fn prefix(prefix: &[u8]) -> Self { + let from = Tag::from(prefix); + let to = from.next_prefix(); + Self { + from: Some(from), + to, + } + } } /// A client that uses the memory connector. @@ -209,7 +230,7 @@ where self.list_with_opts(ListOptions::range(range)).await } - /// Lists all tags. + /// Lists all tags with the given prefix. pub async fn list_prefix( &self, prefix: impl AsRef<[u8]>, @@ -235,13 +256,29 @@ where } /// Deletes a tag. - pub async fn delete(&self, name: Tag) -> Result<()> { - self.delete_with_opts(DeleteOptions::single(name)).await + pub async fn delete(&self, name: impl AsRef<[u8]>) -> Result<()> { + self.delete_with_opts(DeleteOptions::single(name.as_ref())) + .await + } + + /// Deletes a range of tags. + pub async fn delete_range(&self, range: R) -> Result<()> + where + R: RangeBounds, + E: AsRef<[u8]>, + { + self.delete_with_opts(DeleteOptions::range(range)).await + } + + /// Lists all tags with the given prefix. + pub async fn delete_prefix(&self, prefix: impl AsRef<[u8]>) -> Result<()> { + self.delete_with_opts(DeleteOptions::prefix(prefix.as_ref())) + .await } } /// Information about a tag. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct TagInfo { /// Name of the tag pub name: Tag, diff --git a/src/store/fs.rs b/src/store/fs.rs index 6a63a624c..5f0b24ccc 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -601,8 +601,8 @@ pub(crate) enum ActorMessage { }, /// Bulk query method: get the entire tags table Tags { - #[debug(skip)] - filter: FilterPredicate, + from: Option, + to: Option, #[allow(clippy::type_complexity)] tx: oneshot::Sender< ActorResult>>, @@ -863,11 +863,13 @@ impl StoreInner { Ok(res) } - async fn tags(&self) -> OuterResult>> { + async fn tags( + &self, + from: Option, + to: Option, + ) -> OuterResult>> { let (tx, rx) = oneshot::channel(); - let filter: FilterPredicate = - Box::new(|_i, k, v| Some((k.value(), v.value()))); - self.tx.send(ActorMessage::Tags { filter, tx }).await?; + self.tx.send(ActorMessage::Tags { from, to, tx }).await?; let tags = rx.await?; // transform the internal error type into io::Error let tags = tags? @@ -1299,8 +1301,12 @@ impl super::ReadableStore for Store { Ok(Box::new(self.0.partial_blobs().await?.into_iter())) } - async fn tags(&self) -> io::Result> { - Ok(Box::new(self.0.tags().await?.into_iter())) + async fn tags( + &self, + from: Option, + to: Option, + ) -> io::Result> { + Ok(Box::new(self.0.tags(from, to).await?.into_iter())) } fn temp_tags(&self) -> Box + Send + Sync + 'static> { @@ -1985,23 +1991,21 @@ impl ActorState { fn tags( &mut self, tables: &impl ReadableTables, - filter: FilterPredicate, + from: Option, + to: Option, ) -> ActorResult>> { let mut res = Vec::new(); - let mut index = 0u64; - #[allow(clippy::explicit_counter_loop)] - for item in tables.tags().iter()? { + let from = from.map(Bound::Included).unwrap_or(Bound::Unbounded); + let to = to.map(Bound::Excluded).unwrap_or(Bound::Unbounded); + for item in tables.tags().range((from, to))? { match item { Ok((k, v)) => { - if let Some(item) = filter(index, k, v) { - res.push(Ok(item)); - } + res.push(Ok((k.value(), v.value()))); } Err(e) => { res.push(Err(e)); } } - index += 1; } Ok(res) } @@ -2342,8 +2346,8 @@ impl ActorState { let res = self.blobs(tables, filter); tx.send(res).ok(); } - ActorMessage::Tags { filter, tx } => { - let res = self.tags(tables, filter); + ActorMessage::Tags { from, to, tx } => { + let res = self.tags(tables, from, to); tx.send(res).ok(); } ActorMessage::GcStart { tx } => { diff --git a/src/store/mem.rs b/src/store/mem.rs index 6d3da9df7..4a19098b5 100644 --- a/src/store/mem.rs +++ b/src/store/mem.rs @@ -446,10 +446,30 @@ impl ReadableStore for Store { )) } - async fn tags(&self) -> io::Result> { + async fn tags( + &self, + from: Option, + to: Option, + ) -> io::Result> { #[allow(clippy::mutable_key_type)] let tags = self.read_lock().tags.clone(); - Ok(Box::new(tags.into_iter().map(Ok))) + let tags = tags + .into_iter() + .filter(move |(tag, _)| { + if let Some(from) = &from { + if tag < from { + return false; + } + } + if let Some(to) = &to { + if tag >= to { + return false; + } + } + true + }) + .map(Ok); + Ok(Box::new(tags)) } fn temp_tags(&self) -> Box + Send + Sync + 'static> { diff --git a/src/store/readonly_mem.rs b/src/store/readonly_mem.rs index e5d649ae9..aacf80851 100644 --- a/src/store/readonly_mem.rs +++ b/src/store/readonly_mem.rs @@ -232,7 +232,11 @@ impl ReadableStore for Store { )) } - async fn tags(&self) -> io::Result> { + async fn tags( + &self, + _from: Option, + _to: Option, + ) -> io::Result> { Ok(Box::new(std::iter::empty())) } diff --git a/src/store/traits.rs b/src/store/traits.rs index b9be04816..08a992b8d 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -262,7 +262,11 @@ pub trait ReadableStore: Map { /// been imported, and hash sequences that have been created internally. fn blobs(&self) -> impl Future>> + Send; /// list all tags (collections or other explicitly added things) in the database - fn tags(&self) -> impl Future>> + Send; + fn tags( + &self, + from: Option, + to: Option, + ) -> impl Future>> + Send; /// Temp tags fn temp_tags(&self) -> Box + Send + Sync + 'static>; @@ -635,7 +639,7 @@ pub(super) async fn gc_mark_task<'a>( } let mut roots = BTreeSet::new(); debug!("traversing tags"); - for item in store.tags().await? { + for item in store.tags(None, None).await? { let (name, haf) = item?; debug!("adding root {:?} {:?}", name, haf); roots.insert(haf); diff --git a/tests/tags.rs b/tests/tags.rs new file mode 100644 index 000000000..e3903a831 --- /dev/null +++ b/tests/tags.rs @@ -0,0 +1,115 @@ +#![cfg(all(feature = "net_protocol", feature = "rpc"))] +use futures_lite::StreamExt; +use futures_util::Stream; +use iroh::Endpoint; +use iroh_blobs::{ + net_protocol::Blobs, + rpc::{ + client::tags::{self, TagInfo}, + proto::RpcService, + }, + BlobFormat, Hash, +}; +use testresult::TestResult; + +async fn to_vec(stream: impl Stream>) -> anyhow::Result> { + let res = stream.collect::>().await; + res.into_iter().collect::>>() +} + +fn expected(tags: impl IntoIterator) -> Vec { + tags.into_iter() + .map(|tag| TagInfo { + name: tag.into(), + hash: Hash::new(tag), + format: BlobFormat::Raw, + }) + .collect() +} + +async fn tags_smoke>(tags: tags::Client) -> TestResult<()> { + tags.set("a", Hash::new("a")).await?; + tags.set("b", Hash::new("b")).await?; + tags.set("c", Hash::new("c")).await?; + tags.set("d", Hash::new("d")).await?; + tags.set("e", Hash::new("e")).await?; + let stream = tags.list().await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected(["a", "b", "c", "d", "e"])); + + let stream = tags.list_range("b".."d").await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected(["b", "c"])); + + let stream = tags.list_range("b"..).await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected(["b", "c", "d", "e"])); + + let stream = tags.list_range(.."d").await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected(["a", "b", "c"])); + + let stream = tags.list_range(..="d").await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected(["a", "b", "c", "d"])); + + tags.delete_range("b"..).await?; + let stream = tags.list().await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected(["a"])); + + tags.delete_range(..="a").await?; + let stream = tags.list().await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected([])); + + tags.set("a", Hash::new("a")).await?; + tags.set("aa", Hash::new("aa")).await?; + tags.set("aaa", Hash::new("aaa")).await?; + tags.set("aab", Hash::new("aab")).await?; + tags.set("b", Hash::new("b")).await?; + + let stream = tags.list_prefix("aa").await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected(["aa", "aaa", "aab"])); + + tags.delete_prefix("aa").await?; + let stream = tags.list().await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected(["a", "b"])); + + tags.delete_prefix("").await?; + let stream = tags.list().await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected([])); + + tags.set("a", Hash::new("a")).await?; + tags.set("b", Hash::new("b")).await?; + tags.set("c", Hash::new("c")).await?; + + tags.delete("b").await?; + let stream = tags.list().await?; + let res = to_vec(stream).await?; + assert_eq!(res, expected(["a", "c"])); + + Ok(()) +} + +#[tokio::test] +async fn tags_smoke_mem() -> TestResult<()> { + let endpoint = Endpoint::builder().bind().await?; + let blobs = Blobs::memory().build(&endpoint); + let client = blobs.client(); + tags_smoke(client.tags()).await +} + +#[tokio::test] +async fn tags_smoke_fs() -> TestResult<()> { + let td = tempfile::tempdir()?; + let endpoint = Endpoint::builder().bind().await?; + let blobs = Blobs::persistent(td.path().join("blobs.db")) + .await? + .build(&endpoint); + let client = blobs.client(); + tags_smoke(client.tags()).await +} From aaa01019d8fe790f40af13b53ea9877505a9cb09 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 11 Mar 2025 14:49:37 +0200 Subject: [PATCH 10/15] also add a get test --- tests/tags.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/tags.rs b/tests/tags.rs index e3903a831..f7584fa73 100644 --- a/tests/tags.rs +++ b/tests/tags.rs @@ -87,11 +87,22 @@ async fn tags_smoke>(tags: tags::Client) - tags.set("b", Hash::new("b")).await?; tags.set("c", Hash::new("c")).await?; + assert_eq!( + tags.get("b").await?, + Some(TagInfo { + name: "b".into(), + hash: Hash::new("b"), + format: BlobFormat::Raw, + }) + ); + tags.delete("b").await?; let stream = tags.list().await?; let res = to_vec(stream).await?; assert_eq!(res, expected(["a", "c"])); + assert_eq!(tags.get("b").await?, None); + Ok(()) } From eb2f9fb90a0a775041fd382f34edde5666468c2c Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 11 Mar 2025 14:57:26 +0200 Subject: [PATCH 11/15] add test for list_hash_seq --- src/rpc/client/tags.rs | 2 +- tests/tags.rs | 43 ++++++++++++++++++++++++++++-------------- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/src/rpc/client/tags.rs b/src/rpc/client/tags.rs index 8bb84990e..1e5948ecd 100644 --- a/src/rpc/client/tags.rs +++ b/src/rpc/client/tags.rs @@ -270,7 +270,7 @@ where self.delete_with_opts(DeleteOptions::range(range)).await } - /// Lists all tags with the given prefix. + /// Delete all tags with the given prefix. pub async fn delete_prefix(&self, prefix: impl AsRef<[u8]>) -> Result<()> { self.delete_with_opts(DeleteOptions::prefix(prefix.as_ref())) .await diff --git a/tests/tags.rs b/tests/tags.rs index f7584fa73..7a8417f82 100644 --- a/tests/tags.rs +++ b/tests/tags.rs @@ -8,7 +8,7 @@ use iroh_blobs::{ client::tags::{self, TagInfo}, proto::RpcService, }, - BlobFormat, Hash, + BlobFormat, Hash, HashAndFormat, }; use testresult::TestResult; @@ -27,12 +27,18 @@ fn expected(tags: impl IntoIterator) -> Vec { .collect() } +async fn set>( + tags: &tags::Client, + names: impl IntoIterator, +) -> TestResult<()> { + for name in names { + tags.set(name, Hash::new(name)).await?; + } + Ok(()) +} + async fn tags_smoke>(tags: tags::Client) -> TestResult<()> { - tags.set("a", Hash::new("a")).await?; - tags.set("b", Hash::new("b")).await?; - tags.set("c", Hash::new("c")).await?; - tags.set("d", Hash::new("d")).await?; - tags.set("e", Hash::new("e")).await?; + set(&tags, ["a", "b", "c", "d", "e"]).await?; let stream = tags.list().await?; let res = to_vec(stream).await?; assert_eq!(res, expected(["a", "b", "c", "d", "e"])); @@ -63,11 +69,7 @@ async fn tags_smoke>(tags: tags::Client) - let res = to_vec(stream).await?; assert_eq!(res, expected([])); - tags.set("a", Hash::new("a")).await?; - tags.set("aa", Hash::new("aa")).await?; - tags.set("aaa", Hash::new("aaa")).await?; - tags.set("aab", Hash::new("aab")).await?; - tags.set("b", Hash::new("b")).await?; + set(&tags, ["a", "aa", "aaa", "aab", "b"]).await?; let stream = tags.list_prefix("aa").await?; let res = to_vec(stream).await?; @@ -83,9 +85,7 @@ async fn tags_smoke>(tags: tags::Client) - let res = to_vec(stream).await?; assert_eq!(res, expected([])); - tags.set("a", Hash::new("a")).await?; - tags.set("b", Hash::new("b")).await?; - tags.set("c", Hash::new("c")).await?; + set(&tags, ["a", "b", "c"]).await?; assert_eq!( tags.get("b").await?, @@ -103,6 +103,21 @@ async fn tags_smoke>(tags: tags::Client) - assert_eq!(tags.get("b").await?, None); + tags.delete_prefix("").await?; + + tags.set("a", HashAndFormat::hash_seq(Hash::new("a"))) + .await?; + tags.set("b", HashAndFormat::raw(Hash::new("b"))).await?; + let stream = tags.list_hash_seq().await?; + let res = to_vec(stream).await?; + assert_eq!( + res, + vec![TagInfo { + name: "a".into(), + hash: Hash::new("a"), + format: BlobFormat::HashSeq, + },] + ); Ok(()) } From 0cbe68fd9ef6f87f63f85b8b3d2ece96ac336ff3 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 12 Mar 2025 10:35:37 +0200 Subject: [PATCH 12/15] Add a way to rename tags. --- src/rpc/client/tags.rs | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/src/rpc/client/tags.rs b/src/rpc/client/tags.rs index 1e5948ecd..023bc4226 100644 --- a/src/rpc/client/tags.rs +++ b/src/rpc/client/tags.rs @@ -13,7 +13,7 @@ use std::ops::{Bound, RangeBounds}; use anyhow::Result; -use futures_lite::{Stream, StreamExt}; +use futures_lite::{io, Stream, StreamExt}; use quic_rpc::{client::BoxedConnector, Connector, RpcClient}; use serde::{Deserialize, Serialize}; @@ -221,6 +221,20 @@ where stream.next().await.transpose() } + /// Rename a tag + /// + /// This is done in steps, so it is not atomic! + pub async fn rename(&self, from: impl AsRef<[u8]>, to: impl AsRef<[u8]>) -> Result<()> { + let from = from.as_ref(); + let to = to.as_ref(); + let Some(old) = self.get(from.as_ref()).await? else { + return Err(io::Error::new(io::ErrorKind::NotFound, "Tag not found").into()); + }; + self.set(to.as_ref(), old.hash_and_format()).await?; + self.delete(from.as_ref()).await?; + Ok(()) + } + /// List a range of tags pub async fn list_range(&self, range: R) -> Result>> where @@ -287,3 +301,13 @@ pub struct TagInfo { /// Hash of the data pub hash: Hash, } + +impl TagInfo { + /// Get the hash and format of the tag. + pub fn hash_and_format(&self) -> HashAndFormat { + HashAndFormat { + hash: self.hash, + format: self.format, + } + } +} From 50ce9f736d300903ee30e2206b78752f31e6ffd9 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 12 Mar 2025 10:53:37 +0200 Subject: [PATCH 13/15] Add lots of plumbing to make rename atomic --- src/rpc.rs | 13 ++++++++++++- src/rpc/proto/tags.rs | 11 +++++++++++ src/store/fs.rs | 35 +++++++++++++++++++++++++++++++++++ src/store/mem.rs | 12 ++++++++++++ src/store/readonly_mem.rs | 4 ++++ src/store/traits.rs | 3 +++ 6 files changed, 77 insertions(+), 1 deletion(-) diff --git a/src/rpc.rs b/src/rpc.rs index b53e4971f..d17364aec 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -30,7 +30,7 @@ use proto::{ }, tags::{ CreateRequest as TagsCreateRequest, DeleteRequest as TagDeleteRequest, - ListRequest as TagListRequest, SetRequest as TagsSetRequest, SyncMode, + ListRequest as TagListRequest, RenameRequest, SetRequest as TagsSetRequest, SyncMode, }, Request, RpcError, RpcResult, RpcService, }; @@ -158,6 +158,7 @@ impl Handler { Set(msg) => chan.rpc(msg, self, Self::tags_set).await, DeleteTag(msg) => chan.rpc(msg, self, Self::blob_delete_tag).await, ListTags(msg) => chan.server_streaming(msg, self, Self::blob_list_tags).await, + Rename(msg) => chan.rpc(msg, self, Self::tags_rename).await, } } @@ -382,6 +383,16 @@ impl Handler { rx.map(AddPathResponse) } + async fn tags_rename(self, msg: RenameRequest) -> RpcResult<()> { + let blobs = self; + blobs + .store() + .rename_tag(msg.from, msg.to) + .await + .map_err(|e| RpcError::new(&e))?; + Ok(()) + } + async fn tags_set(self, msg: TagsSetRequest) -> RpcResult<()> { let blobs = self; blobs diff --git a/src/rpc/proto/tags.rs b/src/rpc/proto/tags.rs index 61d2cff01..f30547fa7 100644 --- a/src/rpc/proto/tags.rs +++ b/src/rpc/proto/tags.rs @@ -20,6 +20,8 @@ pub enum Request { #[rpc(response = RpcResult<()>)] Set(SetRequest), #[rpc(response = RpcResult<()>)] + Rename(RenameRequest), + #[rpc(response = RpcResult<()>)] DeleteTag(DeleteRequest), #[server_streaming(response = TagInfo)] ListTags(ListRequest), @@ -111,3 +113,12 @@ impl From for DeleteRequest { } } } + +/// Rename a tag atomically +#[derive(Debug, Serialize, Deserialize)] +pub struct RenameRequest { + /// Old tag name + pub from: Tag, + /// New tag name + pub to: Tag, +} diff --git a/src/store/fs.rs b/src/store/fs.rs index 5f0b24ccc..091d674cf 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -625,6 +625,12 @@ pub(crate) enum ActorMessage { hash: HashAndFormat, tx: oneshot::Sender>, }, + /// Modification method: rename a tag atomically. + RenameTag { + from: Tag, + to: Tag, + tx: oneshot::Sender>, + }, /// Modification method: unconditional delete the data for a number of hashes Delete { hashes: Vec, @@ -680,6 +686,7 @@ impl ActorMessage { | Self::SetFullEntryState { .. } | Self::Delete { .. } | Self::DeleteTags { .. } + | Self::RenameTag { .. } | Self::GcDelete { .. } => MessageCategory::ReadWrite, Self::UpdateInlineOptions { .. } | Self::Sync { .. } @@ -901,6 +908,14 @@ impl StoreInner { Ok(rx.await??) } + async fn rename_tag(&self, from: Tag, to: Tag) -> OuterResult<()> { + let (tx, rx) = oneshot::channel(); + self.tx + .send(ActorMessage::RenameTag { from, to, tx }) + .await?; + Ok(rx.await??) + } + async fn delete(&self, hashes: Vec) -> OuterResult<()> { let (tx, rx) = oneshot::channel(); self.tx.send(ActorMessage::Delete { hashes, tx }).await?; @@ -1404,6 +1419,10 @@ impl super::Store for Store { Ok(self.0.create_tag(hash).await?) } + async fn rename_tag(&self, from: Tag, to: Tag) -> io::Result<()> { + Ok(self.0.rename_tag(from, to).await?) + } + async fn delete(&self, hashes: Vec) -> io::Result<()> { Ok(self.0.delete(hashes).await?) } @@ -2021,6 +2040,18 @@ impl ActorState { Ok(tag) } + fn rename_tag(&mut self, tables: &mut Tables, from: Tag, to: Tag) -> ActorResult<()> { + let value = tables + .tags + .get(from)? + .ok_or_else(|| { + ActorError::Io(io::Error::new(io::ErrorKind::NotFound, "tag not found")) + })? + .value(); + tables.tags.insert(to, value)?; + Ok(()) + } + fn set_tag(&self, tables: &mut Tables, tag: Tag, value: HashAndFormat) -> ActorResult<()> { tables.tags.insert(tag, value)?; Ok(()) @@ -2393,6 +2424,10 @@ impl ActorState { let res = self.create_tag(tables, hash); tx.send(res).ok(); } + ActorMessage::RenameTag { from, to, tx } => { + let res = self.rename_tag(tables, from, to); + tx.send(res).ok(); + } ActorMessage::Delete { hashes, tx } => { let res = self.delete(tables, hashes, true); tx.send(res).ok(); diff --git a/src/store/mem.rs b/src/store/mem.rs index 4a19098b5..105b8ddd5 100644 --- a/src/store/mem.rs +++ b/src/store/mem.rs @@ -208,6 +208,18 @@ impl super::Store for Store { .await? } + async fn rename_tag(&self, from: Tag, to: Tag) -> io::Result<()> { + let mut state = self.write_lock(); + let value = state.tags.remove(&from).ok_or_else(|| { + io::Error::new( + io::ErrorKind::NotFound, + format!("tag not found: {:?}", from), + ) + })?; + state.tags.insert(to, value); + Ok(()) + } + async fn set_tag(&self, name: Tag, value: HashAndFormat) -> io::Result<()> { let mut state = self.write_lock(); state.tags.insert(name, value); diff --git a/src/store/readonly_mem.rs b/src/store/readonly_mem.rs index aacf80851..ec8f1b5f3 100644 --- a/src/store/readonly_mem.rs +++ b/src/store/readonly_mem.rs @@ -307,6 +307,10 @@ impl super::Store for Store { Err(io::Error::new(io::ErrorKind::Other, "not implemented")) } + async fn rename_tag(&self, _from: Tag, _to: Tag) -> io::Result<()> { + Err(io::Error::new(io::ErrorKind::Other, "not implemented")) + } + async fn import_stream( &self, data: impl Stream> + Unpin + Send, diff --git a/src/store/traits.rs b/src/store/traits.rs index 08a992b8d..1d0df2325 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -352,6 +352,9 @@ pub trait Store: ReadableStore + MapMut + std::fmt::Debug { hash: HashAndFormat, ) -> impl Future> + Send; + /// Rename a tag + fn rename_tag(&self, from: Tag, to: Tag) -> impl Future> + Send; + /// Delete a single tag fn delete_tag(&self, name: Tag) -> impl Future> + Send { self.delete_tags(Some(name.clone()), Some(name.successor())) From bee5bfa430e24e4b0324cff547672926fd1690d2 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 12 Mar 2025 11:11:13 +0200 Subject: [PATCH 14/15] add a test for rename --- src/rpc/client/tags.rs | 20 ++++++++++++++++++++ tests/tags.rs | 37 +++++++++++++++++++++++-------------- 2 files changed, 43 insertions(+), 14 deletions(-) diff --git a/src/rpc/client/tags.rs b/src/rpc/client/tags.rs index 023bc4226..e93c77f26 100644 --- a/src/rpc/client/tags.rs +++ b/src/rpc/client/tags.rs @@ -289,6 +289,15 @@ where self.delete_with_opts(DeleteOptions::prefix(prefix.as_ref())) .await } + + /// Delete all tags. Use with care. After this, all data will be garbage collected. + pub async fn delete_all(&self) -> Result<()> { + self.delete_with_opts(DeleteOptions { + from: None, + to: None, + }) + .await + } } /// Information about a tag. @@ -303,6 +312,17 @@ pub struct TagInfo { } impl TagInfo { + /// Create a new tag info. + pub fn new(name: impl AsRef<[u8]>, value: impl Into) -> Self { + let name = name.as_ref(); + let value = value.into(); + Self { + name: Tag::from(name), + hash: value.hash, + format: value.format, + } + } + /// Get the hash and format of the tag. pub fn hash_and_format(&self) -> HashAndFormat { HashAndFormat { diff --git a/tests/tags.rs b/tests/tags.rs index 7a8417f82..8a4af2d54 100644 --- a/tests/tags.rs +++ b/tests/tags.rs @@ -8,7 +8,7 @@ use iroh_blobs::{ client::tags::{self, TagInfo}, proto::RpcService, }, - BlobFormat, Hash, HashAndFormat, + Hash, HashAndFormat, }; use testresult::TestResult; @@ -19,11 +19,7 @@ async fn to_vec(stream: impl Stream>) -> anyhow::Res fn expected(tags: impl IntoIterator) -> Vec { tags.into_iter() - .map(|tag| TagInfo { - name: tag.into(), - hash: Hash::new(tag), - format: BlobFormat::Raw, - }) + .map(|tag| TagInfo::new(tag, Hash::new(tag))) .collect() } @@ -89,11 +85,7 @@ async fn tags_smoke>(tags: tags::Client) - assert_eq!( tags.get("b").await?, - Some(TagInfo { - name: "b".into(), - hash: Hash::new("b"), - format: BlobFormat::Raw, - }) + Some(TagInfo::new("b", Hash::new("b"))) ); tags.delete("b").await?; @@ -103,7 +95,7 @@ async fn tags_smoke>(tags: tags::Client) - assert_eq!(tags.get("b").await?, None); - tags.delete_prefix("").await?; + tags.delete_all().await?; tags.set("a", HashAndFormat::hash_seq(Hash::new("a"))) .await?; @@ -115,9 +107,26 @@ async fn tags_smoke>(tags: tags::Client) - vec![TagInfo { name: "a".into(), hash: Hash::new("a"), - format: BlobFormat::HashSeq, - },] + format: iroh_blobs::BlobFormat::HashSeq, + }] ); + + tags.delete_all().await?; + set(&tags, ["c"]).await?; + tags.rename("c", "f").await?; + let stream = tags.list().await?; + let res = to_vec(stream).await?; + assert_eq!( + res, + vec![TagInfo { + name: "f".into(), + hash: Hash::new("c"), + format: iroh_blobs::BlobFormat::Raw, + }] + ); + + let res = tags.rename("y", "z").await; + assert!(res.is_err()); Ok(()) } From fbd6f1a328a574f034727b15d186570a8e68ae50 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 12 Mar 2025 15:53:25 +0200 Subject: [PATCH 15/15] Add top level docs to the tags api --- src/rpc/client/tags.rs | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/src/rpc/client/tags.rs b/src/rpc/client/tags.rs index e93c77f26..595a139d3 100644 --- a/src/rpc/client/tags.rs +++ b/src/rpc/client/tags.rs @@ -2,14 +2,24 @@ //! //! The purpose of tags is to mark information as important to prevent it //! from being garbage-collected (if the garbage collector is turned on). -//! Currently this is used for blobs. //! -//! The main entry point is the [`Client`]. +//! A tag has a name that is an arbitrary byte string. In many cases this will be +//! a valid UTF8 string, but there are also use cases where it is useful to have +//! non string data like integer ids in the tag name. +//! +//! Tags point to a [`HashAndFormat`]. +//! +//! A tag can point to a hash with format [`BlobFormat::Raw`]. In that case it will +//! protect *just this blob* from being garbage-collected. //! -//! [`Client::list`] can be used to list all tags. -//! [`Client::list_hash_seq`] can be used to list all tags with a hash_seq format. +//! It can also point to a hash in format [`BlobFormat::HashSeq`]. In that case it will +//! protect the blob itself and all hashes in the blob (the blob must be just a sequence of hashes). +//! Using this format it is possible to protect a large number of blobs with a single tag. //! -//! [`Client::delete`] can be used to delete a tag. +//! Tags can be created, read, renamed and deleted. Tags *do not* have to correspond to +//! already existing data. It is perfectly valid to create a tag for data you don't have yet. +//! +//! The main entry point is the [`Client`]. use std::ops::{Bound, RangeBounds}; use anyhow::Result; @@ -221,9 +231,9 @@ where stream.next().await.transpose() } - /// Rename a tag + /// Rename a tag atomically /// - /// This is done in steps, so it is not atomic! + /// If the tag does not exist, this will return an error. pub async fn rename(&self, from: impl AsRef<[u8]>, to: impl AsRef<[u8]>) -> Result<()> { let from = from.as_ref(); let to = to.as_ref();