Skip to content

Commit

Permalink
fix(kad): don't assume QuerIds are unique
Browse files Browse the repository at this point in the history
We mistakenly assumed that `QueryId`s are unique in that, only a single request will be emitted per `QueryId`. This is wrong. A bootstrap for example will issue multiple requests as part of the same `QueryId`. Thus, we cannot use the `QueryId` as a key for the `FuturesMap`. Instead, we use a `FuturesTupleSet` to associate the `QueryId` with the in-flight request.

Related: #4901.
Resolves: #4948.

Pull-Request: #4971.
  • Loading branch information
thomaseizinger authored Dec 5, 2023
1 parent 0f98c98 commit f12dabc
Showing 1 changed file with 36 additions and 31 deletions.
67 changes: 36 additions & 31 deletions protocols/kad/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ pub struct Handler {
next_connec_unique_id: UniqueConnecId,

/// List of active outbound streams.
outbound_substreams: futures_bounded::FuturesMap<QueryId, io::Result<Option<KadResponseMsg>>>,
outbound_substreams:
futures_bounded::FuturesTupleSet<io::Result<Option<KadResponseMsg>>, QueryId>,

/// Contains one [`oneshot::Sender`] per outbound stream that we have requested.
pending_streams:
Expand Down Expand Up @@ -453,7 +454,7 @@ impl Handler {
remote_peer_id,
next_connec_unique_id: UniqueConnecId(0),
inbound_substreams: Default::default(),
outbound_substreams: futures_bounded::FuturesMap::new(
outbound_substreams: futures_bounded::FuturesTupleSet::new(
Duration::from_secs(10),
MAX_NUM_STREAMS,
),
Expand Down Expand Up @@ -552,32 +553,36 @@ impl Handler {
let (sender, receiver) = oneshot::channel();

self.pending_streams.push_back(sender);
let result = self.outbound_substreams.try_push(id, async move {
let mut stream = receiver
.await
.map_err(|_| io::Error::from(io::ErrorKind::BrokenPipe))?
.map_err(|e| match e {
StreamUpgradeError::Timeout => io::ErrorKind::TimedOut.into(),
StreamUpgradeError::Apply(e) => e,
StreamUpgradeError::NegotiationFailed => {
io::Error::new(io::ErrorKind::ConnectionRefused, "protocol not supported")
}
StreamUpgradeError::Io(e) => e,
})?;

let has_answer = !matches!(msg, KadRequestMsg::AddProvider { .. });

stream.send(msg).await?;
stream.close().await?;

if !has_answer {
return Ok(None);
}
let result = self.outbound_substreams.try_push(
async move {
let mut stream = receiver
.await
.map_err(|_| io::Error::from(io::ErrorKind::BrokenPipe))?
.map_err(|e| match e {
StreamUpgradeError::Timeout => io::ErrorKind::TimedOut.into(),
StreamUpgradeError::Apply(e) => e,
StreamUpgradeError::NegotiationFailed => io::Error::new(
io::ErrorKind::ConnectionRefused,
"protocol not supported",
),
StreamUpgradeError::Io(e) => e,
})?;

let has_answer = !matches!(msg, KadRequestMsg::AddProvider { .. });

stream.send(msg).await?;
stream.close().await?;

if !has_answer {
return Ok(None);
}

let msg = stream.next().await.ok_or(io::ErrorKind::UnexpectedEof)??;
let msg = stream.next().await.ok_or(io::ErrorKind::UnexpectedEof)??;

Ok(Some(msg))
});
Ok(Some(msg))
},
id,
);

debug_assert!(
result.is_ok(),
Expand Down Expand Up @@ -728,23 +733,23 @@ impl ConnectionHandler for Handler {
}

match self.outbound_substreams.poll_unpin(cx) {
Poll::Ready((query, Ok(Ok(Some(response))))) => {
Poll::Ready((Ok(Ok(Some(response))), query_id)) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
process_kad_response(response, query),
process_kad_response(response, query_id),
))
}
Poll::Ready((_, Ok(Ok(None)))) => {
Poll::Ready((Ok(Ok(None)), _)) => {
continue;
}
Poll::Ready((query_id, Ok(Err(e)))) => {
Poll::Ready((Ok(Err(e)), query_id)) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::QueryError {
error: HandlerQueryErr::Io(e),
query_id,
},
))
}
Poll::Ready((query_id, Err(_timeout))) => {
Poll::Ready((Err(_timeout), query_id)) => {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::QueryError {
error: HandlerQueryErr::Io(io::ErrorKind::TimedOut.into()),
Expand Down

0 comments on commit f12dabc

Please sign in to comment.