examples: Showcase how to use rust-libp2p in larger application #2158

Closed
mxinden opened this issue Jul 23, 2021 · 16 comments · Fixed by #2186
Labels: difficulty:easy, getting-started, help wanted

Comments

@mxinden
Member

mxinden commented Jul 23, 2021

While we have many examples which showcase rust-libp2p in isolation, it would be great to have an example of how to integrate rust-libp2p into a larger application.

I was thinking of something along the lines of:

  • A single task polling (a) the Swarm and (b) a channel of incoming commands.

    The commands coming through (b) would contain a oneshot::Sender, allowing the task to send back a response when ready.

  • A remote control providing async methods, each sending a command down to the task, awaiting the oneshot::Receiver side.

The example should derive its own NetworkBehaviour based on e.g. libp2p-request-response and libp2p-identify.
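A minimal sketch of the remote-control side of that idea (purely illustrative; it assumes the futures crate as a dependency, and the Command and Client names as well as the dial command are placeholders, not part of any existing example):

use futures::channel::{mpsc, oneshot};
use futures::SinkExt;

// Hypothetical command type sent from the remote control to the network task.
// Every variant carries a oneshot::Sender so the task can reply once the result is ready.
enum Command {
    Dial {
        addr: libp2p::Multiaddr,
        sender: oneshot::Sender<Result<(), String>>,
    },
}

// Remote control handed out to the rest of the application.
#[derive(Clone)]
struct Client {
    sender: mpsc::Sender<Command>,
}

impl Client {
    // Send a command to the network task and await the response on the oneshot channel.
    async fn dial(&mut self, addr: libp2p::Multiaddr) -> Result<(), String> {
        let (sender, receiver) = oneshot::channel();
        self.sender
            .send(Command::Dial { addr, sender })
            .await
            .expect("network task to be running");
        receiver.await.expect("sender not to be dropped")
    }
}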

See the following resources for additional background:

Happy to expand / discuss details in case someone wants to tackle this.

mxinden added the difficulty:easy, help wanted and getting-started labels on Jul 23, 2021
@thomaseizinger
Contributor

  • The commands coming through (b) would contain a oneshot::Sender, allowing the task to send back a response when ready.

I've found bmrng to be helpful here.

@mxinden
Member Author

mxinden commented Jul 23, 2021

  • The commands coming through (b) would contain a oneshot::Sender, allowing the task to send back a response when ready.

I've found bmrng to be helpful here.

What would be the benefit of bmrng compared to a combination of mpsc::channel and oneshot::channel, apart from being a bit less noisy?

@thomaseizinger
Contributor

It is primarily less noisy, but it also comes with a built-in timeout, which is useful if other parts of the application talk to the remote control and you don't want to wait forever on results coming back from the network layer.

But really, the main value is convenience through e.g. the send function.

@elenaf9
Contributor

elenaf9 commented Jul 27, 2021

I'd be happy to add such an example, similar to how it is done in the interface of the iotaledger/stronghold.rs#210 PR you linked above.
But I probably won't get to it until the end of next week, so in case anybody else would like to tackle this before then, please feel free.

I like bmrng::channel_with_timeout because of the timeout, but I see it as a problem that you can only set one fixed timeout for all commands. I could imagine that for e.g. a start-listening command you may want a different timeout than for sending requests / receiving responses.
And the documentation of bmrng states that it is a channel for Tokio, so I'm not sure whether it is fine to use it within a different runtime. It may not be an issue, but in case it is, I don't think we should restrict this example to the use of tokio.
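With plain channels, a per-command timeout also stays easy: the caller can wrap its oneshot::Receiver in whatever timer fits the runtime. A minimal sketch assuming tokio (the helper name is made up for illustration):

use std::time::Duration;
use tokio::sync::oneshot;

// Hypothetical helper: wait for the network task's reply, but give up after `timeout`.
// Each call site can pick its own deadline instead of one fixed channel-wide timeout.
async fn await_response_with_timeout<T>(
    receiver: oneshot::Receiver<T>,
    timeout: Duration,
) -> Result<T, &'static str> {
    match tokio::time::timeout(timeout, receiver).await {
        Ok(Ok(response)) => Ok(response),
        Ok(Err(_)) => Err("network task dropped the sender"),
        Err(_) => Err("timed out waiting for the network task"),
    }
}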

@thomaseizinger
Contributor

And the documentation of bmrng states that it is a channel for Tokio, so I'm not sure whether it is fine to use it within a different runtime. It may not be an issue, but in case it is, I don't think we should restrict this example to the use of tokio.

That is likely relevant for the timer inside the channel. There needs to be a tokio reactor running somewhere for this to work, I believe. Otherwise it will panic at runtime.

@mxinden
Member Author

mxinden commented Jul 27, 2021

I'd be happy to add such an example, similar to how it is done in the interface of the iotaledger/stronghold.rs#210 PR you linked above. But I probably won't get to it until the end of next week, so in case anybody else would like to tackle this before then, please feel free.

🙏 thanks!

To keep things simple I would prefer an example with plain old futures channels instead of bmrng. That said, I am fine with referencing the crate in a comment.

@umgefahren
Contributor

I tried implementing something like that but I'm hopelessly stuck. Please help.

Cargo.toml:

[dependencies]
tokio = { version = "1.9", features = ["full"] }
libp2p = {version = "0.39.1", features = ["mdns", "tcp-tokio"]}
async-std = { version = "1", features = ["attributes", "tokio1"] }

main.rs:

use std::sync::{Arc, Mutex};
use std::collections::HashMap;
use libp2p::swarm::{NetworkBehaviourEventProcess, NetworkBehaviour, SwarmEvent};
use libp2p::NetworkBehaviour;
use libp2p::PeerId;
use libp2p::kad::{Kademlia, QueryId, KademliaEvent, Quorum, Record, QueryResult, PutRecordError, PutRecordOk};
use libp2p::kad::store::MemoryStore;
use libp2p::mdns::{Mdns, MdnsEvent, MdnsConfig};
use libp2p::kad::record::Key;
use libp2p::{identity, noise, Transport, mplex, Swarm};
use libp2p::tcp::{TcpConfig, TokioTcpConfig};
use libp2p::core::upgrade;
use tokio::sync::oneshot;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::io::AsyncBufReadExt;
use libp2p::futures::StreamExt;
// use libp2p::futures::channel::oneshot;
// use libp2p::futures::channel::oneshot::Sender;

#[derive(NetworkBehaviour)]
struct MyBehaviour {
    kademlia: Kademlia<MemoryStore>,
    mdns: Mdns,
    #[behaviour(ignore)]
    ht: Arc<Mutex<HashMap<QueryId, Sender<QueryResult>>>>,
}

impl NetworkBehaviourEventProcess<MdnsEvent> for MyBehaviour {
    fn inject_event(&mut self, event: MdnsEvent) {
        match event {
            MdnsEvent::Discovered(list) => {
                for (peer_id, multiaddr) in list {
                    self.kademlia.add_address(&peer_id, multiaddr);
                }
            }
            MdnsEvent::Expired(list) => {
                for (peer_id, multiaddr) in list {
                    self.kademlia.remove_address(&peer_id, &multiaddr)
                        .expect("Error removing address");
                }
            }
        }
    }
}

impl NetworkBehaviourEventProcess<KademliaEvent> for MyBehaviour {
     fn inject_event(&mut self, event: KademliaEvent) {
        println!("event => {:?}", event);
        match event {
            KademliaEvent::OutboundQueryCompleted { result, id, .. } => {
                println!("Result => {:?}", result);
                // let res = tokio::task::block_in_place(move || self.ht.lock()).remove(&id).unwrap();
                let res = self.ht.lock().unwrap().remove(&id).unwrap();
                res.send(result).unwrap();
            },
            _ => {
                println!("Something other happened");
            }
        }
    }
}

async fn handle_input_line(kademlia: &mut Dht, line: String) {
    let mut args = line.split(' ');

    match args.next() {
        Some("GET") => {
            let key = {
                match args.next() {
                    Some(key) => Key::new(&key),
                    None => {
                        eprintln!("Expected key");
                        return;
                    }
                }
            };
            println!("Key => {:?}", key);
            let res = kademlia.get(key).await.unwrap();
            println!("Record is => {:?}", res);
        }
        Some("PUT") => {
            let key = {
                match args.next() {
                    Some(key) => Key::new(&key),
                    None => {
                        eprintln!("Expected key");
                        return;
                    }
                }
            };
            let value = {
                match args.next() {
                    Some(value) => value.as_bytes().to_vec(),
                    None => {
                        eprintln!("Expected value");
                        return;
                    }
                }
            };
            let record = Record {
                key,
                value,
                publisher: None,
                expires: None,
            };
            // kademlia.put_record(record, Quorum::One).expect("Failed to store record locally.");
            let res = kademlia.put(record).await.unwrap();
            println!("Success Key was => {:?}", res.to_vec());
        },
        _ => {}
    }
}

struct Dht (Swarm<MyBehaviour>);

impl Dht {
    pub fn new(swarm: Swarm<MyBehaviour>) -> Self {
        Self(swarm)
    }

    pub async fn put(&mut self, value: Record) -> Result<Key, &'static str> {
        let behaviour = self.0.behaviour_mut();
        let (sx, rx) = oneshot::channel();
        let query_id = behaviour.kademlia.put_record(value, Quorum::One).unwrap();
        behaviour.ht.lock().unwrap().insert(query_id, sx);
        println!("HT => {:?}", behaviour.ht);
        drop(behaviour);
        let res = rx.await.unwrap();
        match res {
            QueryResult::PutRecord(d) => {
                match d {
                    Ok(dd) => {
                        Ok(dd.key)
                    }
                    Err(e) => {
                        Err("Something went wrong again")
                    }
                }
            }
            _ => {
                Err("Something went wrong")
            }
        }
    }

    pub async fn get(&mut self, key: Key) -> Result<Record, &'static str> {
        let behaviour = self.0.behaviour_mut();
        let (sx, rx) = oneshot::channel();
        let res = tokio::spawn(async {
            rx.await.unwrap()
        });
        let query_id = behaviour.kademlia.get_record(&key, Quorum::One);
        behaviour.ht.lock().unwrap().insert(query_id, sx);
        println!("{:?}", behaviour.ht);
        match res.await.unwrap() {
            QueryResult::GetRecord(d) => {
                match d {
                    Ok(dd) => {
                        println!("DD => {:?}", dd);
                        Ok(dd.records.get(0).unwrap().record.clone())
                    }
                    Err(_) => {
                        Err("something went wrong again")
                    }
                }
            }
            _ => {
                Err("Something went wrong")
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let id_keys = identity::Keypair::generate_ed25519();
    let peer_id = PeerId::from(id_keys.public());
    println!("Local peer id => {:?}", peer_id);

    let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
        .into_authentic(&id_keys)
        .expect("Signing libp2p-noise static DH keypair failed.");

    let transport = TokioTcpConfig::new()
        .nodelay(true)
        .upgrade(upgrade::Version::V1)
        .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
        .multiplex(mplex::MplexConfig::new())
        .boxed();

    let mut swarm = {
        let store = MemoryStore::new(peer_id);
        let kademlia = Kademlia::new(peer_id, store);
        let mdns = Mdns::new(MdnsConfig::default()).await.unwrap();
        let ht = Arc::new(Mutex::new(HashMap::new()));
        let behaviour = MyBehaviour {
            kademlia,
            mdns,
            ht,
        };
        Swarm::new(transport, behaviour, peer_id)
    };


    let mut stdin = tokio::io::BufReader::new(tokio::io::stdin()).lines();

    swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap();

    let mut managed_swarm = Dht::new(swarm);

    managed_swarm.put(Record {
        key: Key::new(&[0, 0, 1]),
        value: vec![1, 3, 6],
        publisher: None,
        expires: None
    });

    loop {
        tokio::select! {
            line = stdin.next_line() => {
                let line = line.unwrap().expect("stdin closed");
                handle_input_line(&mut managed_swarm, line).await;
            }
            event = managed_swarm.0.select_next_some() => {
                println!("Event => {:?}", event);
                if let SwarmEvent::NewListenAddr { address, .. } = event {
                    println!("Listening on {:?}", address);
                }
            }
        }
    }
}

When I execute it, the program stalls after printing the current HashMap. What am I doing wrong?

@elenaf9
Contributor

elenaf9 commented Jul 29, 2021

When I execute it, the program stalls after printing the current HashMap. What am I doing wrong?

First of all, you are not calling .await on your async managed_swarm.put() method call. I am actually surprised that it prints the current HashMap at all, since async methods return a future and do nothing unless you await them.

But apart from that, I think the general problem is that you are blocking on a KademliaEvent before even starting to poll the swarm.
The Swarm needs to be continuously polled via <ExpandedSwarm as Stream> in order to make any progress, e.g. with the managed_swarm.0.select_next_some() that you are doing in your loop below.
As long as this is not done, no events will be reported via NetworkBehaviourEventProcess::inject_event.

In your method Dht::put, you write let res = rx.await.unwrap();. The Sender side of this channel will only write once a Kademlia event has been reported, but at this point that will never happen, because it takes place before you poll the swarm.
The same issue exists in Dht::get: while blocked, you don't poll the swarm, hence no event can happen.

With your current implementation, this could maybe be solved with something like:

let query_id = behaviour.kademlia.put_record(value, Quorum::One).unwrap();
behaviour.ht.lock().unwrap().insert(query_id, sx);
println!("HT => {:?}", behaviour.ht);
drop(behaviour);
let res = loop {
    tokio::select! {
        _ = self.0.select_next_some() => {},
        res = &mut rx => break res.unwrap(),
    }
};

But since you are then essentially sending the event from within the NetworkBehaviourEventProcess::inject_event block to the loop in which you are also polling the swarm, I'd suggest not using NetworkBehaviourEventProcess, and instead setting event_process = false, as was also discussed in #2024 (comment).
Then you could directly poll the outcome from the swarm with something like this:

let query_id = behaviour.kademlia.put_record(value, Quorum::One).unwrap();
println!("HT => {:?}", behaviour.ht);
let res = loop {
    if let SwarmEvent::Behaviour(KademliaEvent::OutboundQueryCompleted { result, .. }) = self.0.select_next_some().await {
        break result;
    }
};

@umgefahren
Contributor

OK, I think I understand, and I even got the first snippet to work. However, I'm struggling with the second one. Could you give an example of how to set the behaviour flag correctly?

@umgefahren
Contributor

If I understood the documentation correctly, my NetworkBehaviour struct has to look something like this:

#[derive(NetworkBehaviour)]
struct MyBehaviour {
    #[behaviour(out_event = "KademliaEvent")]
    #[behaviour(event_process = false)]
    kademlia: Kademlia<MemoryStore>,
    mdns: Mdns,
    #[behaviour(ignore)]
    ht: Arc<Mutex<HashMap<QueryId, Sender<QueryResult>>>>,
}

Without implementing NetworkBehaviourEventProcess<KademliaEvent> for MyBehaviour. However, this doesn't compile.

@elenaf9
Contributor

elenaf9 commented Jul 29, 2021

@umgefahren The attribute macros #[behaviour(out_event = "KademliaEvent")] and #[behaviour(event_process = false)] have to be set on the whole struct, not just on the kademlia field, so like this:

#[derive(NetworkBehaviour)]
#[behaviour(out_event = "OutEvent", event_process = false)]
struct MyBehaviour {
    kademlia: Kademlia<MemoryStore>,
    mdns: Mdns,
}

With OutEvent being a wrapper for the two possible events KademliaEvent and MdnsEvent:

pub enum OutEvent {
  Kademlia(KademliaEvent),
  Mdns(MdnsEvent)
}

impl From<KademliaEvent> for OutEvent {..}

impl From<MdnsEvent> for OutEvent {..}
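For completeness, the elided From impls just wrap the events, for example:

impl From<KademliaEvent> for OutEvent {
    fn from(event: KademliaEvent) -> Self {
        OutEvent::Kademlia(event)
    }
}

impl From<MdnsEvent> for OutEvent {
    fn from(event: MdnsEvent) -> Self {
        OutEvent::Mdns(event)
    }
}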

And when polling the swarm, the behaviour event type of the Swarm would be an OutEvent, so you have to wrap within the if-statement once more:

let res = loop {
   if let SwarmEvent::Behaviour(OutEvent::Kademlia(KademliaEvent::...)) = self.0.select_next_some() {..}
}

This is quite analogous to how it is done in the project that @thomaseizinger linked in #2024 (comment), so I'd recommend taking a deeper look at their implementation in case of further questions.

@umgefahren
Contributor

Thanks, that worked :)
I'll clean up the code and post it here so that it can be considered when showcasing libp2p in a larger application.

@umgefahren
Contributor

Here is the result:
https://gist.github.com/umgefahren/4255d9ffa5623825d2b8920e4f1cf3fd

@thomaseizinger
Contributor

thomaseizinger commented Jul 29, 2021

Here is the result:
gist.github.com/umgefahren/4255d9ffa5623825d2b8920e4f1cf3fd

Whilst this may work, it may cause some problems down the line :/

In particular, I think polling the swarm (i.e. calling .next) in multiple places is not a particularly good idea. That is because the Swarm will not do any work unless polled (same as futures). Hence, to avoid bugs, you really want to make sure it is continuously polled, which is best done when that only happens in a single place, like an event loop, that then merely distributes the events and takes commands through channels.

This is only one pattern, but thinking of it as an actor-based design where you have a single worker that drives the Swarm is something that should go well with async. That is pretty much what @mxinden suggested in the first post. It is a bit hard to imagine, so I am hoping to put some code snippets together in the next couple of days!
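A rough sketch of what such an event loop could look like with the Kademlia code from this thread (assuming the futures crate as a dependency, the MyBehaviour/OutEvent definitions with event_process = false from the comments above, and a hypothetical PutRecord command; this is not the code from any published example):

use std::collections::HashMap;
use futures::channel::{mpsc, oneshot};
use futures::prelude::*;
use libp2p::kad::{KademliaEvent, QueryId, QueryResult, Quorum, Record};
use libp2p::swarm::SwarmEvent;
use libp2p::Swarm;

// Hypothetical command; the caller gets the QueryResult back through the oneshot.
enum Command {
    PutRecord {
        record: Record,
        sender: oneshot::Sender<QueryResult>,
    },
}

// The single place where the Swarm is polled. Commands and swarm events are
// handled in the same loop, so queries make progress while callers are waiting.
async fn event_loop(mut swarm: Swarm<MyBehaviour>, mut commands: mpsc::Receiver<Command>) {
    let mut pending: HashMap<QueryId, oneshot::Sender<QueryResult>> = HashMap::new();
    loop {
        futures::select! {
            event = swarm.select_next_some() => {
                if let SwarmEvent::Behaviour(OutEvent::Kademlia(
                    KademliaEvent::OutboundQueryCompleted { id, result, .. },
                )) = event
                {
                    if let Some(sender) = pending.remove(&id) {
                        let _ = sender.send(result);
                    }
                }
            }
            command = commands.select_next_some() => {
                match command {
                    Command::PutRecord { record, sender } => {
                        match swarm.behaviour_mut().kademlia.put_record(record, Quorum::One) {
                            Ok(id) => { pending.insert(id, sender); }
                            // Dropping the sender lets the caller observe the failure.
                            Err(_) => drop(sender),
                        }
                    }
                }
            }
        }
    }
}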

@umgefahren
Contributor

That would definitely be great.

@thomaseizinger
Contributor

I've posted something here: #2171
