Skip to content

Commit

Permalink
feat: make tentacle run on async-std
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Oct 14, 2020
1 parent 3be11de commit 88b3e80
Show file tree
Hide file tree
Showing 65 changed files with 537 additions and 128 deletions.
64 changes: 1 addition & 63 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,71 +1,9 @@
[package]
name = "tentacle"
version = "0.3.1"
license = "MIT"
description = "Minimal implementation for a multiplexed p2p network framework."
authors = ["piaoliu <441594700@qq.com>", "Nervos Core Dev <dev@nervos.org>"]
repository = "https://github.com/nervosnetwork/tentacle"
include = ["Cargo.toml", "src/*", "README.md", "LICENSE"]
readme = "README.md"
keywords = ["network", "peer-to-peer"]
categories = ["network-programming", "asynchronous"]
edition = "2018"

[package.metadata.docs.rs]
features = [ "molc" ]
all-features = false
no-default-features = true

[dependencies]
yamux = { path = "yamux", version = "0.2.6", package = "tokio-yamux" }
secio = { path = "secio", version = "0.4.1", package = "tentacle-secio" }

futures = { version = "0.3.0" }
tokio = { version = "0.2.0", features = ["time", "io-util", "tcp", "dns", "rt-threaded", "blocking"] }
tokio-util = { version = "0.3.0", features = ["codec"] }
log = "0.4"
bytes = "0.5.0"
thiserror = "1.0"
socket2 = { version = "0.3.15", features = ["reuseport"] }
tokio-tungstenite = { version = "0.11", optional = true }

flatbuffers = { version = "0.6.0", optional = true }
flatbuffers-verifier = { version = "0.2.0", optional = true }
multiaddr = { path = "multiaddr", package = "tentacle-multiaddr", version = "0.2.0" }
molecule = { version = "0.6.0", optional = true }

# upnp
igd = "0.9"

[target.'cfg(unix)'.dependencies.libc]
version = "0.2"

[target.'cfg(windows)'.dependencies.winapi]
version = "0.3.7"
features = ["minwindef", "ws2def", "winerror", "heapapi"]

[dev-dependencies]
env_logger = "0.6.0"
crossbeam-channel = "0.3.6"
systemstat = "0.1.3"
futures-test = "0.3.5"

[target.'cfg(unix)'.dev-dependencies]
nix = "0.13.0"

[features]
default = []
# use flatbuffer to handshake
flatc = [ "flatbuffers", "flatbuffers-verifier", "secio/flatc" ]
# use molecule to handshake
molc = [ "molecule", "secio/molc" ]
ws = ["tokio-tungstenite"]

[workspace]
members = [
"yamux",
"secio",
"multiaddr",
"tentacle",
"bench",
]
exclude = [
Expand Down
17 changes: 9 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,31 @@ FLATC_RUST_FILES := $(patsubst %.fbs,%_generated.rs,${FBS_FILES})
FLATBUFFERS_VERIFIER_FILES := $(patsubst %.fbs,%_generated_verifier.rs,${FBS_FILES})
MOL_RUST_FILES := $(patsubst %.mol,%_mol.rs,${MOL_FILES})

Change_Work_Path := cd tentacle

fmt:
cargo fmt --all -- --check

clippy:
RUSTFLAGS='-F warnings' cargo clippy --all --tests --features molc,ws -- -D clippy::let_underscore_must_use
RUSTFLAGS='-F warnings' cargo clippy --all --tests --features flatc -- -D clippy::let_underscore_must_use
$(Change_Work_Path) && RUSTFLAGS='-F warnings' cargo clippy --all --tests --features molc,ws -- -D clippy::let_underscore_must_use
$(Change_Work_Path) && RUSTFLAGS='-F warnings' cargo clippy --all --tests --features flatc -- -D clippy::let_underscore_must_use

test:
RUSTFLAGS='-F warnings' RUST_BACKTRACE=full cargo test --all --features molc,ws
RUSTFLAGS='-F warnings' RUST_BACKTRACE=full cargo test --all --features flatc
$(Change_Work_Path) && RUSTFLAGS='-F warnings' RUST_BACKTRACE=full cargo test --all --features molc,ws
$(Change_Work_Path) && RUSTFLAGS='-F warnings' RUST_BACKTRACE=full cargo test --all --features flatc

fuzz:
cargo +nightly fuzz run secio_crypto_decrypt_cipher -- -max_total_time=60
cargo +nightly fuzz run secio_crypto_encrypt_cipher -- -max_total_time=60
cargo +nightly fuzz run yamux_frame_codec -- -max_total_time=60

build:
RUSTFLAGS='-F warnings' cargo build --all --features molc,ws
RUSTFLAGS='-F warnings' cargo build --all --features flatc
$(Change_Work_Path) && RUSTFLAGS='-F warnings' cargo build --all --features molc,ws
$(Change_Work_Path) && RUSTFLAGS='-F warnings' cargo build --all --features flatc

examples:
cargo build --examples --all --features molc
cargo build --examples --all --features flatc
$(Change_Work_Path) && cargo build --examples --all --features molc
$(Change_Work_Path) && cargo build --examples --all --features flatc

bench_p2p:
cd bench && cargo run --release --features molc
Expand Down
2 changes: 1 addition & 1 deletion bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ name = "main"
path = "./src/main.rs"

[dependencies]
p2p = { path = "..", package = "tentacle" }
p2p = { path = "../tentacle", package = "tentacle" }
rand = "0.6.1"
futures = { version = "0.3.0" }
tokio = { version = "0.2.0", features = ["time", "io-util", "tcp", "dns", "rt-threaded", "blocking"] }
Expand Down
75 changes: 75 additions & 0 deletions tentacle/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
[package]
name = "tentacle"
version = "0.3.1"
license = "MIT"
description = "Minimal implementation for a multiplexed p2p network framework."
authors = ["piaoliu <441594700@qq.com>", "Nervos Core Dev <dev@nervos.org>"]
repository = "https://github.com/nervosnetwork/tentacle"
readme = "README.md"
keywords = ["network", "peer-to-peer"]
categories = ["network-programming", "asynchronous"]
edition = "2018"

[package.metadata.docs.rs]
features = [ "molc" ]
all-features = false
no-default-features = true

[dependencies]
yamux = { path = "../yamux", version = "0.2.6", default-features = false, package = "tokio-yamux"}
secio = { path = "../secio", version = "0.4.1", package = "tentacle-secio" }

futures = { version = "0.3.0" }
tokio = { version = "0.2.0" }
tokio-util = { version = "0.3.0", features = ["codec"] }
log = "0.4"
bytes = "0.5.0"
thiserror = "1.0"
socket2 = { version = "0.3.15", features = ["reuseport"] }
tokio-tungstenite = { version = "0.11", optional = true }
futures-timer = { version = "3.0.2", optional = true }
async-std = { version = "1", features = ["unstable"], optional = true }
async-io = { version = "1", optional = true }

flatbuffers = { version = "0.6.0", optional = true }
flatbuffers-verifier = { version = "0.2.0", optional = true }
multiaddr = { path = "../multiaddr", package = "tentacle-multiaddr", version = "0.2.0" }
molecule = { version = "0.6.0", optional = true }

# upnp
igd = "0.9"

[target.'cfg(unix)'.dependencies.libc]
version = "0.2"

[target.'cfg(windows)'.dependencies.winapi]
version = "0.3.7"
features = ["minwindef", "ws2def", "winerror", "heapapi"]

[dev-dependencies]
env_logger = "0.6.0"
crossbeam-channel = "0.3.6"
systemstat = "0.1.3"
futures-test = "0.3.5"

[target.'cfg(unix)'.dev-dependencies]
nix = "0.13.0"

[features]
default = ["tokio-runtime", "tokio-timer"]
# use flatbuffer to handshake
flatc = [ "flatbuffers", "flatbuffers-verifier", "secio/flatc" ]
# use molecule to handshake
molc = [ "molecule", "secio/molc" ]
ws = ["tokio-tungstenite"]

# Related to runtime

tokio-timer = ["yamux/tokio-timer", "tokio/time", "tokio-runtime"]
tokio-runtime = ["tokio/io-util", "tokio/tcp", "tokio/dns", "tokio/rt-threaded", "tokio/blocking"]

async-timer = ["async-runtime"]
async-runtime = ["async-std", "async-io", "yamux/generic-timer", "tokio-util/compat"]

generic-timer = ["futures-timer", "yamux/generic-timer"]
wasm-timer = ["futures-timer", "yamux/wasm", "futures-timer/wasm-bindgen"]
75 changes: 75 additions & 0 deletions tentacle/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Tentacle

[![Build Status](https://api.travis-ci.org/nervosnetwork/tentacle.svg?branch=master)](https://travis-ci.org/nervosnetwork/tentacle)
![image](https://img.shields.io/badge/rustc-1.46-blue.svg)

## Overview

This is a minimal implementation for a multiplexed p2p network based on `yamux` that supports mounting custom protocols.

## Architecture

1. Data stream transmission

```rust
+----+ +----------------+ +-----------+ +-------------+ +----------+ +------+
|user| <--> | custom streams | <--> |Yamux frame| <--> |Secure stream| <--> |TCP stream| <--> |remote|
+----+ +----------------+ +-----------+ +-------------+ +----------+ +------+
```

2. Code implementation

All data is passed through the futures channel, `yamux` splits the actual tcp/websocket stream into multiple substreams,
and the service layer wraps the yamux substream into a protocol stream.

Detailed introduction: [中文](./docs/introduction_zh.md)/[English](./docs/introduction_en.md)

> Note: It is not compatible with `libp2p`.
## Status

The API of this project is basically usable. However we still need more tests. PR is welcome.

The codes in the `protocols/` directory are no longer maintained and only used as reference

Feature `flatc` is not recommended and will be removed in the next version.

## Usage

### From cargo

```toml
[dependencies]
tentacle = { version = "0.3", features = ["molc"] }
```

### Example

1. Clone

```bash
$ git clone https://github.com/nervosnetwork/tentacle.git
```

2. On one terminal:

Listen on 127.0.0.1:1337
```bash
$ RUST_LOG=simple=info,tentacle=debug cargo run --example simple --features molc -- server
```

3. On another terminal:

```bash
$ RUST_LOG=simple=info,tentacle=debug cargo run --example simple --features molc
```

4. Now you can see some data interaction information on the terminal.

You can see more detailed example in these two repos: [ckb](https://github.com/nervosnetwork/ckb)/[cita](https://github.com/cryptape/cita).

## Why?

Because when I use `rust-libp2p`, I have encountered some difficult problems,
and it is difficult to locate whether it is my problem or the library itself,
it is better to implement one myself.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
1 change: 1 addition & 0 deletions src/lib.rs → tentacle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub(crate) mod transports;
pub mod utils;

mod channel;
mod runtime;

pub(crate) mod upnp;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ where
F: FnOnce() -> R,
{
if flag {
tokio::task::block_in_place(f)
crate::runtime::block_in_place(f)
} else {
f()
}
Expand Down Expand Up @@ -237,13 +237,13 @@ where
// NOTE: A Interval/Delay will block tokio runtime from gracefully shutdown.
// So we spawn it in FutureTaskManager
let task = async move {
tokio::time::delay_until(tokio::time::Instant::now() + interval).await;
crate::runtime::delay_for(interval).await;
if sender.send(token).await.is_err() {
trace!("service notify token {} send err", token)
}
};
let mut future_task_sender = self.future_task_sender.clone();
tokio::spawn(async move {
crate::runtime::spawn(async move {
if future_task_sender.send(Box::pin(task)).await.is_err() {
trace!("service notify task send err")
}
Expand All @@ -268,7 +268,7 @@ impl<T> Drop for ServiceProtocolStream<T> {
},
};
let mut panic_sender = self.panic_report.clone();
tokio::spawn(async move {
crate::runtime::spawn(async move {
if panic_sender.send(event).await.is_err() {
trace!("service panic message send err")
}
Expand Down Expand Up @@ -493,13 +493,13 @@ where
// NOTE: A Interval/Delay will block tokio runtime from gracefully shutdown.
// So we spawn it in FutureTaskManager
let task = async move {
tokio::time::delay_until(tokio::time::Instant::now() + interval).await;
crate::runtime::delay_for(interval).await;
if sender.send(token).await.is_err() {
trace!("session notify token {} send err", token)
}
};
let mut future_task_sender = self.future_task_sender.clone();
tokio::spawn(async move {
crate::runtime::spawn(async move {
if future_task_sender.send(Box::pin(task)).await.is_err() {
trace!("session notify task send err")
}
Expand All @@ -522,7 +522,7 @@ impl<T> Drop for SessionProtocolStream<T> {
proto_id: self.handle_context.proto_id,
};
let mut panic_sender = self.panic_report.clone();
tokio::spawn(async move {
crate::runtime::spawn(async move {
if panic_sender.send(event).await.is_err() {
trace!("session panic message send err")
}
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit 88b3e80

Please sign in to comment.