Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Replace tokio_core with tokio.
Browse files Browse the repository at this point in the history
* Remove `tokio-core` and replace with `tokio` in
    - `ethcore/stratum`
    - `secret_store`
    - `util/fetch`
    - `util/reactor`

* Bump hyper to 0.12 in
    - `miner`
    - `util/fake-fetch`
    - `util/fetch`

* Bump `jsonrpc-***` to 0.9 in
    - `parity`
    - `ethcore/stratum`
    - `ipfs`
    - `rpc`
    - `rpc_client`
    - `whisper`

* Bump `ring` to 0.13
  • Loading branch information
c0gent committed Sep 27, 2018
1 parent a8f6f5b commit bd7d453
Show file tree
Hide file tree
Showing 49 changed files with 1,287 additions and 972 deletions.
934 changes: 522 additions & 412 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 2 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ futures = "0.1"
futures-cpupool = "0.1"
fdlimit = "0.1"
ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" }
jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.11" }
jsonrpc-core = { git = "https://github.com/c0gent/jsonrpc.git", branch = "c0gent-hyper" }
ethcore = { path = "ethcore", features = ["parity"] }
parity-bytes = "0.1"
ethcore-io = { path = "util/io" }
Expand Down Expand Up @@ -137,7 +137,4 @@ members = [
"util/keccak-hasher",
"util/patricia-trie-ethereum",
"util/fastmap",
]

[patch.crates-io]
ring = { git = "https://github.com/paritytech/ring" }
]
2 changes: 1 addition & 1 deletion ethcore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ hashdb = "0.2.1"
memorydb = "0.2.1"
patricia-trie = "0.2"
patricia-trie-ethereum = { path = "../util/patricia-trie-ethereum" }
parity-crypto = "0.1"
parity-crypto = "0.2"
error-chain = { version = "0.12", default-features = false }
ethcore-io = { path = "../util/io" }
ethcore-logger = { path = "../logger" }
Expand Down
2 changes: 1 addition & 1 deletion ethcore/private-tx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ ethabi-derive = "6.0"
ethabi-contract = "6.0"
ethcore = { path = ".." }
parity-bytes = "0.1"
parity-crypto = "0.1"
parity-crypto = "0.2"
ethcore-io = { path = "../../util/io" }
ethcore-logger = { path = "../../logger" }
ethcore-miner = { path = "../../miner" }
Expand Down
4 changes: 2 additions & 2 deletions ethcore/private-tx/src/encryptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ impl SecretStoreEncryptor {

// send HTTP request
let method = if use_post {
Method::Post
Method::POST
} else {
Method::Get
Method::GET
};

let url = Url::from_str(&url).map_err(|e| ErrorKind::Encrypt(e.to_string()))?;
Expand Down
2 changes: 1 addition & 1 deletion ethcore/res/ethereum/tests
Submodule tests updated 19584 files
8 changes: 4 additions & 4 deletions ethcore/stratum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
ethereum-types = "0.4"
keccak-hash = "0.1"
jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.11" }
jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.11" }
jsonrpc-tcp-server = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.11" }
jsonrpc-core = { git = "https://github.com/c0gent/jsonrpc.git", branch = "c0gent-hyper" }
jsonrpc-macros = { git = "https://github.com/c0gent/jsonrpc.git", branch = "c0gent-hyper" }
jsonrpc-tcp-server = { git = "https://github.com/c0gent/jsonrpc.git", branch = "c0gent-hyper" }
log = "0.4"
parking_lot = "0.6"

[dev-dependencies]
env_logger = "0.5"
tokio-core = "0.1"
tokio = "0.1"
tokio-io = "0.1"
ethcore-logger = { path = "../../logger" }
71 changes: 36 additions & 35 deletions ethcore/stratum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ extern crate parking_lot;

#[macro_use] extern crate log;

#[cfg(test)] extern crate tokio_core;
#[cfg(test)] extern crate tokio;
#[cfg(test)] extern crate tokio_io;
#[cfg(test)] extern crate ethcore_logger;

Expand Down Expand Up @@ -323,12 +323,10 @@ impl MetaExtractor<SocketMetadata> for PeerMetaExtractor {
#[cfg(test)]
mod tests {
use super::*;
use std::net::SocketAddr;
use std::net::{SocketAddr, Shutdown};
use std::sync::Arc;

use tokio_core::reactor::{Core, Timeout};
use tokio_core::net::TcpStream;
use tokio_io::io;
use tokio::{io, runtime::Runtime, timer::timeout::{self, Timeout}, net::TcpStream};
use jsonrpc_core::futures::{Future, future};

use ethcore_logger::init_log;
Expand All @@ -342,23 +340,23 @@ mod tests {
}

fn dummy_request(addr: &SocketAddr, data: &str) -> Vec<u8> {
let mut core = Core::new().expect("Tokio Core should be created with no errors");
let mut buffer = vec![0u8; 2048];
let mut runtime = Runtime::new().expect("Tokio Runtime should be created with no errors");

let mut data_vec = data.as_bytes().to_vec();
data_vec.extend(b"\n");

let stream = TcpStream::connect(addr, &core.handle())
.and_then(|stream| {
io::write_all(stream, &data_vec)
let stream = TcpStream::connect(addr)
.and_then(move |stream| {
io::write_all(stream, data_vec)
})
.and_then(|(stream, _)| {
io::read(stream, &mut buffer)
stream.shutdown(Shutdown::Write).unwrap();
io::read_to_end(stream, Vec::with_capacity(2048))
})
.and_then(|(_, read_buf, len)| {
future::ok(read_buf[0..len].to_vec())
.and_then(|(_stream, read_buf)| {
future::ok(read_buf)
});
let result = core.run(stream).expect("Core should run with no errors");
let result = runtime.block_on(stream).expect("Runtime should run with no errors");

result
}
Expand Down Expand Up @@ -417,7 +415,7 @@ mod tests {
}

#[test]
fn receives_initial_paylaod() {
fn receives_initial_payload() {
let addr = "127.0.0.1:19975".parse().unwrap();
let _stratum = Stratum::start(&addr, DummyManager::new(), None).expect("There should be no error starting stratum");
let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": [], "id": 2}"#;
Expand Down Expand Up @@ -460,40 +458,43 @@ mod tests {
.to_vec();
auth_request.extend(b"\n");

let mut core = Core::new().expect("Tokio Core should be created with no errors");
let timeout1 = Timeout::new(::std::time::Duration::from_millis(100), &core.handle())
.expect("There should be a timeout produced in message test");
let timeout2 = Timeout::new(::std::time::Duration::from_millis(100), &core.handle())
.expect("There should be a timeout produced in message test");
let mut buffer = vec![0u8; 2048];
let mut buffer2 = vec![0u8; 2048];
let stream = TcpStream::connect(&addr, &core.handle())
.and_then(|stream| {
io::write_all(stream, &auth_request)
let auth_response = "{\"jsonrpc\":\"2.0\",\"result\":true,\"id\":1}\n";

let mut runtime = Runtime::new().expect("Tokio Runtime should be created with no errors");
let read_buf0 = vec![0u8; auth_response.len()];
let read_buf1 = Vec::with_capacity(2048);
let stream = TcpStream::connect(&addr)
.and_then(move |stream| {
io::write_all(stream, auth_request)
})
.and_then(|(stream, _)| {
io::read(stream, &mut buffer)
io::read_exact(stream, read_buf0)
})
.and_then(|(stream, _, _)| {
.map_err(|err| panic!("{:?}", err))
.and_then(move |(stream, read_buf0)| {
assert_eq!(String::from_utf8(read_buf0).unwrap(), auth_response);
trace!(target: "stratum", "Received authorization confirmation");
timeout1.join(future::ok(stream))
Timeout::new(future::ok(stream), ::std::time::Duration::from_millis(100))
})
.and_then(|(_, stream)| {
.map_err(|err: timeout::Error<()>| panic!("Timeout: {:?}", err))
.and_then(move |stream| {
trace!(target: "stratum", "Pusing work to peers");
stratum.push_work_all(r#"{ "00040008", "100500" }"#.to_owned())
.expect("Pushing work should produce no errors");
timeout2.join(future::ok(stream))
Timeout::new(future::ok(stream), ::std::time::Duration::from_millis(100))
})
.and_then(|(_, stream)| {
.map_err(|err: timeout::Error<()>| panic!("Timeout: {:?}", err))
.and_then(|stream| {
trace!(target: "stratum", "Ready to read work from server");
io::read(stream, &mut buffer2)
stream.shutdown(Shutdown::Write).unwrap();
io::read_to_end(stream, read_buf1)
})
.and_then(|(_, read_buf, len)| {
.and_then(|(_, read_buf1)| {
trace!(target: "stratum", "Received work from server");
future::ok(read_buf[0..len].to_vec())
future::ok(read_buf1)
});
let response = String::from_utf8(
core.run(stream).expect("Core should run with no errors")
runtime.block_on(stream).expect("Runtime should run with no errors")
).expect("Response should be utf-8");

assert_eq!(
Expand Down
2 changes: 1 addition & 1 deletion ethkey/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
byteorder = "1.0"
edit-distance = "2.0"
parity-crypto = "0.1"
parity-crypto = "0.2"
eth-secp256k1 = { git = "https://github.com/paritytech/rust-secp256k1" }
ethereum-types = "0.4"
lazy_static = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion ethstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ tiny-keccak = "1.4"
time = "0.1.34"
itertools = "0.5"
parking_lot = "0.6"
parity-crypto = "0.1"
parity-crypto = "0.2"
ethereum-types = "0.4"
dir = { path = "../util/dir" }
smallvec = "0.6"
Expand Down
8 changes: 4 additions & 4 deletions ipfs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ authors = ["Parity Technologies <admin@parity.io>"]
ethcore = { path = "../ethcore" }
parity-bytes = "0.1"
ethereum-types = "0.4"
jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.11" }
jsonrpc-http-server = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.11" }
jsonrpc-core = { git = "https://github.com/c0gent/jsonrpc.git", branch = "c0gent-hyper" }
jsonrpc-http-server = { git = "https://github.com/c0gent/jsonrpc.git", branch = "c0gent-hyper" }
rlp = { version = "0.2.4", features = ["ethereum"] }
cid = "0.2"
multihash = "0.7"
cid = "0.3"
multihash = "0.8"
unicase = "2.0"

[dev-dependencies]
Expand Down
12 changes: 12 additions & 0 deletions ipfs/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,15 @@ impl From<ServerError> for String {
}
}
}

impl ::std::fmt::Display for ServerError {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
write!(f, "{:?}", self)
}
}

impl ::std::error::Error for ServerError {
fn description(&self) -> &str { "ServerError" }

fn cause(&self) -> Option<&::std::error::Error> { None }
}
84 changes: 46 additions & 38 deletions ipfs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,9 @@ use std::net::{SocketAddr, IpAddr};
use core::futures::future::{self, FutureResult};
use core::futures::{self, Future};
use ethcore::client::BlockChainClient;
use http::hyper::header::{self, Vary, ContentType};
use http::hyper::{Method, StatusCode};
use http::hyper::{self, server};
use unicase::Ascii;
use http::hyper::{self, server, Method, StatusCode, Body,
header::{self, HeaderValue},
};

use error::ServerError;
use route::Out;
Expand Down Expand Up @@ -67,18 +66,18 @@ impl IpfsHandler {
client: client,
}
}
pub fn on_request(&self, req: hyper::Request) -> (Option<header::AccessControlAllowOrigin>, Out) {
pub fn on_request(&self, req: hyper::Request<Body>) -> (Option<HeaderValue>, Out) {
match *req.method() {
Method::Get | Method::Post => {},
Method::GET | Method::POST => {},
_ => return (None, Out::Bad("Invalid Request")),
}

if !http::is_host_allowed(&req, &self.allowed_hosts) {
return (None, Out::Bad("Disallowed Host header"));
}

let cors_header = http::cors_header(&req, &self.cors_domains);
if cors_header == http::CorsHeader::Invalid {
let cors_header = http::cors_allow_origin(&req, &self.cors_domains);
if cors_header == http::AllowCors::Invalid {
return (None, Out::Bad("Disallowed Origin header"));
}

Expand All @@ -88,39 +87,39 @@ impl IpfsHandler {
}
}

impl server::Service for IpfsHandler {
type Request = hyper::Request;
type Response = hyper::Response;
impl hyper::service::Service for IpfsHandler {
type ReqBody = Body;
type ResBody = Body;
type Error = hyper::Error;
type Future = FutureResult<hyper::Response, hyper::Error>;
type Future = FutureResult<hyper::Response<Body>, Self::Error>;

fn call(&self, request: Self::Request) -> Self::Future {
fn call(&mut self, request: hyper::Request<Self::ReqBody>) -> Self::Future {
let (cors_header, out) = self.on_request(request);

let mut res = match out {
Out::OctetStream(bytes) => {
hyper::Response::new()
.with_status(StatusCode::Ok)
.with_header(ContentType::octet_stream())
.with_body(bytes)
hyper::Response::builder()
.status(StatusCode::OK)
.header("content-type", HeaderValue::from_static("application/octet-stream"))
.body(bytes.into())
},
Out::NotFound(reason) => {
hyper::Response::new()
.with_status(StatusCode::NotFound)
.with_header(ContentType::plaintext())
.with_body(reason)
hyper::Response::builder()
.status(StatusCode::NOT_FOUND)
.header("content-type", HeaderValue::from_static("text/plain; charset=utf-8"))
.body(reason.into())
},
Out::Bad(reason) => {
hyper::Response::new()
.with_status(StatusCode::BadRequest)
.with_header(ContentType::plaintext())
.with_body(reason)
hyper::Response::builder()
.status(StatusCode::BAD_REQUEST)
.header("content-type", HeaderValue::from_static("text/plain; charset=utf-8"))
.body(reason.into())
}
};
}.expect("Response builder: Parsing 'content-type' header name will not fail; qed");

if let Some(cors_header) = cors_header {
res.headers_mut().set(cors_header);
res.headers_mut().set(Vary::Items(vec![Ascii::new("Origin".into())]));
res.headers_mut().append(header::ACCESS_CONTROL_ALLOW_ORIGIN, cors_header);
res.headers_mut().append(header::VARY, HeaderValue::from_static("origin"));
}

future::ok(res)
Expand Down Expand Up @@ -164,23 +163,32 @@ pub fn start_server(
let hosts: DomainsValidation<_> = hosts.map(move |hosts| include_current_interface(hosts, interface, port)).into();

let (close, shutdown_signal) = futures::sync::oneshot::channel::<()>();
let (tx, rx) = mpsc::sync_channel(1);
let (tx, rx) = mpsc::sync_channel::<Result<(), ServerError>>(1);
let thread = thread::spawn(move || {
let send = |res| tx.send(res).expect("rx end is never dropped; qed");
let server = match server::Http::new().bind(&addr, move || {
Ok(IpfsHandler::new(cors.clone(), hosts.clone(), client.clone()))
}) {
Ok(server) => {
send(Ok(()));
server
},

let server_bldr = match server::Server::try_bind(&addr) {
Ok(s) => s,
Err(err) => {
send(Err(err));
send(Err(ServerError::from(err)));
return;
}
};

let _ = server.run_until(shutdown_signal.map_err(|_| {}));
let new_service = move || {
Ok::<_, ServerError>(
IpfsHandler::new(cors.clone(), hosts.clone(), client.clone())
)
};

let server = server_bldr
.serve(new_service)
.map_err(|_| ())
.select(shutdown_signal.map_err(|_| ()))
.then(|_| Ok(()));

hyper::rt::run(server);
send(Ok(()));
});

// Wait for server to start successfuly.
Expand Down
Loading

0 comments on commit bd7d453

Please sign in to comment.