Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tracking: P2P #64

Closed
23 tasks done
Rjected opened this issue Oct 14, 2022 · 10 comments
Closed
23 tasks done

Tracking: P2P #64

Rjected opened this issue Oct 14, 2022 · 10 comments
Assignees
Labels
A-devp2p Related to the Ethereum P2P protocol C-tracking-issue An issue that collects information about a broad development initiative

Comments

@Rjected
Copy link
Member

Rjected commented Oct 14, 2022

P2P Networking proposal and tracking

RLPx Peer Connection

The RLPx peer connection should implement framed encryption as specified by RLPx.

Client

The ECIES transport should implement AsyncRead and AsyncWrite, so the p2p connection can use it.
Long term, it would be nice to interact with framed encryption like this (similar to TLS libraries):

let tcp_conn = TcpStream::connect("127.0.0.1:30303")?;
let rlpx = RLPxConnector::new(secret_key, peer_id)?;
let mut client: ECIESStream = rlpx.connect(tcp_conn).await?;

client.write_all(b"hello").await?;

let mut buffer = [0; 16];
let n = client.read(&mut buffer[..]).await?;

High priority tasks:

  • Implement AsyncRead on ECIESStream<Io>
  • Implement AsyncWrite on ECIESStream<Io>

Lower priority:

  • Refactor ECIESStream<Io> to support the above UX.

Server

The RLPx Server should also work with any transport that implements AsyncRead and AsyncWrite.
Longer term, it should be possible to serve RLPx like this:

let acceptor = RLPxAcceptor::new(secret_key);
let listener = TcpListener::bind(&addr).await?;

// roughly based off of the design of tokio::net::TcpListener
loop {
    let (client: ECIESStream, remote_addr) = acceptor.accept(listener).await?;
    process_socket(client).await;
}

Low priority tasks:

  • Implement accept pattern on ECIESStream for an arbitrary Io that implements AsyncRead and AsyncWrite.

p2p Peer Connection

The RLPx peer connection will contain a server and client portion, and will take care of capability negotiation (the p2p capability), pings, and disconnects.
Both the client and server should be capable of working with a type that implements AsyncRead and AsyncWrite, meaning the transport does not need to implement framed encryption.
This makes it slightly easier to test, since it would not require creating an ECIESStream.

Client

We should be able to do something roughly similar to the above designs, but we need an extra task to handle driving the p2p state.
This is roughly inspired by the design of hyper's client::conn.

// it would be nice to have utilities to properly generate a `Hello` message
let hello = Hello { ... };

let tcp_conn = TcpStream::connect("127.0.0.1:30303")?;
let p2p_config = P2PConfig::new(hello);
let (mut client: P2PStream, p2p_state) = p2p_config.handshake(tcp_conn).await?;

tokio::spawn(async move {
    if let Err(e) = p2p_state.await {
        println!("Error! maybe ping error, etc: {}", e);
    }
});

// just an example, real messages are likely to be much more concrete
let message: Message = MessageBuilder::new()
    .with_hello(hello)
    .request_id(0xf)
    .message("hello");

// todo: may want to consider a message size limit on read
// ECIESStream can implement AsyncRead/AsyncWrite, but `p2p` changes the interface from working mainly with bufs to also including a message id
// luckily the stream after being decrypted should have a header that tells us the length
client.send(message).await?;
let peer_message = client.read().await?;

Tasks:

  • Implement RLPx connection with the above UX.

Server

This is roughly inspired by the design of hyper's server::conn.

let mut listener = TcpListener::bind(&addr).await?;
let server = P2PServer::new(hello);
loop {
    let (stream, remote_addr) = tcp_listener.accept().await?
    let mut client: P2PStream = server.accept(stream).await
    process_message(client).await;
}

Tasks:

  • Implement P2PServer

eth Peer Connection

This contains a client and server, just like the RLPx and p2p connection.
Instead of a p2p handshake, both a p2p and eth handshake are performed.

Client

// would also be nice to have a sensible way to generate `Status` messages
let status = Status { ... };

let tcp_conn = TcpStream::connect("127.0.0.1:30303")?;
// this should create a `P2PConfig` under the hood
let eth_config = EthConfig::new(hello, status);
let (mut client: EthStream, p2p_state) = eth_config.handshake(tcp_conn).await?;

tokio::spawn(async move {
    if let Err(e) = p2p_state.await {
        println!("Error! maybe ping error, maybe disconnect, etc: {}", e);
    }
});

// since we are in a subprotocol now we can create a more concrete message
let get_pooled_transactions: GetPooledTransactions = vec!["0x7cab7b2c72d1c6cc8539ee5e4b8af9b86a130d63b1428c2c52c4454ead266ff4"].into();

// TODO: should the client impl just take care of choosing request ids and abstract out multiplexing?
let pooled_transactions = client.send_request(get_pooled_transactions).await?;
let mut hashes = client.stream_hashes().await?;
// if there were another client its stream could be joined
while let Some(hash) = hashes.next().await {
    // ...
}

Tasks:

  • Implement EthConfig
  • Implement EthStream

Server

The eth server will wait for incoming connections and stream incoming

let mut listener = TcpListener::bind(&addr).await?;
// this call should make sure that the `Hello` message includes `eth` as a capability
let server = EthServer::new(hello, status);
loop {
    let (stream, remote_addr) = tcp_listener.accept().await?
    // this should create a `P2PServer` under the hood
    let mut client: EthStream = server.accept(stream).await
    process_message(client).await;
}

Tasks:

  • Implement EthServer

eth Wire Protocol

NOTE See: https://github.com/rjected/ethp2p

We need to provide RLP encoding and decoding for each of the eth protocol messages and each of the types they contain.

  • RequestPair type
  • Status (feat: add eth-wire #20)
  • NewBlockHashes
  • Transactions
  • GetBlockHeaders
  • BlockHeaders
  • GetBlockBodies
  • BlockBodies
  • NewBlock
  • NewPooledTransactionHashes
  • GetPooledTransactions
  • PooledTransactions
  • GetReceipts
    • What is this request/response pair used for?
  • Receipts

Once this is done, we can also create a Message type can be created that is the sum type of all the protocol messages.

eth Network abstraction

Finally, the Network abstraction will integrate all of the above components to provide a reliable connection to a set of peers on the network.
This will implement interfaces that other components (e.g. staged sync, txpool) will use.

The network abstraction will also integrate with discovery mechanisms and manage the discovery state.
This will act as a Server (an eth server) if the user is able to accept incoming connections and will use discovery methods to create outgoing connections, which will yield a set of EthStreams.

Current questions

How will we implement the Engine API? It prompts the execution layer to request blocks, which happens over eth p2p.
It could also just use the API provided by the Network abstraction.

@Rjected Rjected added A-devp2p Related to the Ethereum P2P protocol C-tracking-issue An issue that collects information about a broad development initiative labels Oct 14, 2022
@Rjected Rjected self-assigned this Oct 14, 2022
@gakonst
Copy link
Member

gakonst commented Oct 14, 2022

Long term, it would be nice to interact with framed encryption like this (similar to TLS libraries):

Love the API proposed that uses AsyncRead/Write.

  • Is there any limitation wrt what async runtime we'll need to use?.
  • What is the biggest blocker in terms of doing the refactor? What do you think about completely stripping out devp2p-rs as it is right now, and starting a clean write following the APIs you want?

Longer term, it should be possible to serve RLPx like this:

Same Q as above re: server refactor, if we do it from scratch, do you think we could build directly for such an API?

This makes it slightly easier to test, since it would not require creating an ECIESStream.
Once this is done, we can also create a Message type can be created that is the sum type of all the protocol messages.

Great!

// it would be nice to have utilities to properly generate a Hello message

Do you mean like builder pattern? We can either do it manually, or for simple structs use something like https://docs.rs/derive_builder/latest/derive_builder/. Same for your Status comment later.

// todo: may want to consider a message size limit on read
// ECIESStream can implement AsyncRead/AsyncWrite, but p2p changes the interface from working mainly with bufs to also including a message id
// luckily the stream after being decrypted should have a header that tells us the length

Didn't exactly understand what you mean here. cc @mattsse who's had thoughts on p2p.

let p2p_config = P2PConfig::new(hello);

Why does hello go into `P2PConfig?

// TODO: should the client impl just take care of choosing request ids and abstract out
multiplexing?

Any reason not to? Any tradeoff here?

// if there were another client its stream could be joined

Makes a lot of sense.

https://github.com/ethereum/devp2p/blob/master/caps/eth.md

^This is a sick doc.

Once this is done, we can also create a Message type can be created that is the sum type of all the protocol messages.

Yep, makes sense. How does Akula do it?

This will implement interfaces that other components (e.g. staged sync, txpool) will use.

How do you envision this to be exposed? Any changes you'd make vs how Akula does it, or how the so-far HeaderClient trait has been exposed in #58?

The network abstraction will also integrate with discovery mechanisms and manage the discovery state.

What kind of discovery mechanisms?

How will we implement the Engine API? It prompts the execution layer to request blocks, which happens over eth p2p.
It could also just use the API provided by the Network abstraction.

So looking at Akulas' implementation of the EngineApi it doesn't seem like they have logic for that? It delegates everything to the Sentry implementation? The stuff here also looks interesting. I would encourage us not to copy the code blindly, and think from first principles on how to do that, as you've said.

Sorry, I misunderstood. Do you mean exposing the engine_* endpoints over an RPC server? In that case, Akula exposes this over JSON RPC to the CL, which we also will as shown here.

This proceeds to send a new fork choice to the Headers Stage receiver which will get consumed like this. Effectively CL calls Engine API, which triggers new HEAD which triggers EL to send HeaderRequest to all its peers, and then download the headers.

Does that make sense?

@mattsse
Copy link
Collaborator

mattsse commented Oct 14, 2022

this is a great summary @Rjected

Implement AsyncRead ...

perhaps this is not even necessary if we have an ECIESStream codec https://docs.rs/tokio-codec/latest/tokio_codec/ and use the https://docs.rs/tokio-codec/latest/tokio_codec/struct.Framed.html adapter which automatically gives us AsyncRead+Write

Server

this pattern is pretty common:

  1. loop incoming
  2. spawn (peer) connections in new task: use shareable context/channels to communicate

CLient

this probably similar to the server but reversed.

for the tech stack, we should look closely at tower's Service trait: https://docs.rs/tower/latest/tower/trait.Service.html which is a request -> response function.

re @gakonst qs

Is there any limitation wrt what async runtime we'll need to use?.

not really, [Framed](https://docs.rs/tokio-util/0.2.0/tokio_util/codec/struct.Framed.html) for example is an adapter over Codec + AsyncRead|Write (which could be io for example) which then implements Stream+Sink with a message type as item.

For Sink it's basically: Item -> Encode -> Bytes -> AsyncWrite, and Stream in reverse: AsnycRead -> Bytes -> Decoder -> Item

@Rjected
Copy link
Member Author

Rjected commented Oct 14, 2022

What is the biggest blocker in terms of doing the refactor? What do you think about completely stripping out devp2p-rs as it is right now, and starting a clean write following the APIs you want?

No blockers, just hoping to get some input on the designs. A clean write would definitely let me build these APIs.

If we do it from scratch, do you think we could build directly for such an API?

Yes

Do you mean like builder pattern? We can either do it manually, or for simple structs use something like https://docs.rs/derive_builder/latest/derive_builder/. Same for your Status comment later.

Interesting, that would be useful. Yeah, more referring to something like the builder pattern.

Didn't exactly understand what you mean here

Since we need to handle the Ping and Pong messages, we need to parse all incoming messages and determine if it belongs to the p2p capability (p2p reserves message IDs 0x00-0x0f). We could still provide an interface similar to AsyncRead / AsyncWrite, but it might make more sense to return something that also contains the message ID parsed if it was not part of the p2p capability.

Why does hello go into `P2PConfig?

Because a Hello message is necessary for the p2p handshake, similar to how Status is necessary for the eth handshake. Would anything else go into the config? Or do you think it's possible for a hello message to not be input to the constructor?

Any reason not to? Any tradeoff here?

I don't think there's any reason not to (unless anyone else can think of anything) especially if we provide a request/response API to other components

How does Akula do it?

They have something similar here

How do you envision this to be exposed? Any changes you'd make vs how Akula does it, or how the so-far HeaderClient trait has been exposed in #58?

Overall, I like the idea of splitting things up by interface (e.g. HeadersClient, maybe we have TransactionClient) and having the Network implement those interfaces. Akula has a more general send_message. We could implement this too, but maybe we can get away with most things using higher level, context-specific APIs.

What kind of discovery mechanisms?

discv4 / discv5 / dnsdisc

Do you mean exposing the engine_* endpoints over an RPC server? ... Does that make sense?

Ah yeah that makes sense. The links are very helpful, thanks!

@mattsse
Copy link
Collaborator

mattsse commented Oct 14, 2022

@mattsse
Copy link
Collaborator

mattsse commented Oct 15, 2022

What we need

Discovery stack

  • built on top of UDP: allows us to find new peers to create outbound connections to

Devp2p stack

  • built on top of TCP: exchange data (tx etc..)

Server half (incoming connections)

  • listens for incoming connections and establishes RLPx sessions: handshake , RLPx transport protocol (multiplexed via message identifiers)
  • eth wire protocol is a sub protocol in an RLPx session: used to exchange ethereum blockchain info between peers

Client half (outgoing connections)

  • connections to new peers discovered via discovery

@Rjected a RLPx session is symmetrical, so there's no distinction between client/server peer, right? So outgoing + incoming connections should look the same.

Plugging everything together:

Looks like we have 3 big parts:

  1. udp stream -> yields discovered peers
  2. TCP stream -> yields incoming connections
  3. Outgoing connections: open connection to peer

2 + 3 should produce the same thing, a NodeSession?

Terminology

This can get confusing really quick.

Proposing some terms:

Swarm: Contains the state of the network and can be used to alter (e.g. adding peers manually). This will essentially be the user-facing API.

Node/Peer: Any peer that implements the devp2p protocol

(Connection)Pool: manages a set of connections

struct Swarm {
  incoming // A Stream type that produces "incoming connections"
  pool // manages active nodes, basically just keeps track of background tasks for the sessions, since sessions are separate task they should have a channel back to the pool so `pool <-> sessions` can exchange info, for example for notifying if session closed
  
}

// this is effectively the main event loop
Stream for Swarm {
   type Item = Event; // Some event type this stream produces

  fn next() {
      // 1. poll `incoming` stream that also yields an event
     // somethings needs to handle this new connection and decided what to do with it, for example, reject if spammer etc. or establish a connection
    // 2. pool manages established connections: spawn session with attached channel
  }
}

these are just my initial ideas, peer sessions are probably pretty self contained.

@gakonst
Copy link
Member

gakonst commented Oct 15, 2022

makes sense

@Rjected a RLPx session is symmetrical, so there's no distinction between client/server peer, right? So outgoing + incoming connections should look the same.

yes

@gakonst
Copy link
Member

gakonst commented Oct 16, 2022

Part 1, the encrypted connection is done in #80.

@Rjected
Copy link
Member Author

Rjected commented Oct 20, 2022

Wire protocol complete in #82

@Rjected
Copy link
Member Author

Rjected commented Nov 3, 2022

p2p component completed in #114, I think everything here has been more or less accomplished after #110 is done

@mattsse
Copy link
Collaborator

mattsse commented Nov 26, 2022

this has been completed.

Follow-up via individual issues on specific topics.

@mattsse mattsse closed this as completed Nov 26, 2022
@onbjerg onbjerg moved this to Todo in Reth Tracker Jan 4, 2023
@onbjerg onbjerg moved this from Todo to Done in Reth Tracker Jan 4, 2023
greged93 pushed a commit to greged93/reth that referenced this issue Dec 7, 2024
greged93 pushed a commit to greged93/reth that referenced this issue Dec 7, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-devp2p Related to the Ethereum P2P protocol C-tracking-issue An issue that collects information about a broad development initiative
Projects
Archived in project
Development

No branches or pull requests

3 participants