Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
6c9be70
feat: Implement Phase 4 - Proximity-based update forwarding (#1848)
sanity Sep 24, 2025
2e9ba4a
fix: Address [Codex] review comments from PR1851
sanity Sep 24, 2025
3d72e88
fix: Address clippy errors for CI
sanity Sep 24, 2025
3e57d48
feat: Implement cache state sync on new peer connections
sanity Sep 24, 2025
6bf9262
feat: Implement periodic batch announcements for proximity cache
sanity Sep 24, 2025
65c039e
fix: Resolve all three critical issues in proximity-based update forw…
sanity Sep 25, 2025
0992b2e
Remove Phase X implementation artifacts from comments
sanity Sep 25, 2025
5451ac4
Address code review feedback from @iduartgomez
sanity Sep 28, 2025
bc7122e
Fix proximity cache announcements and test issues
sanity Sep 30, 2025
d84367f
Add debug logging to diagnose CI test timeout
sanity Sep 30, 2025
630cff1
Increase network stabilization time for CI environment
sanity Sep 30, 2025
daf948f
Increase cache announcement propagation delay for CI
sanity Sep 30, 2025
26f9ce1
Remove debug logging from test
sanity Sep 30, 2025
621be78
refactor: consolidate comments for easier review
sanity Sep 30, 2025
d6a3cac
fix: increase network stabilization delay for CI
sanity Sep 30, 2025
3734ac8
test: add port release delay to prevent connection failures
sanity Oct 1, 2025
17cbc4c
add proximity cache support to OpManager and related components
netsirius Oct 4, 2025
4df12c7
fix: Remove unnecessary async from get_broadcast_targets_update
sanity Oct 5, 2025
764a427
fix: Resolve stack overflow and improve proximity cache broadcasting
sanity Oct 7, 2025
36dfd7c
fix: resolve transport-layer retransmission flooding
sanity Oct 8, 2025
97e97a8
Merge branch 'main' into fix/1848-phase4-proximity-implementation
sanity Oct 8, 2025
bceb641
fix: reduce exponential backoff cap and increase test timeouts
sanity Oct 8, 2025
d02826f
fix: increase test_three_node_network_connectivity timeout to 300s
sanity Oct 9, 2025
84892e4
fix: increase proximity test response timeouts to 120s
sanity Oct 9, 2025
4d2bf81
fix: increase proximity test overall timeout from 300s to 500s
sanity Oct 9, 2025
a4e0a2c
fix: increase network stabilization delay from 45s to 120s
sanity Oct 9, 2025
1c18a14
Merge branch 'main' into fix/1848-phase4-proximity-implementation
sanity Oct 9, 2025
4643550
fix: increase test_three_node_network_connectivity operation timeouts…
sanity Oct 9, 2025
442dda7
fix: prevent orphaned callbacks in handle_connect_peer causing channe…
sanity Oct 9, 2025
93fa53e
refactor: reduce test timeouts after fixing orphaned callback bug
sanity Oct 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 76 additions & 4 deletions crates/core/src/client_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ pub async fn client_event_handling<ClientEv>(
mut client_events: ClientEv,
mut client_responses: ClientResponsesReceiver,
node_controller: tokio::sync::mpsc::Sender<NodeEvent>,
proximity_cache: Arc<crate::node::proximity_cache::ProximityCacheManager>,
) -> anyhow::Result<Infallible>
where
ClientEv: ClientEventsProxy + Send + 'static,
Expand Down Expand Up @@ -245,7 +246,7 @@ where
}
};
let cli_id = req.client_id;
let res = process_open_request(req, op_manager.clone(), request_router.clone()).await;
let res = process_open_request(req, op_manager.clone(), request_router.clone(), proximity_cache.clone()).await;
results.push(async move {
match res.await {
Ok(Some(Either::Left(res))) => (cli_id, Ok(Some(res))),
Expand Down Expand Up @@ -320,6 +321,9 @@ where
QueryResult::NodeDiagnostics(response) => {
Ok(HostResponse::QueryResponse(QueryResponse::NodeDiagnostics(response)))
}
QueryResult::ProximityCache(proximity_info) => {
Ok(HostResponse::QueryResponse(QueryResponse::ProximityCache(proximity_info)))
}
};
if let Ok(result) = &res {
tracing::debug!(%result, "sending client operation response");
Expand Down Expand Up @@ -356,10 +360,69 @@ enum Error {
}

#[inline]
async fn handle_proximity_cache_info_query(
proximity_cache: &Arc<crate::node::proximity_cache::ProximityCacheManager>,
) -> freenet_stdlib::client_api::ProximityCacheInfo {
let (my_cache_hashes, neighbor_cache_data) = proximity_cache.get_introspection_data().await;
let stats = proximity_cache.get_stats().await;

let my_cache = my_cache_hashes
.into_iter()
.map(|hash| freenet_stdlib::client_api::ContractCacheEntry {
contract_key: format!("hash_{:08x}", hash),
cache_hash: hash,
cached_since: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
})
.collect();

let neighbor_caches: Vec<_> = neighbor_cache_data
.into_iter()
.map(
|(peer_id, contracts)| freenet_stdlib::client_api::NeighborCacheInfo {
peer_id,
known_contracts: contracts,
last_update: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
update_count: 1,
},
)
.collect();

let total_neighbors = neighbor_caches.len();
let total_contracts: usize = neighbor_caches
.iter()
.map(|n| n.known_contracts.len())
.sum();
let avg_cache_size = if total_neighbors > 0 {
total_contracts as f32 / total_neighbors as f32
} else {
0.0
};

freenet_stdlib::client_api::ProximityCacheInfo {
my_cache,
neighbor_caches,
stats: freenet_stdlib::client_api::ProximityStats {
cache_announces_sent: stats.cache_announces_sent,
cache_announces_received: stats.cache_announces_received,
updates_via_proximity: stats.updates_via_proximity,
updates_via_subscription: stats.updates_via_subscription,
false_positive_forwards: stats.false_positive_forwards,
avg_neighbor_cache_size: avg_cache_size,
},
}
}

async fn process_open_request(
mut request: OpenRequest<'static>,
op_manager: Arc<OpManager>,
request_router: Option<Arc<crate::node::RequestRouter>>,
proximity_cache: Arc<crate::node::proximity_cache::ProximityCacheManager>,
) -> BoxFuture<'static, Result<Option<Either<QueryResult, mpsc::Receiver<QueryResult>>>, Error>> {
let (callback_tx, callback_rx) = if matches!(
&*request.request,
Expand Down Expand Up @@ -1239,6 +1302,17 @@ async fn process_open_request(
ClientRequest::NodeQueries(query) => {
tracing::debug!("Received node queries from user event: {:?}", query);

if matches!(
query,
freenet_stdlib::client_api::NodeQuery::ProximityCacheInfo
) {
let proximity_info = handle_proximity_cache_info_query(&proximity_cache).await;
return Ok(Some(Either::Left(QueryResult::ProximityCache(
proximity_info,
))));
}

// For other queries, we need to use the callback_tx
let Some(tx) = callback_tx else {
tracing::error!("callback_tx not available for NodeQueries");
unreachable!("callback_tx should always be Some for NodeQueries based on initialization logic");
Expand All @@ -1258,9 +1332,7 @@ async fn process_open_request(
}
}
freenet_stdlib::client_api::NodeQuery::ProximityCacheInfo => {
// TODO: Implement proximity cache info query
tracing::warn!("ProximityCacheInfo query not yet implemented");
return Ok(None);
unreachable!("ProximityCacheInfo handled above")
}
};

Expand Down
25 changes: 24 additions & 1 deletion crates/core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{

use crate::{
client_events::{ClientId, HostResult},
node::PeerId,
node::{proximity_cache::ProximityCacheMessage, PeerId},
operations::{
connect::ConnectMsg, get::GetMsg, put::PutMsg, subscribe::SubscribeMsg, update::UpdateMsg,
},
Expand Down Expand Up @@ -255,6 +255,10 @@ pub(crate) enum NetMessageV1 {
},
Update(UpdateMsg),
Aborted(Transaction),
ProximityCache {
from: PeerId,
message: ProximityCacheMessage,
},
}

trait Versioned {
Expand All @@ -279,6 +283,7 @@ impl Versioned for NetMessageV1 {
NetMessageV1::Unsubscribed { .. } => semver::Version::new(1, 0, 0),
NetMessageV1::Update(_) => semver::Version::new(1, 0, 0),
NetMessageV1::Aborted(_) => semver::Version::new(1, 0, 0),
NetMessageV1::ProximityCache { .. } => semver::Version::new(1, 0, 0),
}
}
}
Expand Down Expand Up @@ -334,6 +339,11 @@ pub(crate) enum NodeEvent {
key: ContractKey,
subscribed: bool,
},
/// Broadcast a ProximityCache message to all connected peers
BroadcastProximityCache {
from: PeerId,
message: crate::node::proximity_cache::ProximityCacheMessage,
},
/// Send a message to a peer over the network
SendMessage {
target: PeerId,
Expand Down Expand Up @@ -373,6 +383,7 @@ pub(crate) enum QueryResult {
},
NetworkDebug(NetworkDebugInfo),
NodeDiagnostics(freenet_stdlib::client_api::NodeDiagnosticsResponse),
ProximityCache(freenet_stdlib::client_api::ProximityCacheInfo),
}

impl Display for NodeEvent {
Expand Down Expand Up @@ -415,6 +426,9 @@ impl Display for NodeEvent {
"Local subscribe complete (tx: {tx}, key: {key}, subscribed: {subscribed})"
)
}
NodeEvent::BroadcastProximityCache { from, .. } => {
write!(f, "BroadcastProximityCache (from {from})")
}
NodeEvent::SendMessage { target, msg } => {
write!(f, "SendMessage (to {target}, tx: {})", msg.id())
}
Expand Down Expand Up @@ -452,6 +466,7 @@ impl MessageStats for NetMessageV1 {
NetMessageV1::Update(op) => op.id(),
NetMessageV1::Aborted(tx) => tx,
NetMessageV1::Unsubscribed { transaction, .. } => transaction,
NetMessageV1::ProximityCache { .. } => Transaction::NULL,
}
}

Expand All @@ -464,6 +479,7 @@ impl MessageStats for NetMessageV1 {
NetMessageV1::Update(op) => op.target().as_ref().map(|b| b.borrow().clone()),
NetMessageV1::Aborted(_) => None,
NetMessageV1::Unsubscribed { .. } => None,
NetMessageV1::ProximityCache { .. } => None,
}
}

Expand All @@ -476,6 +492,7 @@ impl MessageStats for NetMessageV1 {
NetMessageV1::Update(op) => op.requested_location(),
NetMessageV1::Aborted(_) => None,
NetMessageV1::Unsubscribed { .. } => None,
NetMessageV1::ProximityCache { .. } => None,
}
}
}
Expand All @@ -495,6 +512,12 @@ impl Display for NetMessage {
Unsubscribed { key, from, .. } => {
write!(f, "Unsubscribed {{ key: {key}, from: {from} }}")?;
}
ProximityCache { from, message } => {
write!(
f,
"ProximityCache {{ from: {from}, message: {message:?} }}"
)?;
}
},
};
write!(f, "}}")
Expand Down
45 changes: 45 additions & 0 deletions crates/core/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ mod message_processor;
mod network_bridge;
mod op_state_manager;
mod p2p_impl;
pub(crate) mod proximity_cache;
mod request_router;
pub(crate) mod testing_impl;

Expand Down Expand Up @@ -808,6 +809,28 @@ async fn process_message_v1<CB>(
}
break;
}
NetMessageV1::ProximityCache { from, message } => {
// Handle proximity cache messages
if let Some(proximity_cache) = &op_manager.proximity_cache {
if let Some(response) =
proximity_cache.handle_message(from.clone(), message).await
{
// Send response back to the peer
let response_msg = NetMessage::V1(NetMessageV1::ProximityCache {
from: op_manager.ring.connection_manager.own_location().peer,
message: response,
});
if let Err(err) = conn_manager.send(&from, response_msg).await {
tracing::error!(
"Failed to send proximity cache response to {}: {}",
from,
err
);
}
}
}
break;
}
_ => break, // Exit the loop if no applicable message type is found
}
}
Expand Down Expand Up @@ -1015,6 +1038,28 @@ where
}
break;
}
NetMessageV1::ProximityCache { from, message } => {
// Handle proximity cache messages
if let Some(proximity_cache) = &op_manager.proximity_cache {
if let Some(response) =
proximity_cache.handle_message(from.clone(), message).await
{
// Send response back to the peer
let response_msg = NetMessage::V1(NetMessageV1::ProximityCache {
from: op_manager.ring.connection_manager.own_location().peer,
message: response,
});
if let Err(err) = conn_manager.send(&from, response_msg).await {
tracing::error!(
"Failed to send proximity cache response to {}: {}",
from,
err
);
}
}
}
break;
}
_ => break, // Exit the loop if no applicable message type is found
}
}
Expand Down
Loading
Loading