diff --git a/Cargo.lock b/Cargo.lock index 68cf83e46784..c6f87255557c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -299,6 +299,12 @@ dependencies = [ "gimli", ] +[[package]] +name = "adler" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccc9a9dd069569f212bc4330af9f17c4afb5e8ce185e83dbb14f1349dda18b10" + [[package]] name = "adler32" version = "1.1.0" @@ -645,7 +651,7 @@ dependencies = [ "addr2line", "cfg-if", "libc", - "miniz_oxide", + "miniz_oxide 0.3.7", "object", "rustc-demangle", ] @@ -1019,9 +1025,9 @@ checksum = "24508e28c677875c380c20f4d28124fab6f8ed4ef929a1397d7b1a31e92f1005" [[package]] name = "cc" -version = "1.0.55" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1be3409f94d7bdceeb5f5fac551039d9b3f00e25da7a74fc4d33400a0d96368" +checksum = "77c1f1d60091c1b73e2b1f4560ab419204b178e625fa945ded7b660becd2bd46" dependencies = [ "jobserver", ] @@ -1617,9 +1623,9 @@ dependencies = [ [[package]] name = "derive_more" -version = "0.99.8" +version = "0.99.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc655351f820d774679da6cdc23355a93de496867d8203496675162e17b1d671" +checksum = "298998b1cf6b5b2c8a7b023dfd45821825ce3ba8a8af55c921a0e734e4653f76" dependencies = [ "proc-macro2 1.0.18", "quote 1.0.7", @@ -1839,9 +1845,9 @@ checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" [[package]] name = "fastrand" -version = "1.3.1" +version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70fd7ad0c968fbda4e6d978e61cb13f3f31423c35abf785b25a4de37057adc80" +checksum = "b90eb1dec02087df472ab9f0db65f27edaa654a746830042688bcc2eaf68090f" [[package]] name = "femme" @@ -2060,15 +2066,15 @@ checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d" [[package]] name = "flate2" -version = "1.0.14" +version = "1.0.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cfff41391129e0a856d6d822600b8d71179d46879e310417eb9c762eb178b42" +checksum = "68c90b0fc46cf89d227cc78b40e494ff81287a92dd07631e5af0d06fe3cf885e" dependencies = [ "cfg-if", "crc32fast", "libc", "libz-sys", - "miniz_oxide", + "miniz_oxide 0.4.0", ] [[package]] @@ -2287,6 +2293,7 @@ name = "forest_libp2p" version = "0.1.0" dependencies = [ "async-std", + "async-trait", "bytes 0.5.5", "clock", "fnv", @@ -3163,9 +3170,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.40" +version = "0.3.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce10c23ad2ea25ceca0093bd3192229da4c5b3c0f2de499c1ecac0d98d452177" +checksum = "c4b9172132a62451e56142bff9afc91c8e4a4500aa5b847da36815b63bfda916" dependencies = [ "wasm-bindgen", ] @@ -3306,8 +3313,7 @@ dependencies = [ [[package]] name = "libp2p" version = "0.20.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db81113df355dea9dddfcb01cd867555298dca29d915f25d1b1a0aad2e29338b" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "bytes 0.5.5", "futures 0.3.5", @@ -3326,6 +3332,7 @@ dependencies = [ "libp2p-ping", "libp2p-plaintext", "libp2p-pnet", + "libp2p-request-response", "libp2p-secio", "libp2p-swarm", "libp2p-tcp", @@ -3344,8 +3351,7 @@ dependencies = [ [[package]] name = "libp2p-core" version = "0.19.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a0387b930c3d4c2533dc4893c1e0394185ddcc019846121b1b27491e45a2c08" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "asn1_der", "bs58", @@ -3378,8 +3384,7 @@ dependencies = [ [[package]] name = "libp2p-core-derive" version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f09548626b737ed64080fde595e06ce1117795b8b9fc4d2629fa36561c583171" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "quote 1.0.7", "syn 1.0.33", @@ -3388,8 +3393,7 @@ dependencies = [ [[package]] name = "libp2p-deflate" version = "0.19.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "467e790a51e801405cd502327b0ce80a8b71d91e8831901b0f3c861960f60f6e" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "flate2", "futures 0.3.5", @@ -3399,8 +3403,7 @@ dependencies = [ [[package]] name = "libp2p-dns" version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cc186d9a941fd0207cf8f08ef225a735e2d7296258f570155e525f6ee732f87" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "futures 0.3.5", "libp2p-core", @@ -3410,8 +3413,7 @@ dependencies = [ [[package]] name = "libp2p-floodsub" version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7430648e8fc45590c88a8a546a5be81dbca63e9119c32eaff023a076cf9267ca" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "cuckoofilter", "fnv", @@ -3427,8 +3429,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" version = "0.19.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e90d2dedb7603174edc0145a7853b71e3c078dd7cd5383936ab81d369e7cb2cc" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "base64 0.11.0", "byteorder 1.3.4", @@ -3452,8 +3453,7 @@ dependencies = [ [[package]] name = "libp2p-identify" version = "0.19.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62f76075b170d908bae616f550ade410d9d27c013fa69042551dbfc757c7c094" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "futures 0.3.5", "libp2p-core", @@ -3468,8 +3468,7 @@ dependencies = [ [[package]] name = "libp2p-kad" version = "0.20.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7c819a5425b2eb3416d67e9c868c5c1e922b6658655e06b9eeafaa41304b876" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "arrayvec", "bytes 0.5.5", @@ -3495,8 +3494,7 @@ dependencies = [ [[package]] name = "libp2p-mdns" version = "0.19.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f55b2d4b80986e5bf158270ab23268ec0e7f644ece5436fbaabc5155472f357" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "async-std", "data-encoding", @@ -3517,8 +3515,7 @@ dependencies = [ [[package]] name = "libp2p-mplex" version = "0.19.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be7d913a4cd57de2013257ec73f07d77bfce390b370023e2d59083e5ca079864" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "bytes 0.5.5", "fnv", @@ -3533,8 +3530,7 @@ dependencies = [ [[package]] name = "libp2p-noise" version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a03db664653369f46ee03fcec483a378c20195089bb43a26cb9fb0058009ac88" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "curve25519-dalek", "futures 0.3.5", @@ -3554,8 +3550,7 @@ dependencies = [ [[package]] name = "libp2p-ping" version = "0.19.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8dedd34e35a9728d52d59ef36a218e411359a353f9011b2574b86ee790978f6" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "futures 0.3.5", "libp2p-core", @@ -3569,8 +3564,7 @@ dependencies = [ [[package]] name = "libp2p-plaintext" version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1743dfb7817febee1545a21b327846ddae4162ac5dc1977ef1c823d4e9e89d44" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "bytes 0.5.5", "futures 0.3.5", @@ -3587,8 +3581,7 @@ dependencies = [ [[package]] name = "libp2p-pnet" version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37d0db10e139d22d7af0b23ed7949449ec86262798aa0fd01595abdbcb02dc87" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "futures 0.3.5", "log", @@ -3598,11 +3591,23 @@ dependencies = [ "sha3", ] +[[package]] +name = "libp2p-request-response" +version = "0.1.0" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" +dependencies = [ + "async-trait", + "futures 0.3.5", + "libp2p-core", + "libp2p-swarm", + "smallvec 1.4.0", + "wasm-timer", +] + [[package]] name = "libp2p-secio" version = "0.19.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c99b3c33e96bb402486d5b4f7cbeab14e66e6a2ed010abbb5bb032a05460bfda" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "aes-ctr", "ctr", @@ -3631,8 +3636,7 @@ dependencies = [ [[package]] name = "libp2p-swarm" version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce53ff4d127cf8b39adf84dbd381ca32d49bd85788cee08e6669da2495993930" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "futures 0.3.5", "libp2p-core", @@ -3646,8 +3650,7 @@ dependencies = [ [[package]] name = "libp2p-tcp" version = "0.19.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9481500c5774c62e8c413e9535b3f33a0e3dbacf2da63b8d3056c686a9df4146" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "async-std", "futures 0.3.5", @@ -3662,8 +3665,7 @@ dependencies = [ [[package]] name = "libp2p-uds" version = "0.19.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5223f28db25a39708fa1ab6ebc427d7d72051bfc1e7723f72ccc2a3cda5ef378" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "async-std", "futures 0.3.5", @@ -3674,8 +3676,7 @@ dependencies = [ [[package]] name = "libp2p-wasm-ext" version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f59fdbb5706f2723ca108c088b1c7a37f735a8c328021f0508007162627e9885" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "futures 0.3.5", "js-sys", @@ -3688,8 +3689,7 @@ dependencies = [ [[package]] name = "libp2p-websocket" version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e4440551bf6519e0a684cd859ea809aec6d798f686e0d6ed03a28c3e76849b8" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "async-tls", "either", @@ -3708,8 +3708,7 @@ dependencies = [ [[package]] name = "libp2p-yamux" version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8da33e7b5f49c75c6a8afb0b8d1e229f5fa48be9f39bd14cdbc21459a02ac6fc" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "futures 0.3.5", "libp2p-core", @@ -3916,6 +3915,15 @@ dependencies = [ "adler32", ] +[[package]] +name = "miniz_oxide" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be0f75932c1f6cfae3c04000e40114adf955636e19040f9c0a2c380702aa1c7f" +dependencies = [ + "adler", +] + [[package]] name = "mio" version = "0.6.22" @@ -4008,8 +4016,7 @@ checksum = "d8883adfde9756c1d30b0f519c9b8c502a94b41ac62f696453c37c7fc0a958ce" [[package]] name = "multistream-select" version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9157e87afbc2ef0d84cc0345423d715f445edde00141c93721c162de35a05e5" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "bytes 0.5.5", "futures 0.3.5", @@ -4385,8 +4392,7 @@ dependencies = [ [[package]] name = "parity-multiaddr" version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc20af3143a62c16e7c9e92ea5c6ae49f7d271d97d4d8fe73afc28f0514a3d0f" +source = "git+https://github.com/ChainSafe/rust-libp2p?rev=3661071120d3c07eb9e56932204efcd12cdffc49#3661071120d3c07eb9e56932204efcd12cdffc49" dependencies = [ "arrayref", "bs58", @@ -4728,36 +4734,36 @@ dependencies = [ [[package]] name = "protobuf" -version = "2.15.0" +version = "2.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e2ccb6b8f7e175f2d2401e7a5988b0630000164d221262c4fe50ae729513202" +checksum = "e4951a8253c06334be9fe320bbcf73f14949fde62a0c8128d697eec1ff0fa8cd" dependencies = [ "bytes 0.5.5", ] [[package]] name = "protobuf-codegen" -version = "2.15.0" +version = "2.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb9b1b69893bbc580e041ce30827c21f035d8bb3bc593eba22c38464682cfc21" +checksum = "5682cd7a093d1d5c3cbcb9cd4c3c6e4f3cab4c2e4834f9a08e7a1086f88a4f6b" dependencies = [ "protobuf", ] [[package]] name = "protoc" -version = "2.15.0" +version = "2.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97b87e88808373ed91952975c353028edbcac8b5dddd4a947dca53852b60c64e" +checksum = "3a535d7d3fed4966b44cacbd17538cf10b5b1725ab99d969fe6cdda168f82cba" dependencies = [ "log", ] [[package]] name = "protoc-rust" -version = "2.15.0" +version = "2.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd8b9b832087bd80a4654e431c92765cd176a89ede16da47d134ee56d04108af" +checksum = "1d87291ba46359549b11c2996189f0db26f3285f97c76fe8983aba1a877623e4" dependencies = [ "protobuf", "protobuf-codegen", @@ -5380,9 +5386,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.55" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec2c5d7e739bc07a3e73381a39d61fdb5f671c60c1df26a130690665803d8226" +checksum = "3433e879a558dde8b5e8feb2a04899cf34fdde1fafb894687e52105fc1162ac3" dependencies = [ "itoa", "ryu", @@ -6516,9 +6522,9 @@ checksum = "e83e153d1053cbb5a118eeff7fd5be06ed99153f00dbcd8ae310c5fb2b22edc0" [[package]] name = "unicode-width" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "caaa9d531767d1ff2150b9332433f32a24622147e5ebb1f26409d5da67afd479" +checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3" [[package]] name = "unicode-xid" @@ -6643,9 +6649,9 @@ checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" [[package]] name = "wasm-bindgen" -version = "0.2.63" +version = "0.2.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c2dc4aa152834bc334f506c1a06b866416a8b6697d5c9f75b9a689c8486def0" +checksum = "6a634620115e4a229108b71bde263bb4220c483b3f07f5ba514ee8d15064c4c2" dependencies = [ "cfg-if", "serde", @@ -6655,9 +6661,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.63" +version = "0.2.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ded84f06e0ed21499f6184df0e0cb3494727b0c5da89534e0fcc55c51d812101" +checksum = "3e53963b583d18a5aa3aaae4b4c1cb535218246131ba22a71f05b518098571df" dependencies = [ "bumpalo", "lazy_static", @@ -6670,9 +6676,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.13" +version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64487204d863f109eb77e8462189d111f27cb5712cc9fdb3461297a76963a2f6" +checksum = "dba48d66049d2a6cc8488702e7259ab7afc9043ad0dc5448444f46f2a453b362" dependencies = [ "cfg-if", "js-sys", @@ -6682,9 +6688,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.63" +version = "0.2.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "838e423688dac18d73e31edce74ddfac468e37b1506ad163ffaf0a46f703ffe3" +checksum = "3fcfd5ef6eec85623b4c6e844293d4516470d8f19cd72d0d12246017eb9060b8" dependencies = [ "quote 1.0.7", "wasm-bindgen-macro-support", @@ -6692,9 +6698,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.63" +version = "0.2.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3156052d8ec77142051a533cdd686cba889537b213f948cd1d20869926e68e92" +checksum = "9adff9ee0e94b926ca81b57f57f86d5545cdcb1d259e21ec9bdd95b901754c75" dependencies = [ "proc-macro2 1.0.18", "quote 1.0.7", @@ -6705,9 +6711,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.63" +version = "0.2.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9ba19973a58daf4db6f352eda73dc0e289493cd29fb2632eb172085b6521acd" +checksum = "7f7b90ea6c632dd06fd765d44542e234d5e63d9bb917ecd64d79778a13bd79ae" [[package]] name = "wasm-timer" @@ -6727,9 +6733,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.40" +version = "0.3.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b72fe77fd39e4bd3eaa4412fd299a0be6b3dfe9d2597e2f1c20beb968f41d17" +checksum = "863539788676619aac1a23e2df3655e96b32b0e05eb72ca34ba045ad573c625d" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/blockchain/chain_sync/Cargo.toml b/blockchain/chain_sync/Cargo.toml index c89b5d5c0d75..a77f472f1816 100644 --- a/blockchain/chain_sync/Cargo.toml +++ b/blockchain/chain_sync/Cargo.toml @@ -12,7 +12,7 @@ blocks = { package = "forest_blocks", path = "../blocks" } beacon = { path = "../beacon" } db = { path = "../../node/db" } encoding = { package = "forest_encoding", path = "../../encoding" } -libp2p = "0.20" +libp2p = { git = "https://github.com/ChainSafe/rust-libp2p", rev = "3661071120d3c07eb9e56932204efcd12cdffc49" } cid = { package = "forest_cid", path = "../../ipld/cid" } ipld_blockstore = { path = "../../ipld/blockstore" } chain = { path = "../chain" } diff --git a/blockchain/chain_sync/src/network_context.rs b/blockchain/chain_sync/src/network_context.rs index 9be8cc472a5c..1375acaa16b2 100644 --- a/blockchain/chain_sync/src/network_context.rs +++ b/blockchain/chain_sync/src/network_context.rs @@ -8,8 +8,8 @@ use async_std::sync::{Receiver, Sender}; use blocks::{FullTipset, Tipset, TipsetKeys}; use forest_libp2p::{ blocksync::{BlockSyncRequest, BlockSyncResponse, BLOCKS, MESSAGES}, - hello::HelloMessage, - rpc::{RPCEvent, RPCRequest, RPCResponse, RequestId}, + hello::HelloRequest, + rpc::{RPCRequest, RPCResponse, RequestId}, NetworkEvent, NetworkMessage, }; use libp2p::core::PeerId; @@ -44,7 +44,7 @@ impl SyncNetworkContext { network_send, rpc_receiver, receiver, - request_id: 1, + request_id: RequestId(1), } } @@ -111,22 +111,33 @@ impl SyncNetworkContext { } /// Send a hello request to the network (does not await response) - pub async fn hello_request(&self, peer_id: PeerId, request: HelloMessage) { + pub async fn hello_request(&mut self, peer_id: PeerId, request: HelloRequest) { trace!("Sending Hello Message {:?}", request); // TODO update to await response when we want to handle the latency - self.send_rpc_event(peer_id, RPCEvent::Request(0, RPCRequest::Hello(request))) + self.network_send + .send(NetworkMessage::RPC { + peer_id, + request: RPCRequest::Hello(request), + id: self.request_id, + }) .await; + self.request_id.0 += 1; } /// Send any RPC request to the network and await the response pub async fn send_rpc_request( &mut self, peer_id: PeerId, - rpc_request: RPCRequest, + request: RPCRequest, ) -> Result { let request_id = self.request_id; - self.request_id += 1; - self.send_rpc_event(peer_id, RPCEvent::Request(request_id, rpc_request)) + self.request_id.0 += 1; + self.network_send + .send(NetworkMessage::RPC { + peer_id, + request, + id: request_id, + }) .await; loop { match future::timeout(Duration::from_secs(RPC_TIMEOUT), self.rpc_receiver.next()).await @@ -142,11 +153,4 @@ impl SyncNetworkContext { } } } - - /// Handles sending the base event to the network service - async fn send_rpc_event(&self, peer_id: PeerId, event: RPCEvent) { - self.network_send - .send(NetworkMessage::RPC { peer_id, event }) - .await - } } diff --git a/blockchain/chain_sync/src/network_handler.rs b/blockchain/chain_sync/src/network_handler.rs index b02b6b0b0141..d3f273d8bffc 100644 --- a/blockchain/chain_sync/src/network_handler.rs +++ b/blockchain/chain_sync/src/network_handler.rs @@ -42,15 +42,20 @@ impl NetworkHandler { loop { match receiver.next().await { // Handle specifically RPC responses and send to that channel - Some(NetworkEvent::RPCResponse { req_id, response }) => { - rpc_send.send((req_id, response)).await + Some(NetworkEvent::BlockSyncResponse { + request_id, + response, + }) => { + rpc_send + .send((request_id, RPCResponse::BlockSync(response))) + .await } // Pass any non RPC responses through event channel Some(event) => { // Update peer on this thread before sending hello - if let NetworkEvent::Hello { source, .. } = &event { + if let NetworkEvent::HelloRequest { channel, .. } = &event { // TODO should probably add peer with their tipset/ not handled seperately - peer_manager.add_peer(source.clone(), None).await; + peer_manager.add_peer(channel.peer.clone(), None).await; } // TODO revisit, doing this to avoid blocking this thread but can handle better diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index fa5bf4c19e84..3ca8c2cb65ae 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -24,7 +24,7 @@ use encoding::{Cbor, Error as EncodingError}; use fil_types::SectorInfo; use filecoin_proofs_api::{post::verify_winning_post, ProverId, PublicReplicaInfo, SectorId}; use forest_libp2p::{ - hello::HelloMessage, BlockSyncRequest, NetworkEvent, NetworkMessage, MESSAGES, + hello::HelloRequest, BlockSyncRequest, NetworkEvent, NetworkMessage, MESSAGES, }; use futures::{ executor::block_on, @@ -150,21 +150,22 @@ where while let Some(event) = self.network.receiver.next().await { match event { - NetworkEvent::Hello { source, message } => { + NetworkEvent::HelloRequest { request, channel } => { + let source = channel.peer.clone(); info!( "Message inbound, heaviest tipset cid: {:?}", - message.heaviest_tip_set + request.heaviest_tip_set ); match self .fetch_tipset( source.clone(), - &TipsetKeys::new(message.heaviest_tip_set.clone()), + &TipsetKeys::new(request.heaviest_tip_set.clone()), ) .await { Ok(fts) => { - if self.inform_new_head(source.clone(), &fts).await.is_err() { - warn!("Failed to sync with provided tipset",); + if let Err(e) = self.inform_new_head(source.clone(), &fts).await { + warn!("Failed to sync with provided tipset: {}", e); }; } Err(e) => { @@ -177,7 +178,7 @@ where self.network .hello_request( peer_id, - HelloMessage { + HelloRequest { heaviest_tip_set: heaviest.cids().to_vec(), heaviest_tipset_height: heaviest.epoch(), heaviest_tipset_weight: heaviest.weight().clone(), @@ -1023,7 +1024,7 @@ mod tests { use beacon::MockBeacon; use blocks::BlockHeader; use db::MemoryDB; - use forest_libp2p::NetworkEvent; + use forest_libp2p::{rpc::RequestId, NetworkEvent}; use std::sync::Arc; use test_utils::{construct_blocksync_response, construct_messages, construct_tipset}; @@ -1059,9 +1060,9 @@ mod tests { task::block_on(async { event_sender - .send(NetworkEvent::RPCResponse { + .send(NetworkEvent::BlockSyncResponse { // TODO update this, only matching first index of requestId - req_id: 1, + request_id: RequestId(1), response: rpc_response, }) .await; diff --git a/blockchain/chain_sync/src/sync/peer_test.rs b/blockchain/chain_sync/src/sync/peer_test.rs index 7e2d3fb06db0..71f942f592fd 100644 --- a/blockchain/chain_sync/src/sync/peer_test.rs +++ b/blockchain/chain_sync/src/sync/peer_test.rs @@ -7,7 +7,8 @@ use async_std::task; use beacon::MockBeacon; use blocks::BlockHeader; use db::MemoryDB; -use forest_libp2p::hello::HelloMessage; +use forest_libp2p::{hello::HelloRequest, rpc::ResponseChannel}; +use futures::channel::oneshot; use libp2p::core::PeerId; use std::time::Duration; @@ -48,12 +49,16 @@ fn peer_manager_update() { let source = PeerId::random(); let source_clone = source.clone(); + let (sender, _) = oneshot::channel(); task::block_on(async { event_sender - .send(NetworkEvent::Hello { - message: HelloMessage::default(), - source, + .send(NetworkEvent::HelloRequest { + request: HelloRequest::default(), + channel: ResponseChannel { + peer: source, + sender, + }, }) .await; diff --git a/forest/Cargo.toml b/forest/Cargo.toml index 774adc6c52db..3b4d9b78589e 100644 --- a/forest/Cargo.toml +++ b/forest/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" forest_libp2p = { path = "../node/forest_libp2p" } utils = { path = "../node/utils" } db = { path = "../node/db", features = ["rocksdb"] } -libp2p = "0.20" +libp2p = { git = "https://github.com/ChainSafe/rust-libp2p", rev = "3661071120d3c07eb9e56932204efcd12cdffc49" } futures = "0.3.5" log = "0.4.8" async-log = "2.0.0" diff --git a/ipld/graphsync/Cargo.toml b/ipld/graphsync/Cargo.toml index 54a41c998299..9457cbe785a9 100644 --- a/ipld/graphsync/Cargo.toml +++ b/ipld/graphsync/Cargo.toml @@ -10,7 +10,7 @@ cid = { package = "forest_cid", path = "../cid", version = "0.1" } forest_ipld = { path = "../" } fnv = "1.0.6" forest_encoding = { path = "../../encoding", version = "0.1" } -libp2p = "0.20" +libp2p = { git = "https://github.com/ChainSafe/rust-libp2p", rev = "3661071120d3c07eb9e56932204efcd12cdffc49" } futures = "0.3.5" futures-util = "0.3.5" futures_codec = "0.4.0" diff --git a/ipld/graphsync/src/message/proto/message.rs b/ipld/graphsync/src/message/proto/message.rs index 322e85af6d4e..6fb917cb8d7b 100644 --- a/ipld/graphsync/src/message/proto/message.rs +++ b/ipld/graphsync/src/message/proto/message.rs @@ -1,4 +1,4 @@ -// This file is generated by rust-protobuf 2.15.0. Do not edit +// This file is generated by rust-protobuf 2.15.1. Do not edit // @generated // https://github.com/rust-lang/rust-clippy/issues/702 @@ -25,7 +25,7 @@ use protobuf::ProtobufEnum as ProtobufEnum_imported_for_functions; /// Generated files are compatible only with the same version /// of protobuf runtime. -// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_15_0; +// const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_15_1; #[derive(PartialEq,Clone,Default)] pub struct Message { @@ -254,7 +254,7 @@ impl ::protobuf::Message for Message { fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { self as &mut dyn (::std::any::Any) } - fn into_any(self: Box) -> ::std::boxed::Box { + fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { self } @@ -618,7 +618,7 @@ impl ::protobuf::Message for Message_Request { fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { self as &mut dyn (::std::any::Any) } - fn into_any(self: Box) -> ::std::boxed::Box { + fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { self } @@ -870,7 +870,7 @@ impl ::protobuf::Message for Message_Response { fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { self as &mut dyn (::std::any::Any) } - fn into_any(self: Box) -> ::std::boxed::Box { + fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { self } @@ -1081,7 +1081,7 @@ impl ::protobuf::Message for Message_Block { fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { self as &mut dyn (::std::any::Any) } - fn into_any(self: Box) -> ::std::boxed::Box { + fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { self } diff --git a/node/forest_libp2p/Cargo.toml b/node/forest_libp2p/Cargo.toml index 4a3f8bb42674..ac9ab0bedaed 100644 --- a/node/forest_libp2p/Cargo.toml +++ b/node/forest_libp2p/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" [dependencies] utils = { path = "../utils" } -libp2p = "0.20" +libp2p = { git = "https://github.com/ChainSafe/rust-libp2p", rev = "3661071120d3c07eb9e56932204efcd12cdffc49" } futures = "0.3.5" futures-util = "0.3.5" futures_codec = "0.4.0" @@ -22,6 +22,7 @@ fnv = "1.0.6" smallvec = "1.1.0" clock = { path = "../clock" } num-bigint = { path = "../../utils/bigint", package = "forest_bigint" } +async-trait = "0.1" [dev-dependencies] forest_address = { path = "../../vm/address" } diff --git a/node/forest_libp2p/src/behaviour.rs b/node/forest_libp2p/src/behaviour.rs index acf9d6a91ed3..de0f5fbe550d 100644 --- a/node/forest_libp2p/src/behaviour.rs +++ b/node/forest_libp2p/src/behaviour.rs @@ -1,8 +1,12 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use super::rpc::{RPCEvent, RPCMessage, RPC}; +use crate::blocksync::{ + BlockSyncCodec, BlockSyncProtocolName, BlockSyncRequest, BlockSyncResponse, +}; use crate::config::Libp2pConfig; +use crate::hello::{HelloCodec, HelloProtocolName, HelloRequest, HelloResponse}; +use crate::rpc::RPCRequest; use libp2p::core::identity::Keypair; use libp2p::core::PeerId; use libp2p::gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent, Topic, TopicHash}; @@ -15,6 +19,10 @@ use libp2p::ping::{ handler::{PingFailure, PingSuccess}, Ping, PingEvent, }; +use libp2p::request_response::{ + ProtocolSupport, RequestId, RequestResponse, RequestResponseEvent, RequestResponseMessage, + ResponseChannel, +}; use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}; use libp2p::NetworkBehaviour; use log::{debug, trace, warn}; @@ -29,7 +37,8 @@ pub struct ForestBehaviour { mdns: Mdns, ping: Ping, identify: Identify, - rpc: RPC, + hello: RequestResponse, + blocksync: RequestResponse, kademlia: Kademlia, #[behaviour(ignore)] events: Vec, @@ -46,7 +55,26 @@ pub enum ForestBehaviourEvent { topics: Vec, message: Vec, }, - RPC(PeerId, RPCEvent), + HelloRequest { + peer: PeerId, + request: HelloRequest, + channel: ResponseChannel, + }, + HelloResponse { + peer: PeerId, + request_id: RequestId, + response: HelloResponse, + }, + BlockSyncRequest { + peer: PeerId, + request: BlockSyncRequest, + channel: ResponseChannel, + }, + BlockSyncResponse { + peer: PeerId, + request_id: RequestId, + response: BlockSyncResponse, + }, } impl NetworkBehaviourEventProcess for ForestBehaviour { @@ -72,8 +100,8 @@ impl NetworkBehaviourEventProcess for ForestBehaviour { impl NetworkBehaviourEventProcess for ForestBehaviour { fn inject_event(&mut self, event: KademliaEvent) { match event { - KademliaEvent::Discovered { peer_id, .. } => { - self.add_peer(peer_id); + KademliaEvent::RoutingUpdated { peer, .. } => { + self.add_peer(peer); } event => { trace!("kad: {:?}", event); @@ -137,31 +165,77 @@ impl NetworkBehaviourEventProcess for ForestBehaviour { } } } -impl NetworkBehaviourEventProcess for ForestBehaviour { - fn inject_event(&mut self, event: RPCMessage) { + +impl NetworkBehaviourEventProcess> + for ForestBehaviour +{ + fn inject_event(&mut self, event: RequestResponseEvent) { match event { - RPCMessage::PeerDialed(peer_id) => { - self.events.push(ForestBehaviourEvent::PeerDialed(peer_id)); - } - RPCMessage::PeerDisconnected(peer_id) => { - self.events - .push(ForestBehaviourEvent::PeerDisconnected(peer_id)); - } - RPCMessage::RPC(peer_id, rpc_event) => match rpc_event { - RPCEvent::Request(req_id, request) => { - self.events.push(ForestBehaviourEvent::RPC( - peer_id, - RPCEvent::Request(req_id, request), - )); + RequestResponseEvent::Message { peer, message } => match message { + RequestResponseMessage::Request { request, channel } => { + self.events.push(ForestBehaviourEvent::HelloRequest { + peer, + request, + channel, + }) } - RPCEvent::Response(req_id, response) => { - self.events.push(ForestBehaviourEvent::RPC( - peer_id, - RPCEvent::Response(req_id, response), - )); + RequestResponseMessage::Response { + request_id, + response, + } => self.events.push(ForestBehaviourEvent::HelloResponse { + peer, + request_id, + response, + }), + }, + RequestResponseEvent::OutboundFailure { + peer, + request_id, + error, + } => warn!( + "Hello outbound failure (peer: {:?}) (id: {:?}): {:?}", + peer, request_id, error + ), + RequestResponseEvent::InboundFailure { peer, error } => { + warn!("Hello inbound error (peer: {:?}): {:?}", peer, error) + } + } + } +} + +impl NetworkBehaviourEventProcess> + for ForestBehaviour +{ + fn inject_event(&mut self, event: RequestResponseEvent) { + match event { + RequestResponseEvent::Message { peer, message } => match message { + RequestResponseMessage::Request { request, channel } => { + self.events.push(ForestBehaviourEvent::BlockSyncRequest { + peer, + request, + channel, + }) } - RPCEvent::Error(req_id, err) => warn!("RPC Error {:?}, {:?}", err, req_id), + RequestResponseMessage::Response { + request_id, + response, + } => self.events.push(ForestBehaviourEvent::BlockSyncResponse { + peer, + request_id, + response, + }), }, + RequestResponseEvent::OutboundFailure { + peer, + request_id, + error, + } => warn!( + "BlockSync outbound error (peer: {:?}) (id: {:?}): {:?}", + peer, request_id, error + ), + RequestResponseEvent::InboundFailure { peer, error } => { + warn!("BlockSync onbound error (peer: {:?}): {:?}", peer, error) + } } } } @@ -202,6 +276,9 @@ impl ForestBehaviour { warn!("Kademlia bootstrap failed: {}", e); } + let hp = std::iter::once((HelloProtocolName, ProtocolSupport::Full)); + let bp = std::iter::once((BlockSyncProtocolName, ProtocolSupport::Full)); + ForestBehaviour { gossipsub: Gossipsub::new(local_peer_id, gossipsub_config), mdns: Mdns::new().expect("Could not start mDNS"), @@ -213,7 +290,8 @@ impl ForestBehaviour { local_key.public(), ), kademlia, - rpc: RPC::default(), + hello: RequestResponse::new(HelloCodec, hp, Default::default()), + blocksync: RequestResponse::new(BlockSyncCodec, bp, Default::default()), events: vec![], peers: Default::default(), } @@ -235,8 +313,13 @@ impl ForestBehaviour { } /// Send an RPC request or response to some peer. - pub fn send_rpc(&mut self, peer_id: PeerId, req: RPCEvent) { - self.rpc.send_rpc(peer_id, req); + pub fn send_rpc_request(&mut self, peer_id: &PeerId, req: RPCRequest, id: RequestId) { + match req { + RPCRequest::Hello(request) => self.hello.send_request_with_id(peer_id, request, id), + RPCRequest::BlockSync(request) => { + self.blocksync.send_request_with_id(peer_id, request, id) + } + } } /// Adds peer to the peer set. diff --git a/node/forest_libp2p/src/blocksync/mod.rs b/node/forest_libp2p/src/blocksync/mod.rs index cc85649736df..0061fb714208 100644 --- a/node/forest_libp2p/src/blocksync/mod.rs +++ b/node/forest_libp2p/src/blocksync/mod.rs @@ -4,5 +4,84 @@ mod message; pub use self::message::*; +use async_trait::async_trait; +use forest_encoding::{from_slice, to_vec}; +use futures::prelude::*; +use libp2p::core::ProtocolName; +use libp2p::request_response::RequestResponseCodec; +use std::io; pub const BLOCKSYNC_PROTOCOL_ID: &[u8] = b"/fil/sync/blk/0.0.1"; + +/// Type to satisfy `ProtocolName` interface for BlockSync RPC. +#[derive(Clone, Debug, PartialEq, Default)] +pub struct BlockSyncProtocolName; + +impl ProtocolName for BlockSyncProtocolName { + fn protocol_name(&self) -> &[u8] { + BLOCKSYNC_PROTOCOL_ID + } +} + +/// BlockSync protocol codec to be used within the RPC service. +#[derive(Debug, Clone, Default)] +pub struct BlockSyncCodec; + +#[async_trait] +impl RequestResponseCodec for BlockSyncCodec { + type Protocol = BlockSyncProtocolName; + type Request = BlockSyncRequest; + type Response = BlockSyncResponse; + + async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut buf = Vec::new(); + io.read_to_end(&mut buf).await?; + Ok(from_slice(&buf).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?) + } + + async fn read_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut buf = Vec::new(); + io.read_to_end(&mut buf).await?; + Ok(from_slice(&buf).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?) + } + + async fn write_request( + &mut self, + _: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + io.write_all( + &to_vec(&req).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, + ) + .await + } + + async fn write_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + res: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + io.write_all( + &to_vec(&res).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, + ) + .await + } +} diff --git a/node/forest_libp2p/src/hello/message.rs b/node/forest_libp2p/src/hello/message.rs index fb8c6fa36d45..9a006efdaf3f 100644 --- a/node/forest_libp2p/src/hello/message.rs +++ b/node/forest_libp2p/src/hello/message.rs @@ -8,7 +8,7 @@ use num_bigint::BigUint; /// Hello message https://filecoin-project.github.io/specs/#hello-spec #[derive(Clone, Debug, PartialEq, Default, Serialize_tuple, Deserialize_tuple)] -pub struct HelloMessage { +pub struct HelloRequest { pub heaviest_tip_set: Vec, pub heaviest_tipset_height: ChainEpoch, #[serde(with = "num_bigint::biguint_ser")] @@ -33,12 +33,12 @@ mod tests { #[test] fn hello_default_ser() { - let orig_msg = HelloMessage { + let orig_msg = HelloRequest { genesis_hash: Cid::new_from_cbor(&[], Identity), ..Default::default() }; let bz = to_vec(&orig_msg).unwrap(); - let msg: HelloMessage = from_slice(&bz).unwrap(); + let msg: HelloRequest = from_slice(&bz).unwrap(); assert_eq!(msg, orig_msg); } } diff --git a/node/forest_libp2p/src/hello/mod.rs b/node/forest_libp2p/src/hello/mod.rs index 777692682649..eefd3115cd46 100644 --- a/node/forest_libp2p/src/hello/mod.rs +++ b/node/forest_libp2p/src/hello/mod.rs @@ -4,5 +4,84 @@ mod message; pub use self::message::*; +use async_trait::async_trait; +use forest_encoding::{from_slice, to_vec}; +use futures::prelude::*; +use libp2p::core::ProtocolName; +use libp2p::request_response::RequestResponseCodec; +use std::io; pub const HELLO_PROTOCOL_ID: &[u8] = b"/fil/hello/1.0.0"; + +/// Type to satisfy `ProtocolName` interface for BlockSync RPC. +#[derive(Clone, Debug, PartialEq, Default)] +pub struct HelloProtocolName; + +impl ProtocolName for HelloProtocolName { + fn protocol_name(&self) -> &[u8] { + HELLO_PROTOCOL_ID + } +} + +/// Hello protocol codec to be used within the RPC service. +#[derive(Debug, Clone, Default)] +pub struct HelloCodec; + +#[async_trait] +impl RequestResponseCodec for HelloCodec { + type Protocol = HelloProtocolName; + type Request = HelloRequest; + type Response = HelloResponse; + + async fn read_request(&mut self, _: &Self::Protocol, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut buf = Vec::new(); + io.read_to_end(&mut buf).await?; + Ok(from_slice(&buf).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?) + } + + async fn read_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let mut buf = Vec::new(); + io.read_to_end(&mut buf).await?; + Ok(from_slice(&buf).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?) + } + + async fn write_request( + &mut self, + _: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + io.write_all( + &to_vec(&req).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, + ) + .await + } + + async fn write_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + res: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + io.write_all( + &to_vec(&res).map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, + ) + .await + } +} diff --git a/node/forest_libp2p/src/rpc/behaviour.rs b/node/forest_libp2p/src/rpc/behaviour.rs deleted file mode 100644 index 2421230ff0c1..000000000000 --- a/node/forest_libp2p/src/rpc/behaviour.rs +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright 2020 ChainSafe Systems -// SPDX-License-Identifier: Apache-2.0, MIT - -use super::handler::RPCHandler; -use super::RPCEvent; -use futures::task::Context; -use futures_util::task::Poll; -use libp2p::core::connection::ConnectionId; -use libp2p::swarm::{ - protocols_handler::ProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, - PollParameters, -}; -use libp2p::{Multiaddr, PeerId}; - -/// The RPC behaviour that gets consumed by the Swarm. -pub struct RPC { - /// Queue of events to processed. - events: Vec>, -} - -impl RPC { - /// Creates a new RPC behaviour - pub fn new() -> Self { - RPC::default() - } - - /// Send an RPCEvent to a peer specified by peer_id. - pub fn send_rpc(&mut self, peer_id: PeerId, event: RPCEvent) { - self.events.push(NetworkBehaviourAction::NotifyHandler { - peer_id, - event, - handler: NotifyHandler::Any, - }); - } -} - -impl Default for RPC { - fn default() -> Self { - RPC { events: vec![] } - } -} - -/// Messages sent to the user from the RPC protocol. -#[derive(Debug)] -pub enum RPCMessage { - RPC(PeerId, RPCEvent), - PeerDialed(PeerId), - PeerDisconnected(PeerId), -} - -impl NetworkBehaviour for RPC { - type ProtocolsHandler = RPCHandler; - type OutEvent = RPCMessage; - fn new_handler(&mut self) -> Self::ProtocolsHandler { - RPCHandler::default() - } - - fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { - vec![] - } - - fn inject_connected(&mut self, peer_id: &PeerId) { - self.events.push(NetworkBehaviourAction::GenerateEvent( - RPCMessage::PeerDialed(peer_id.clone()), - )); - } - - fn inject_disconnected(&mut self, peer_id: &PeerId) { - self.events.push(NetworkBehaviourAction::GenerateEvent( - RPCMessage::PeerDisconnected(peer_id.clone()), - )); - } - - fn inject_event( - &mut self, - peer_id: PeerId, - _connection: ConnectionId, - event: ::OutEvent, - ) { - self.events - .push(NetworkBehaviourAction::GenerateEvent(RPCMessage::RPC( - peer_id, event, - ))) - } - - fn poll( - &mut self, - _: &mut Context, - _: &mut impl PollParameters, - ) -> Poll< - NetworkBehaviourAction< - ::InEvent, - Self::OutEvent, - >, - > { - if !self.events.is_empty() { - return Poll::Ready(self.events.remove(0)); - } - Poll::Pending - } -} diff --git a/node/forest_libp2p/src/rpc/codec.rs b/node/forest_libp2p/src/rpc/codec.rs deleted file mode 100644 index 5f61e46afae8..000000000000 --- a/node/forest_libp2p/src/rpc/codec.rs +++ /dev/null @@ -1,114 +0,0 @@ -// Copyright 2020 ChainSafe Systems -// SPDX-License-Identifier: Apache-2.0, MIT - -use super::{RPCError, RPCRequest, RPCResponse}; -use crate::blocksync::BLOCKSYNC_PROTOCOL_ID; -use crate::hello::HELLO_PROTOCOL_ID; -use bytes::BytesMut; -use forest_encoding::{from_slice, to_vec}; -use futures_codec::{Decoder, Encoder}; - -/// Codec used for inbound connections. Decodes the inbound message into a RPCRequest, and encodes the RPCResponse to send. -pub struct InboundCodec { - protocol: &'static [u8], -} - -impl InboundCodec { - pub fn new(protocol: &'static [u8]) -> Self { - Self { protocol } - } -} - -impl Encoder for InboundCodec { - type Error = RPCError; - type Item = RPCResponse; - - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { - match item { - RPCResponse::BlockSync(response) => { - let resp = to_vec(&response)?; - dst.clear(); - dst.extend_from_slice(&resp); - Ok(()) - } - RPCResponse::Hello(response) => { - let resp = to_vec(&response)?; - dst.clear(); - dst.extend_from_slice(&resp); - Ok(()) - } - } - } -} - -impl Decoder for InboundCodec { - type Error = RPCError; - type Item = RPCRequest; - - fn decode(&mut self, bz: &mut BytesMut) -> Result, Self::Error> { - if bz.is_empty() { - return Ok(None); - } - - match self.protocol { - HELLO_PROTOCOL_ID => Ok(Some(RPCRequest::Hello( - from_slice(bz).map_err(|err| RPCError::Codec(err.to_string()))?, - ))), - BLOCKSYNC_PROTOCOL_ID => Ok(Some(RPCRequest::BlockSync( - from_slice(bz).map_err(|err| RPCError::Codec(err.to_string()))?, - ))), - _ => Err(RPCError::Codec("Unsupported codec".to_string())), - } - } -} - -/// Codec used for outbound connections. Encodes the outbound message into a RPCRequest to send, and decodes the RPCResponse when received. -pub struct OutboundCodec { - protocol: &'static [u8], -} - -impl OutboundCodec { - pub fn new(protocol: &'static [u8]) -> Self { - Self { protocol } - } -} - -impl Encoder for OutboundCodec { - type Error = RPCError; - type Item = RPCRequest; - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { - match item { - RPCRequest::BlockSync(request) => { - let resp = to_vec(&request)?; - dst.clear(); - dst.extend_from_slice(&resp); - Ok(()) - } - RPCRequest::Hello(request) => { - let resp = to_vec(&request)?; - dst.clear(); - dst.extend_from_slice(&resp); - Ok(()) - } - } - } -} - -impl Decoder for OutboundCodec { - type Error = RPCError; - type Item = RPCResponse; - fn decode(&mut self, bz: &mut BytesMut) -> Result, Self::Error> { - if bz.is_empty() { - return Ok(None); - } - match self.protocol { - HELLO_PROTOCOL_ID => Ok(Some(RPCResponse::Hello( - from_slice(bz).map_err(|err| RPCError::Codec(err.to_string()))?, - ))), - BLOCKSYNC_PROTOCOL_ID => Ok(Some(RPCResponse::BlockSync( - from_slice(bz).map_err(|err| RPCError::Codec(err.to_string()))?, - ))), - _ => Err(RPCError::Codec("Unsupported codec".to_string())), - } - } -} diff --git a/node/forest_libp2p/src/rpc/error.rs b/node/forest_libp2p/src/rpc/error.rs deleted file mode 100644 index 60194159412c..000000000000 --- a/node/forest_libp2p/src/rpc/error.rs +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2020 ChainSafe Systems -// SPDX-License-Identifier: Apache-2.0, MIT - -use forest_encoding::error::Error as EncodingError; -use std::fmt; - -#[derive(Debug, Clone, PartialEq)] -pub enum RPCError { - Codec(String), - Custom(String), -} - -impl From for RPCError { - fn from(err: std::io::Error) -> Self { - Self::Custom(err.to_string()) - } -} - -impl From for RPCError { - fn from(err: EncodingError) -> Self { - Self::Codec(err.to_string()) - } -} - -impl fmt::Display for RPCError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - RPCError::Codec(err) => write!(f, "Codec Error: {}", err), - RPCError::Custom(err) => write!(f, "{}", err), - } - } -} - -impl std::error::Error for RPCError { - fn description(&self) -> &str { - "Libp2p RPC Error" - } -} diff --git a/node/forest_libp2p/src/rpc/handler.rs b/node/forest_libp2p/src/rpc/handler.rs deleted file mode 100644 index af4e9dc75cba..000000000000 --- a/node/forest_libp2p/src/rpc/handler.rs +++ /dev/null @@ -1,405 +0,0 @@ -// Copyright 2020 ChainSafe Systems -// SPDX-License-Identifier: Apache-2.0, MIT - -use super::protocol::RPCInbound; -use super::{InboundCodec, OutboundCodec, RPCError, RPCEvent, RPCRequest, RPCResponse, RequestId}; -use fnv::FnvHashMap; -use futures::prelude::*; -use futures_codec::Framed; -use libp2p::swarm::{ - KeepAlive, NegotiatedSubstream, ProtocolsHandler, ProtocolsHandlerEvent, - ProtocolsHandlerUpgrErr, SubstreamProtocol, -}; -use libp2p::{InboundUpgrade, OutboundUpgrade}; -use log::debug; -use smallvec::SmallVec; -use std::{ - pin::Pin, - task::{Context, Poll}, - time::{Duration, Instant}, -}; - -/// The time (in seconds) before a substream that is awaiting a response from the user times out. -pub const RESPONSE_TIMEOUT: u64 = 20; - -pub struct RPCHandler { - /// Upgrade configuration for RPC protocol. - listen_protocol: SubstreamProtocol, - - /// If `Some`, something bad happened and we should shut down the handler with an error. - pending_error: Option>, - - /// Queue of events to produce in `poll()`. - events_out: SmallVec<[RPCEvent; 4]>, - - /// Queue of outbound substreams to open. - dial_queue: SmallVec<[RPCEvent; 4]>, - - /// Current number of concurrent outbound substreams being opened. - dial_negotiated: u32, - - /// Map of current substreams awaiting a response to an RPC request. - inbound_substreams: FnvHashMap, - - /// The vector of outbound substream states to progress. - outbound_substreams: Vec, - - /// Sequential ID for new substreams. - current_substream_id: RequestId, - - /// After the given duration has elapsed, an inactive connection will shutdown. - inactive_timeout: Duration, - - /// Maximum number of concurrent outbound substreams being opened. Value is never modified. - max_dial_negotiated: u32, - - /// Flag determining whether to maintain the connection to the peer. - keep_alive: KeepAlive, -} - -impl RPCHandler { - /// Constructor for new RPC handler - pub fn new(inactive_timeout: Duration) -> Self { - RPCHandler { - listen_protocol: SubstreamProtocol::new(RPCInbound), - pending_error: None, - events_out: SmallVec::new(), - dial_queue: SmallVec::new(), - dial_negotiated: 0, - inbound_substreams: FnvHashMap::default(), - outbound_substreams: Vec::new(), - current_substream_id: 1, - inactive_timeout, - max_dial_negotiated: 8, - keep_alive: KeepAlive::Yes, - } - } - - /// Returns the number of pending requests. - pub fn pending_requests(&self) -> u32 { - self.dial_negotiated + self.dial_queue.len() as u32 - } - - /// Opens an outbound substream with a request. - pub fn send_request(&mut self, event: RPCEvent) { - self.keep_alive = KeepAlive::Yes; - - self.dial_queue.push(event); - } -} - -impl Default for RPCHandler { - fn default() -> Self { - RPCHandler::new(Duration::from_secs(30)) - } -} - -/// State of inbound substreams. -enum InboundSubstreamState { - /// Waiting for message from the remote. - WaitingInput(Framed), - /// An outbound substream is waiting a response from the user. - WaitingResponse { - /// The framed negotiated substream. - substream: Framed, - /// The time when the substream is closed. - timeout: Instant, - }, - /// Substream is being closed. - Closing(Framed), - /// Inserted to ensure no state remains unhandled. - Poisoned, -} - -/// State of the outbound substream, opened either by us or by the remote. -enum SubstreamState { - /// Waiting to send a message to the remote. - PendingSend { - substream: Framed, - response: RPCResponse, - }, - /// Request has been sent, awaiting response - PendingResponse { - substream: Framed, - event: RPCEvent, - timeout: Instant, - }, -} - -impl ProtocolsHandler for RPCHandler { - type InEvent = RPCEvent; - type OutEvent = RPCEvent; - type Error = RPCError; - type InboundProtocol = RPCInbound; - type OutboundProtocol = RPCRequest; - type OutboundOpenInfo = RPCEvent; - - fn listen_protocol(&self) -> SubstreamProtocol { - self.listen_protocol.clone() - } - - fn inject_fully_negotiated_inbound( - &mut self, - substream: >::Output, - ) { - // New inbound request. Store the stream and tag the output. - let awaiting_stream = InboundSubstreamState::WaitingInput(substream); - self.inbound_substreams - .insert(self.current_substream_id, awaiting_stream); - - self.current_substream_id += 1; - } - - fn inject_fully_negotiated_outbound( - &mut self, - substream: >::Output, - event: Self::OutboundOpenInfo, - ) { - // Decrement pending outbound substreams when processing new - self.dial_negotiated -= 1; - - if self.dial_negotiated == 0 - && self.dial_queue.is_empty() - && self.inbound_substreams.is_empty() - { - self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout); - } else { - self.keep_alive = KeepAlive::Yes; - } - - // add the stream to substreams if we expect a response, otherwise drop the stream - if let RPCEvent::Request(id, req) = event { - if req.expect_response() { - let awaiting_stream = SubstreamState::PendingResponse { - substream, - event: RPCEvent::Request(id, req), - timeout: Instant::now() + Duration::from_secs(RESPONSE_TIMEOUT), - }; - - self.outbound_substreams.push(awaiting_stream); - } - } - } - - fn inject_event(&mut self, event: Self::InEvent) { - match event { - RPCEvent::Request(_, _) => self.send_request(event), - RPCEvent::Response(rpc_id, response) => { - // check if the stream matching the response still exists - if let Some(InboundSubstreamState::WaitingResponse { substream, .. }) = - self.inbound_substreams.remove(&rpc_id) - { - // only send one response per stream. This must be in the waiting state - self.outbound_substreams.push(SubstreamState::PendingSend { - substream, - response, - }); - } - } - RPCEvent::Error(_, _) => {} - } - } - - fn inject_dial_upgrade_error( - &mut self, - _: Self::OutboundOpenInfo, - error: ProtocolsHandlerUpgrErr< - >::Error, - >, - ) { - if self.pending_error.is_none() { - self.pending_error = Some(error); - } - } - - fn connection_keep_alive(&self) -> KeepAlive { - self.keep_alive - } - - #[allow(clippy::type_complexity)] - fn poll( - &mut self, - cx: &mut Context, - ) -> Poll< - ProtocolsHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, - >, - > { - if let Some(err) = self.pending_error.take() { - // Log error, shouldn't necessarily return error and drop peer here - debug!("{}", err); - } - - // return any events that need to be reported - if !self.events_out.is_empty() { - return Poll::Ready(ProtocolsHandlerEvent::Custom(self.events_out.remove(0))); - } else { - self.events_out.shrink_to_fit(); - } - - let mut remove_list: Vec = Vec::new(); - for (req_id, state) in self.inbound_substreams.iter_mut() { - loop { - match std::mem::replace(state, InboundSubstreamState::Poisoned) { - InboundSubstreamState::WaitingInput(mut substream) => { - match substream.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(message))) => { - *state = InboundSubstreamState::WaitingResponse { - substream, - timeout: Instant::now() + Duration::from_secs(RESPONSE_TIMEOUT), - }; - return Poll::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Request(*req_id, message), - )); - } - Poll::Ready(Some(Err(e))) => { - debug!("Inbound substream error while awaiting input: {:?}", e); - *state = InboundSubstreamState::Closing(substream); - } - // peer closed the stream - Poll::Ready(None) => { - *state = InboundSubstreamState::Closing(substream); - } - Poll::Pending => { - *state = InboundSubstreamState::WaitingInput(substream); - break; - } - } - } - InboundSubstreamState::Closing(mut substream) => { - match Sink::poll_close(Pin::new(&mut substream), cx) { - Poll::Ready(res) => { - if let Err(e) = res { - // Don't close the connection but just drop the inbound substream. - // In case the remote has more to send, they will open up a new - // substream. - debug!("Inbound substream error while closing: {:?}", e); - } - remove_list.push(*req_id); - break; - } - Poll::Pending => { - *state = InboundSubstreamState::Closing(substream); - break; - } - } - } - InboundSubstreamState::Poisoned => { - panic!("Tried to process a poisoned substream state") - } - st @ InboundSubstreamState::WaitingResponse { .. } => { - *state = st; - break; - } - } - } - } - - // remove expired inbound substreams - self.inbound_substreams - .retain(|req_id, waiting_stream| match waiting_stream { - InboundSubstreamState::WaitingResponse { timeout, .. } => { - Instant::now() <= *timeout - } - _ => !remove_list.contains(&req_id), - }); - - // drive streams that need to be processed - for n in (0..self.outbound_substreams.len()).rev() { - let stream = self.outbound_substreams.swap_remove(n); - match stream { - SubstreamState::PendingSend { - mut substream, - response, - } => match Sink::poll_ready(Pin::new(&mut substream), cx) { - Poll::Ready(Ok(())) => { - // Poll until message is sent - if let Err(e) = Sink::start_send(Pin::new(&mut substream), response) { - return Poll::Ready(ProtocolsHandlerEvent::Close(e)); - } - // Poll until data sent to flush the substream - loop { - match Sink::poll_flush(Pin::new(&mut substream), cx) { - Poll::Ready(Ok(())) => { - break; - } - Poll::Ready(Err(e)) => { - return Poll::Ready(ProtocolsHandlerEvent::Close(e)); - } - _ => (), - } - } - } - Poll::Ready(Err(err)) => { - return Poll::Ready(ProtocolsHandlerEvent::Custom(RPCEvent::Error( - 0, - RPCError::Custom(err.to_string()), - ))); - } - Poll::Pending => { - self.outbound_substreams.push(SubstreamState::PendingSend { - substream, - response, - }); - } - }, - SubstreamState::PendingResponse { - mut substream, - event, - timeout, - } => { - // TODO fix polling for response (polls partial written bytes in delayed cases) - match substream.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(response))) => { - return Poll::Ready(ProtocolsHandlerEvent::Custom(RPCEvent::Response( - event.id(), - response, - ))); - } - Poll::Ready(Some(Err(err))) => { - return Poll::Ready(ProtocolsHandlerEvent::Custom(RPCEvent::Error( - event.id(), - RPCError::Custom(err.to_string()), - ))); - } - Poll::Ready(None) => { - // stream closed early or nothing was sent - return Poll::Ready(ProtocolsHandlerEvent::Custom(RPCEvent::Error( - event.id(), - RPCError::Custom("Stream closed early. Empty response".to_owned()), - ))); - } - Poll::Pending => { - if Instant::now() < timeout { - self.outbound_substreams - .push(SubstreamState::PendingResponse { - substream, - event, - timeout, - }); - } - } - } - } - } - } - - // establish outbound substreams - if !self.dial_queue.is_empty() && self.dial_negotiated < self.max_dial_negotiated { - self.dial_negotiated += 1; - let event = self.dial_queue.remove(0); - self.dial_queue.shrink_to_fit(); - if let RPCEvent::Request(id, req) = event { - return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(req.clone()), - info: RPCEvent::Request(id, req), - }); - } - } - - Poll::Pending - } -} diff --git a/node/forest_libp2p/src/rpc/mod.rs b/node/forest_libp2p/src/rpc/mod.rs index 114d920f1aff..95f3ecc2217e 100644 --- a/node/forest_libp2p/src/rpc/mod.rs +++ b/node/forest_libp2p/src/rpc/mod.rs @@ -1,51 +1,20 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -mod behaviour; -mod codec; -mod error; -mod handler; -mod protocol; +use crate::blocksync::{BlockSyncRequest, BlockSyncResponse}; +use crate::hello::{HelloRequest, HelloResponse}; +pub use libp2p::request_response::{RequestId, ResponseChannel}; -pub use self::behaviour::{RPCMessage, RPC}; -pub use self::codec::{InboundCodec, OutboundCodec}; -pub use self::error::RPCError; -pub use self::handler::{RPCHandler, RESPONSE_TIMEOUT}; -pub use self::protocol::{RPCRequest, RPCResponse}; - -pub type RequestId = usize; - -/// The return type used in the behaviour and the resultant event from the protocols handler. +/// RPCResponse payloads for request/response calls #[derive(Debug, Clone, PartialEq)] -pub enum RPCEvent { - /// An inbound/outbound request for RPC protocol. The first parameter is a sequential - /// id which tracks an awaiting substream for the response. - Request(RequestId, RPCRequest), - /// A response that is being sent or has been received from the RPC protocol. The first parameter returns - /// that which was sent with the corresponding request, the second is a single chunk of a - /// response. - Response(RequestId, RPCResponse), - /// Error in RPC request - Error(RequestId, RPCError), +pub enum RPCResponse { + BlockSync(BlockSyncResponse), + Hello(HelloResponse), } -impl RPCEvent { - /// Returns the id which is used to track the substream - pub fn id(&self) -> usize { - match *self { - RPCEvent::Request(id, _) => id, - RPCEvent::Response(id, _) => id, - RPCEvent::Error(id, _) => id, - } - } -} - -impl std::fmt::Display for RPCEvent { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - RPCEvent::Request(id, _) => write!(f, "RPC Request(id: {:?})", id), - RPCEvent::Response(id, _) => write!(f, "RPC Response(id: {:?})", id), - RPCEvent::Error(_, err) => write!(f, "RPC Error(error: {:?})", err), - } - } +/// RPCRequest payloads for request/response calls +#[derive(Debug, Clone, PartialEq)] +pub enum RPCRequest { + BlockSync(BlockSyncRequest), + Hello(HelloRequest), } diff --git a/node/forest_libp2p/src/rpc/protocol.rs b/node/forest_libp2p/src/rpc/protocol.rs deleted file mode 100644 index 2cb5696db1a6..000000000000 --- a/node/forest_libp2p/src/rpc/protocol.rs +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright 2020 ChainSafe Systems -// SPDX-License-Identifier: Apache-2.0, MIT - -use super::{InboundCodec, OutboundCodec, RPCError}; -use crate::blocksync::{BlockSyncRequest, BlockSyncResponse, BLOCKSYNC_PROTOCOL_ID}; -use crate::hello::{HelloMessage, HelloResponse, HELLO_PROTOCOL_ID}; -use bytes::BytesMut; -use futures::prelude::*; -use futures::{AsyncRead, AsyncWrite}; -use futures_codec::{Encoder, Framed}; -use libp2p::core::UpgradeInfo; -use libp2p::{InboundUpgrade, OutboundUpgrade}; -use std::pin::Pin; - -/// RPCResponse payloads for request/response calls -#[derive(Debug, Clone, PartialEq)] -pub enum RPCResponse { - BlockSync(BlockSyncResponse), - Hello(HelloResponse), -} - -/// Protocol upgrade for inbound RPC requests. -#[derive(Debug, Clone)] -pub struct RPCInbound; - -impl UpgradeInfo for RPCInbound { - type Info = &'static [u8]; - type InfoIter = Vec; - - fn protocol_info(&self) -> Self::InfoIter { - vec![BLOCKSYNC_PROTOCOL_ID, HELLO_PROTOCOL_ID] - } -} - -impl InboundUpgrade for RPCInbound -where - TSocket: AsyncWrite + AsyncRead + Unpin + Send + 'static, -{ - type Output = Framed; - type Error = RPCError; - #[allow(clippy::type_complexity)] - type Future = Pin> + Send>>; - - fn upgrade_inbound(self, socket: TSocket, protocol: Self::Info) -> Self::Future { - Box::pin(future::ok(Framed::new(socket, InboundCodec::new(protocol)))) - } -} - -/// RPCRequest payloads for request/response calls -#[derive(Debug, Clone, PartialEq)] -pub enum RPCRequest { - BlockSync(BlockSyncRequest), - Hello(HelloMessage), -} - -impl UpgradeInfo for RPCRequest { - type Info = &'static [u8]; - type InfoIter = Vec; - - fn protocol_info(&self) -> Self::InfoIter { - self.supported_protocols() - } -} - -impl RPCRequest { - pub fn supported_protocols(&self) -> Vec<&'static [u8]> { - match self { - RPCRequest::BlockSync(_) => vec![BLOCKSYNC_PROTOCOL_ID], - RPCRequest::Hello(_) => vec![HELLO_PROTOCOL_ID], - } - } - pub fn expect_response(&self) -> bool { - match self { - RPCRequest::BlockSync(_) => true, - RPCRequest::Hello(_) => true, - } - } -} - -impl OutboundUpgrade for RPCRequest -where - TSocket: AsyncWrite + AsyncRead + Unpin + Send + 'static, -{ - type Output = Framed; - type Error = RPCError; - #[allow(clippy::type_complexity)] - type Future = Pin> + Send>>; - - fn upgrade_outbound(self, mut socket: TSocket, protocol: Self::Info) -> Self::Future { - Box::pin(async move { - let mut bm = BytesMut::with_capacity(1024); - let mut codec = OutboundCodec::new(protocol); - codec.encode(self, &mut bm)?; - socket.write_all(&bm).await?; - socket.close().await?; - Ok(Framed::new(socket, codec)) - }) - } -} diff --git a/node/forest_libp2p/src/service.rs b/node/forest_libp2p/src/service.rs index f4200cdd1f92..323ddd7b88d5 100644 --- a/node/forest_libp2p/src/service.rs +++ b/node/forest_libp2p/src/service.rs @@ -1,14 +1,15 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use super::blocksync::BlockSyncResponse; -use super::hello::HelloMessage; -use super::rpc::{RPCEvent, RPCRequest, RPCResponse}; +use super::blocksync::{BlockSyncRequest, BlockSyncResponse}; +use super::rpc::RPCRequest; use super::{ForestBehaviour, ForestBehaviourEvent, Libp2pConfig}; +use crate::hello::{HelloRequest, HelloResponse}; use async_std::stream; use async_std::sync::{channel, Receiver, Sender}; use futures::select; use futures_util::stream::StreamExt; +use libp2p::request_response::{RequestId, ResponseChannel}; use libp2p::{ core, core::muxing::StreamMuxerBox, @@ -25,24 +26,28 @@ use utils::read_file_to_vec; const PUBSUB_TOPICS: [&str; 2] = ["/fil/blocks", "/fil/msgs"]; /// Events emitted by this Service -#[derive(Clone, Debug)] +#[derive(Debug)] pub enum NetworkEvent { PubsubMessage { source: PeerId, topics: Vec, message: Vec, }, - RPCRequest { - req_id: usize, - request: RPCRequest, + HelloRequest { + request: HelloRequest, + channel: ResponseChannel, }, - RPCResponse { - req_id: usize, - response: RPCResponse, + HelloResponse { + request_id: RequestId, + response: HelloResponse, }, - Hello { - source: PeerId, - message: HelloMessage, + BlockSyncRequest { + request: BlockSyncRequest, + channel: ResponseChannel, + }, + BlockSyncResponse { + request_id: RequestId, + response: BlockSyncResponse, }, PeerDialed { peer_id: PeerId, @@ -52,8 +57,15 @@ pub enum NetworkEvent { /// Events into this Service #[derive(Clone, Debug)] pub enum NetworkMessage { - PubsubMessage { topic: Topic, message: Vec }, - RPC { peer_id: PeerId, event: RPCEvent }, + PubsubMessage { + topic: Topic, + message: Vec, + }, + RPC { + peer_id: PeerId, + request: RPCRequest, + id: RequestId, + }, } /// The Libp2pService listens to events from the Libp2p swarm. pub struct Libp2pService { @@ -131,43 +143,48 @@ impl Libp2pService { message }).await; } - ForestBehaviourEvent::RPC(peer_id, event) => { - debug!("RPC event {:?}", event); - match event { - RPCEvent::Response(req_id, res) => { - self.network_sender_out.send(NetworkEvent::RPCResponse { - req_id, - response: res, - }).await; - } - RPCEvent::Request(req_id, RPCRequest::BlockSync(r)) => { - // TODO implement handling incoming blocksync requests - swarm_stream.get_mut().send_rpc(peer_id, RPCEvent::Response(1, RPCResponse::BlockSync(BlockSyncResponse { - chain: vec![], - status: 203, - message: "handling requests not implemented".to_owned(), - }))); - } - RPCEvent::Request(req_id, RPCRequest::Hello(message)) => { - self.network_sender_out.send(NetworkEvent::Hello{ - message, source: peer_id}).await; - } - RPCEvent::Error(req_id, err) => info!("Error with request {}: {:?}", req_id, err), - } + ForestBehaviourEvent::HelloRequest { request, channel, .. } => { + debug!("Received hello request: {:?}", request); + self.network_sender_out.send(NetworkEvent::HelloRequest { + request, + channel, + }).await; + } + ForestBehaviourEvent::HelloResponse { request_id, response, .. } => { + debug!("Received hello response (id: {:?}): {:?}", request_id, response); + self.network_sender_out.send(NetworkEvent::HelloResponse { + request_id, + response, + }).await; + } + ForestBehaviourEvent::BlockSyncRequest { channel, .. } => { + // TODO implement blocksync provider + let _ = channel.send(BlockSyncResponse { + chain: vec![], + status: 203, + message: "handling requests not implemented".to_owned(), + }); + } + ForestBehaviourEvent::BlockSyncResponse { request_id, response, .. } => { + debug!("Received blocksync response (id: {:?}): {:?}", request_id, response); + self.network_sender_out.send(NetworkEvent::BlockSyncResponse { + request_id, + response, + }).await; } } - None => {break;} + None => { break; } }, rpc_message = network_stream.next() => match rpc_message { Some(message) => match message { - NetworkMessage::PubsubMessage{topic, message} => { + NetworkMessage::PubsubMessage { topic, message } => { swarm_stream.get_mut().publish(&topic, message); } - NetworkMessage::RPC{peer_id, event} => { - swarm_stream.get_mut().send_rpc(peer_id, event); + NetworkMessage::RPC { peer_id, request, id } => { + swarm_stream.get_mut().send_rpc_request(&peer_id, request, id); } } - None => {break;} + None => { break; } }, interval_event = interval.next() => if interval_event.is_some() { info!("Peers connected: {}", swarm_stream.get_ref().peers().len()); diff --git a/node/forest_libp2p/tests/blocksync_test.rs b/node/forest_libp2p/tests/blocksync_test.rs deleted file mode 100644 index ac96bdefcf20..000000000000 --- a/node/forest_libp2p/tests/blocksync_test.rs +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright 2020 ChainSafe Systems -// SPDX-License-Identifier: Apache-2.0, MIT - -mod rpc_test_utils; - -use self::rpc_test_utils::*; -use forest_libp2p::blocksync::{BlockSyncRequest, BlockSyncResponse}; -use forest_libp2p::rpc::{RPCEvent, RPCMessage, RPCRequest, RPCResponse}; -use futures::future; - -#[test] -fn test_empty_blocksync() { - let (mut sender, mut receiver) = build_node_pair(); - - let rpc_request = RPCRequest::BlockSync(BlockSyncRequest { - start: vec![], - request_len: 0, - options: 0, - }); - - let c_request = rpc_request.clone(); - let rpc_response = RPCResponse::BlockSync(BlockSyncResponse { - chain: vec![], - status: 1, - message: "message".to_owned(), - }); - let c_response = rpc_response.clone(); - - let sender_fut = async move { - loop { - match sender.next().await { - RPCMessage::PeerDialed(peer_id) => { - // Send a BlocksByRange request - sender.send_rpc(peer_id, RPCEvent::Request(1, c_request.clone())); - } - RPCMessage::RPC(_peer_id, event) => match event { - RPCEvent::Response(req_id, res) => { - return (req_id, res); - } - ev => panic!("Sender invalid RPC received, {:?}", ev), - }, - e => panic!("unexpected {:?}", e), - } - } - }; - - let receiver_fut = async move { - loop { - match receiver.next().await { - RPCMessage::RPC(peer_id, event) => { - match event { - RPCEvent::Request(req_id, req) => { - assert_eq!(rpc_request.clone(), req); - assert_eq!(req_id, 1); - // send the response - receiver.send_rpc(peer_id, RPCEvent::Response(1, c_response.clone())); - } - ev => panic!("Receiver invalid RPC received, {:?}", ev), - } - } - RPCMessage::PeerDialed(_) => (), - e => panic!("unexpected {:?}", e), - } - } - }; - - let result = future::select(Box::pin(sender_fut), Box::pin(receiver_fut)); - let ((req_id, res), _) = async_std::task::block_on(result).factor_first(); - assert_eq!(res, rpc_response); - assert_eq!(req_id, 1); -} diff --git a/node/forest_libp2p/tests/hello_test.rs b/node/forest_libp2p/tests/hello_test.rs deleted file mode 100644 index 33f84b8ab6d3..000000000000 --- a/node/forest_libp2p/tests/hello_test.rs +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright 2020 ChainSafe Systems -// SPDX-License-Identifier: Apache-2.0, MIT - -mod rpc_test_utils; - -use self::rpc_test_utils::*; -use forest_cid::{multihash::Identity, Cid}; -use forest_libp2p::hello::{HelloMessage, HelloResponse}; -use forest_libp2p::rpc::{RPCEvent, RPCMessage, RPCRequest, RPCResponse}; -use futures::future; -use num_bigint::BigUint; - -fn empty_cid() -> Cid { - Cid::new_from_cbor(&[], Identity) -} - -#[test] -fn test_empty_rpc() { - let (mut sender, mut receiver) = build_node_pair(); - - let rpc_request = RPCRequest::Hello(HelloMessage { - heaviest_tip_set: vec![empty_cid()], - heaviest_tipset_weight: BigUint::from(1u8), - heaviest_tipset_height: 2, - genesis_hash: empty_cid(), - }); - - let c_request = rpc_request.clone(); - let rpc_response = RPCResponse::Hello(HelloResponse { - arrival: 4, - sent: 5, - }); - let c_response = rpc_response.clone(); - - let sender_fut = async move { - loop { - match sender.next().await { - RPCMessage::PeerDialed(peer_id) => { - sender.send_rpc(peer_id, RPCEvent::Request(1, c_request.clone())); - } - RPCMessage::RPC(_peer_id, event) => match event { - RPCEvent::Response(req_id, res) => { - return (req_id, res); - } - ev => panic!("Sender invalid RPC received, {:?}", ev), - }, - e => panic!("unexpected {:?}", e), - } - } - }; - - let receiver_fut = async move { - loop { - match receiver.next().await { - RPCMessage::RPC(peer_id, event) => { - match event { - RPCEvent::Request(req_id, req) => { - assert_eq!(rpc_request.clone(), req); - assert_eq!(req_id, 1); - // send the response - receiver.send_rpc(peer_id, RPCEvent::Response(1, c_response.clone())); - } - ev => panic!("Receiver invalid RPC received, {:?}", ev), - } - } - RPCMessage::PeerDialed(_) => (), - e => panic!("unexpected {:?}", e), - } - } - }; - - let result = future::select(Box::pin(sender_fut), Box::pin(receiver_fut)); - let ((req_id, res), _) = async_std::task::block_on(result).factor_first(); - assert_eq!(res, rpc_response); - assert_eq!(req_id, 1); -} diff --git a/node/forest_libp2p/tests/rpc_test_utils.rs b/node/forest_libp2p/tests/rpc_test_utils.rs deleted file mode 100644 index 23f51edafd50..000000000000 --- a/node/forest_libp2p/tests/rpc_test_utils.rs +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2020 ChainSafe Systems -// SPDX-License-Identifier: Apache-2.0, MIT - -use forest_libp2p::rpc::RPC; -use std::io::Error; - -use libp2p::core::{ - identity, multiaddr::Protocol, muxing::StreamMuxerBox, transport::MemoryTransport, upgrade, - Multiaddr, Transport, -}; -use libp2p::plaintext::PlainText2Config; -use libp2p::yamux; -use libp2p::Swarm; - -pub fn build_node_pair() -> (TestSwarm, TestSwarm) { - let (_, mut s1) = build_node(10005); - let (mut a2, s2) = build_node(10006); - - let _ = a2.pop(); - // dial each other - Swarm::dial_addr(&mut s1, a2).unwrap(); - - (s1, s2) -} - -pub type TestSwarm = Swarm; -pub fn build_node(port: u64) -> (Multiaddr, TestSwarm) { - let key = identity::Keypair::generate_ed25519(); - let public_key = key.public(); - - let transport = MemoryTransport::default() - .upgrade(upgrade::Version::V1) - .authenticate(PlainText2Config { - local_public_key: public_key.clone(), - }) - .multiplex(yamux::Config::default()) - .map(|(p, m), _| (p, StreamMuxerBox::new(m))) - .map_err(|e| -> Error { panic!("Failed to create transport: {:?}", e) }) - .boxed(); - - let peer_id = public_key.clone().into_peer_id(); - let behaviour = RPC::new(); - let mut swarm = Swarm::new(transport, behaviour, peer_id); - - let mut addr: Multiaddr = Protocol::Memory(port).into(); - Swarm::listen_on(&mut swarm, addr.clone()).unwrap(); - - addr = addr.with(libp2p::core::multiaddr::Protocol::P2p( - public_key.into_peer_id().into(), - )); - - (addr, swarm) -} diff --git a/utils/test_utils/src/chain_structures.rs b/utils/test_utils/src/chain_structures.rs index d8bd6b4f9e00..67b5042ec04d 100644 --- a/utils/test_utils/src/chain_structures.rs +++ b/utils/test_utils/src/chain_structures.rs @@ -12,7 +12,6 @@ use cid::{multihash::Blake2b256, Cid}; use crypto::{Signature, Signer, VRFProof}; use encoding::{from_slice, to_vec}; use forest_libp2p::blocksync::{BlockSyncResponse, TipsetBundle}; -use forest_libp2p::rpc::RPCResponse; use message::{SignedMessage, UnsignedMessage}; use num_bigint::BigUint; use std::error::Error; @@ -193,9 +192,9 @@ pub fn construct_tipset_bundle(epoch: u64, weight: u64) -> TipsetBundle { } /// Returns a RPCResponse used for testing -pub fn construct_blocksync_response() -> RPCResponse { +pub fn construct_blocksync_response() -> BlockSyncResponse { // construct block sync response - RPCResponse::BlockSync(BlockSyncResponse { + BlockSyncResponse { chain: vec![ construct_tipset_bundle(3, 10), construct_tipset_bundle(2, 10), @@ -203,5 +202,5 @@ pub fn construct_blocksync_response() -> RPCResponse { ], status: 0, message: "message".to_owned(), - }) + } } diff --git a/vm/actor/Cargo.toml b/vm/actor/Cargo.toml index c1911556b381..e5bce3616ec6 100644 --- a/vm/actor/Cargo.toml +++ b/vm/actor/Cargo.toml @@ -31,4 +31,4 @@ ahash = "0.3" [dev-dependencies] db = { path = "../../node/db" } hex = "0.4.2" -libp2p = "0.20" +libp2p = { git = "https://github.com/ChainSafe/rust-libp2p", rev = "3661071120d3c07eb9e56932204efcd12cdffc49" }