Skip to content

Commit

Permalink
Implement libp2p#868 for gossipsub.
Browse files Browse the repository at this point in the history
  • Loading branch information
AgeManning committed Feb 2, 2019
1 parent deccbe5 commit e69a772
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 378 deletions.
257 changes: 0 additions & 257 deletions protocols/gossipsub/src/handler.rs

This file was deleted.

42 changes: 36 additions & 6 deletions protocols/gossipsub/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@

use cuckoofilter::CuckooFilter;
use futures::prelude::*;
use handler::GossipsubHandler;
use libp2p_core::swarm::{
ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
};
use libp2p_core::{protocols_handler::ProtocolsHandler, PeerId};
use libp2p_core::{
protocols_handler::{OneShotHandler, ProtocolsHandler},
PeerId,
};
use libp2p_floodsub::{Topic, TopicHash};
use mcache::MessageCache;
use protocol::{
GossipsubControlAction, GossipsubMessage, GossipsubRpc, GossipsubSubscription,
GossipsubSubscriptionAction,
GossipsubSubscriptionAction, ProtocolConfig,
};
use rand;
use rand::{seq::SliceRandom, thread_rng};
Expand Down Expand Up @@ -968,11 +970,11 @@ impl<TSubstream, TTopology> NetworkBehaviour<TTopology> for Gossipsub<TSubstream
where
TSubstream: AsyncRead + AsyncWrite,
{
type ProtocolsHandler = GossipsubHandler<TSubstream>;
type ProtocolsHandler = OneShotHandler<TSubstream, ProtocolConfig, GossipsubRpc, InnerMessage>;
type OutEvent = GossipsubEvent;

fn new_handler(&mut self) -> Self::ProtocolsHandler {
GossipsubHandler::new()
Default::default()
}

fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) {
Expand Down Expand Up @@ -1071,7 +1073,12 @@ where
debug_assert!(was_in.is_some());
}

fn inject_node_event(&mut self, propagation_source: PeerId, event: GossipsubRpc) {
fn inject_node_event(&mut self, propagation_source: PeerId, event: InnerMessage) {
// ignore successful sends event
let event = match event {
InnerMessage::Rx(event) => event,
InnerMessage::Sent => return,
};
// Handle subscriptions
// Update connected peers topics
self.handle_received_subscriptions(event.subscriptions, &propagation_source);
Expand Down Expand Up @@ -1135,6 +1142,29 @@ where
Async::NotReady
}
}

/// Transmission between the `OneShotHandler` and the `GossipsubRpc`.
pub enum InnerMessage {
/// We received an RPC from a remote.
Rx(GossipsubRpc),
/// We successfully sent an RPC request.
Sent,
}

impl From<GossipsubRpc> for InnerMessage {
#[inline]
fn from(rpc: GossipsubRpc) -> InnerMessage {
InnerMessage::Rx(rpc)
}
}

impl From<()> for InnerMessage {
#[inline]
fn from(_: ()) -> InnerMessage {
InnerMessage::Sent
}
}

/// Event that can happen on the gossipsub behaviour.
#[derive(Debug)]
pub enum GossipsubEvent {
Expand Down
2 changes: 0 additions & 2 deletions protocols/gossipsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,11 @@ extern crate tokio_io;
extern crate tokio_timer;
extern crate unsigned_varint;

pub mod handler;
pub mod protocol;

mod layer;
mod mcache;
mod rpc_proto;

pub use self::handler::GossipsubHandler;
pub use self::layer::{Gossipsub, GossipsubConfig, GossipsubEvent};
pub use self::protocol::*;
Loading

0 comments on commit e69a772

Please sign in to comment.