From 5d027c0ccc4f1c24225e5639f0a44142d4b7b1ae Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Mon, 8 Jan 2024 11:41:48 +0800 Subject: [PATCH] [#1407] feat(rust): refactor localfile store to speed up writing (#1422) ### What changes were proposed in this pull request? 1. using `opendal` crate to simplify read/write 2. avoid using dashmap.get to fix potential deadlock ### Why are the changes needed? For: #1407 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit tests ### Commit logs 1. using opendal crate to simplify read/write 2. avoid using dashmap.get to fix potential deadlock --- .gitignore | 1 - rust/experimental/server/Cargo.lock | 645 ++++++++++++++++- rust/experimental/server/Cargo.toml | 1 + rust/experimental/server/README.md | 17 +- rust/experimental/server/src/store/hybrid.rs | 21 +- .../server/src/store/local/disk.rs | 372 ++++++++++ .../server/src/store/local/mod.rs | 18 + .../server/src/store/localfile.rs | 670 +++--------------- rust/experimental/server/src/store/mod.rs | 1 + 9 files changed, 1151 insertions(+), 595 deletions(-) create mode 100644 rust/experimental/server/src/store/local/disk.rs create mode 100644 rust/experimental/server/src/store/local/mod.rs diff --git a/.gitignore b/.gitignore index df4bb57a5d..6f7717539f 100644 --- a/.gitignore +++ b/.gitignore @@ -29,7 +29,6 @@ deploy/kubernetes/docker/hadoopconfig/* *.dll *.so *.dylib -local vendor VERSION testbin/* diff --git a/rust/experimental/server/Cargo.lock b/rust/experimental/server/Cargo.lock index 94a281877a..0d3e7ac2fc 100644 --- a/rust/experimental/server/Cargo.lock +++ b/rust/experimental/server/Cargo.lock @@ -38,6 +38,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anyhow" version = "1.0.75" @@ -61,6 +76,19 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-compat" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f68a707c1feb095d8c07f8a65b9f506b117d30af431cab89374357de7c11461b" +dependencies = [ + "futures-core", + "futures-io", + "once_cell", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-trait" version = "0.1.73" @@ -157,6 +185,18 @@ dependencies = [ "tower-service", ] +[[package]] +name = "backon" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c1a6197b2120bb2185a267f6515038558b019e92b832bb0320e96d66268dcf9" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "pin-project 1.1.3", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -184,6 +224,12 @@ version = "0.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2" +[[package]] +name = "base64ct" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" + [[package]] name = "bindgen" version = "0.64.0" @@ -293,6 +339,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-targets 0.48.5", +] + [[package]] name = "clang-sys" version = "1.7.0" @@ -437,6 +497,32 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "const-oid" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" + +[[package]] +name = "const-random" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aaf16c9c2c612020bcfd042e170f6e32de9b9d75adb5277cdbbd2e2c8c8299a" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom", + "once_cell", + "tiny-keccak", +] + [[package]] name = "core-foundation" version = "0.9.3" @@ -607,6 +693,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + [[package]] name = "crypto-common" version = "0.1.6" @@ -674,6 +766,17 @@ dependencies = [ "uuid", ] +[[package]] +name = "der" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fffa369a668c8af7dbf8b5e56c9f744fbd399949ed171606040001947de40b1c" +dependencies = [ + "const-oid", + "pem-rfc7468", + "zeroize", +] + [[package]] name = "deranged" version = "0.3.8" @@ -718,7 +821,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", + "const-oid", "crypto-common", + "subtle", ] [[package]] @@ -741,6 +846,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "dlv-list" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" +dependencies = [ + "const-random", +] + [[package]] name = "either" version = "1.9.0" @@ -812,6 +926,15 @@ dependencies = [ "once_cell", ] +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fastrand" version = "2.0.0" @@ -836,6 +959,12 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flagset" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d52a7e408202050813e6f1d9addadcaafef3dca7530c7ddfb005d4081cce6779" + [[package]] name = "flate2" version = "1.0.27" @@ -1047,8 +1176,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -1184,6 +1315,15 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + [[package]] name = "home" version = "0.5.5" @@ -1257,6 +1397,20 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +dependencies = [ + "futures-util", + "http", + "hyper", + "rustls 0.21.7", + "tokio", + "tokio-rustls 0.24.1", +] + [[package]] name = "hyper-timeout" version = "0.4.1" @@ -1282,6 +1436,29 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "iana-time-zone" +version = "0.1.59" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6a67363e2aa4443928ce15e57ebae94fd8949958fd1223c4cfc0cd473ad7539" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -1343,11 +1520,20 @@ dependencies = [ "log", "num-format", "once_cell", - "quick-xml", + "quick-xml 0.26.0", "rgb", "str_stack", ] +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + [[package]] name = "io-lifetimes" version = "1.0.11" @@ -1400,11 +1586,29 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonwebtoken" +version = "9.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c7ea04a7c5c055c175f189b6dc6ba036fd62306b58c66c9f6389036c503a3f4" +dependencies = [ + "base64 0.21.4", + "js-sys", + "pem", + "ring 0.17.7", + "serde", + "serde_json", + "simple_asn1", +] + [[package]] name = "lazy_static" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +dependencies = [ + "spin 0.5.2", +] [[package]] name = "lazycell" @@ -1449,6 +1653,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "libm" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" + [[package]] name = "linux-raw-sys" version = "0.1.4" @@ -1492,6 +1702,16 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "memchr" version = "2.6.3" @@ -1604,6 +1824,34 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-bigint" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-bigint-dig" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc84195820f291c7697304f3cbdadd1cb7199c0efc917ff5eafd71225c136151" +dependencies = [ + "byteorder", + "lazy_static", + "libm", + "num-integer", + "num-iter", + "num-traits", + "rand 0.8.5", + "smallvec", + "zeroize", +] + [[package]] name = "num-format" version = "0.4.4" @@ -1614,6 +1862,27 @@ dependencies = [ "itoa", ] +[[package]] +name = "num-integer" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" +dependencies = [ + "autocfg", + "num-traits", +] + +[[package]] +name = "num-iter" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d03e6c028c5dc5cac6e2dec0efda81fc887605bb3d884578bb6d6bf7514e252" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.16" @@ -1621,6 +1890,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" dependencies = [ "autocfg", + "libm", ] [[package]] @@ -1648,6 +1918,39 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +[[package]] +name = "opendal" +version = "0.44.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c32736a48ef08a5d2212864e2295c8e54f4d6b352b7f49aa0c29a12fc410ff66" +dependencies = [ + "anyhow", + "async-compat", + "async-trait", + "backon", + "base64 0.21.4", + "bytes 1.5.0", + "chrono", + "flagset", + "futures", + "getrandom", + "http", + "log", + "md-5", + "once_cell", + "parking_lot", + "percent-encoding", + "pin-project 1.1.3", + "quick-xml 0.30.0", + "reqsign", + "reqwest", + "serde", + "serde_json", + "sha2", + "tokio", + "uuid", +] + [[package]] name = "openssl" version = "0.10.57" @@ -1692,6 +1995,16 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "ordered-multimap" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4d6a8c22fc714f0c2373e6091bf6f5e9b37b1bc0b1184874b7e0a4e303d318f" +dependencies = [ + "dlv-list", + "hashbrown 0.14.0", +] + [[package]] name = "os_str_bytes" version = "6.5.1" @@ -1745,6 +2058,25 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" +[[package]] +name = "pem" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b8fcc794035347fb64beda2d3b462595dd2753e3f268d89c5aae77e8cf2c310" +dependencies = [ + "base64 0.21.4", + "serde", +] + +[[package]] +name = "pem-rfc7468" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412" +dependencies = [ + "base64ct", +] + [[package]] name = "percent-encoding" version = "2.3.0" @@ -1813,6 +2145,27 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkcs1" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f" +dependencies = [ + "der", + "pkcs8", + "spki", +] + +[[package]] +name = "pkcs8" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" +dependencies = [ + "der", + "spki", +] + [[package]] name = "pkg-config" version = "0.3.27" @@ -2065,6 +2418,26 @@ dependencies = [ "memchr", ] +[[package]] +name = "quick-xml" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956" +dependencies = [ + "memchr", + "serde", +] + +[[package]] +name = "quick-xml" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.33" @@ -2236,6 +2609,38 @@ dependencies = [ "winapi", ] +[[package]] +name = "reqsign" +version = "0.14.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce87f66ba6c6acef277a729f989a0eca946cb9ce6a15bcc036bda0f72d4b9fd" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.21.4", + "chrono", + "form_urlencoded", + "getrandom", + "hex", + "hmac", + "home", + "http", + "jsonwebtoken", + "log", + "once_cell", + "percent-encoding", + "quick-xml 0.31.0", + "rand 0.8.5", + "reqwest", + "rsa", + "rust-ini", + "serde", + "serde_json", + "sha1", + "sha2", + "tokio", +] + [[package]] name = "reqwest" version = "0.11.20" @@ -2251,6 +2656,7 @@ dependencies = [ "http", "http-body", "hyper", + "hyper-rustls", "hyper-tls", "ipnet", "js-sys", @@ -2260,15 +2666,21 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", + "rustls 0.21.7", + "rustls-native-certs", + "rustls-pemfile", "serde", "serde_json", "serde_urlencoded", "tokio", "tokio-native-tls", + "tokio-rustls 0.24.1", + "tokio-util 0.7.9", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "winreg", ] @@ -2301,11 +2713,25 @@ dependencies = [ "libc", "once_cell", "spin 0.5.2", - "untrusted", + "untrusted 0.7.1", "web-sys", "winapi", ] +[[package]] +name = "ring" +version = "0.17.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74" +dependencies = [ + "cc", + "getrandom", + "libc", + "spin 0.9.8", + "untrusted 0.9.0", + "windows-sys 0.48.0", +] + [[package]] name = "roxmltree" version = "0.18.1" @@ -2315,6 +2741,36 @@ dependencies = [ "xmlparser", ] +[[package]] +name = "rsa" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0e5124fcb30e76a7e79bfee683a2746db83784b86289f6251b54b7950a0dfc" +dependencies = [ + "const-oid", + "digest", + "num-bigint-dig", + "num-integer", + "num-traits", + "pkcs1", + "pkcs8", + "rand_core 0.6.4", + "signature", + "spki", + "subtle", + "zeroize", +] + +[[package]] +name = "rust-ini" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e0698206bcb8882bf2a9ecb4c1e7785db57ff052297085a6efd4fe42302068a" +dependencies = [ + "cfg-if", + "ordered-multimap", +] + [[package]] name = "rustc-demangle" version = "0.1.23" @@ -2362,7 +2818,7 @@ checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" dependencies = [ "base64 0.13.1", "log", - "ring", + "ring 0.16.20", "sct 0.6.1", "webpki", ] @@ -2374,11 +2830,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8" dependencies = [ "log", - "ring", + "ring 0.16.20", "rustls-webpki", "sct 0.7.0", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "1.0.3" @@ -2394,8 +2862,8 @@ version = "0.101.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c7d5dece342910d9ba34d259310cae3e0154b873b35408b787b59bce53d34fe" dependencies = [ - "ring", - "untrusted", + "ring 0.16.20", + "untrusted 0.7.1", ] [[package]] @@ -2431,8 +2899,8 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce" dependencies = [ - "ring", - "untrusted", + "ring 0.16.20", + "untrusted 0.7.1", ] [[package]] @@ -2441,8 +2909,8 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" dependencies = [ - "ring", - "untrusted", + "ring 0.16.20", + "untrusted 0.7.1", ] [[package]] @@ -2531,6 +2999,17 @@ dependencies = [ "digest", ] +[[package]] +name = "sha2" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.4" @@ -2576,6 +3055,28 @@ dependencies = [ "libc", ] +[[package]] +name = "signature" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" +dependencies = [ + "digest", + "rand_core 0.6.4", +] + +[[package]] +name = "simple_asn1" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adc4e5204eb1910f40f9cfa375f6f05b68c3abac4b6fd879c8ff5e7ae8a0a085" +dependencies = [ + "num-bigint", + "num-traits", + "thiserror", + "time", +] + [[package]] name = "slab" version = "0.4.9" @@ -2626,6 +3127,16 @@ dependencies = [ "lock_api", ] +[[package]] +name = "spki" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" +dependencies = [ + "base64ct", + "der", +] + [[package]] name = "sse-codec" version = "0.3.2" @@ -2662,6 +3173,12 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "subtle" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" + [[package]] name = "symbolic-common" version = "10.2.1" @@ -2730,7 +3247,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" dependencies = [ "cfg-if", - "fastrand", + "fastrand 2.0.0", "redox_syscall 0.3.5", "rustix 0.38.14", "windows-sys 0.48.0", @@ -2840,6 +3357,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -3278,6 +3804,7 @@ dependencies = [ "hyper", "log", "once_cell", + "opendal", "pin-project-lite", "poem", "pprof", @@ -3315,6 +3842,12 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.4.1" @@ -3342,6 +3875,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" dependencies = [ "getrandom", + "serde", ] [[package]] @@ -3443,6 +3977,19 @@ version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +[[package]] +name = "wasm-streams" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.64" @@ -3459,8 +4006,8 @@ version = "0.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea" dependencies = [ - "ring", - "untrusted", + "ring 0.16.20", + "untrusted 0.7.1", ] [[package]] @@ -3506,6 +4053,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.0", +] + [[package]] name = "windows-sys" version = "0.45.0" @@ -3554,6 +4110,21 @@ dependencies = [ "windows_x86_64_msvc 0.48.5", ] +[[package]] +name = "windows-targets" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +dependencies = [ + "windows_aarch64_gnullvm 0.52.0", + "windows_aarch64_msvc 0.52.0", + "windows_i686_gnu 0.52.0", + "windows_i686_msvc 0.52.0", + "windows_x86_64_gnu 0.52.0", + "windows_x86_64_gnullvm 0.52.0", + "windows_x86_64_msvc 0.52.0", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.42.2" @@ -3566,6 +4137,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" + [[package]] name = "windows_aarch64_msvc" version = "0.42.2" @@ -3578,6 +4155,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" + [[package]] name = "windows_i686_gnu" version = "0.42.2" @@ -3590,6 +4173,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +[[package]] +name = "windows_i686_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" + [[package]] name = "windows_i686_msvc" version = "0.42.2" @@ -3602,6 +4191,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +[[package]] +name = "windows_i686_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" + [[package]] name = "windows_x86_64_gnu" version = "0.42.2" @@ -3614,6 +4209,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" + [[package]] name = "windows_x86_64_gnullvm" version = "0.42.2" @@ -3626,6 +4227,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" + [[package]] name = "windows_x86_64_msvc" version = "0.42.2" @@ -3638,6 +4245,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" + [[package]] name = "winnow" version = "0.5.15" @@ -3662,3 +4275,9 @@ name = "xmlparser" version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" + +[[package]] +name = "zeroize" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d" diff --git a/rust/experimental/server/Cargo.toml b/rust/experimental/server/Cargo.toml index acb0829866..ad9f5fc92f 100644 --- a/rust/experimental/server/Cargo.toml +++ b/rust/experimental/server/Cargo.toml @@ -97,6 +97,7 @@ clap = "3.0.14" socket2 = { version="0.4", features = ["all"]} cap = "0.1.2" spin = "0.9.8" +opendal = { version = "0.44.0", features = ["services-fs"]} [dependencies.hdfs-native] version = "0.5.0" diff --git a/rust/experimental/server/README.md b/rust/experimental/server/README.md index 73d48e8334..9cec4df486 100644 --- a/rust/experimental/server/README.md +++ b/rust/experimental/server/README.md @@ -71,14 +71,15 @@ memory_spill_low_watermark = 0.7 ``` #### TeraSort cost times -| type/buffer capacity | 250G (compressed) | comment | -|--------------------------------------|:------------------:|:--------------------------------------------------------:| -| vanilla uniffle (grpc-based) / 10g | 5.3min (2.3m/3m) | 1.9G/s | -| vanilla uniffle (grpc-based) / 300g | 5.6min (3.7m/1.9m) | GC occurs frequently / 2.5G/s | -| vanilla uniffle (netty-based) / 10g | / | read failed. 2.5G/s (write is better due to zero copy) | -| vanilla uniffle (netty-based) / 300g | / | app hang | -| rust based shuffle server / 10g | 4.6min (2.2m/2.4m) | 2.0 G/s | -| rust based shuffle server / 300g | 4min (1.5m/2.5m) | 3.5 G/s | +| type/buffer capacity | 250G (compressed) | comment | +|--------------------------------------|:------------------:|:------------------------------------------------------------------------:| +| vanilla spark ess | 5.0min (2.2/2.8) | ess use 400 nodes but uniffle only one. But the rss speed is still fast! | +| vanilla uniffle (grpc-based) / 10g | 5.3min (2.3m/3m) | 1.9G/s | +| vanilla uniffle (grpc-based) / 300g | 5.6min (3.7m/1.9m) | GC occurs frequently / 2.5G/s | +| vanilla uniffle (netty-based) / 10g | / | read failed. 2.5G/s (write is better due to zero copy) | +| vanilla uniffle (netty-based) / 300g | / | app hang | +| rust based shuffle server / 10g | 4.6min (2.2m/2.4m) | 2.4 G/s | +| rust based shuffle server / 300g | 4min (1.5m/2.5m) | 3.5 G/s | Compared with grpc based server, rust-based server has less memory footprint and stable performance. diff --git a/rust/experimental/server/src/store/hybrid.rs b/rust/experimental/server/src/store/hybrid.rs index 32be535114..f9feaa091c 100644 --- a/rust/experimental/server/src/store/hybrid.rs +++ b/rust/experimental/server/src/store/hybrid.rs @@ -230,7 +230,10 @@ impl HybridStore { ctx.data_blocks.sort_by_key(|block| block.task_attempt_id); // when throwing the data lost error, it should fast fail for this partition data. - let inserted = candidate_store.insert(ctx).await; + let inserted = candidate_store + .insert(ctx) + .instrument_await("inserting into the persistent store, invoking [write]") + .await; if let Err(err) = inserted { match err { WorkerError::PARTIAL_DATA_LOST(msg) => { @@ -335,15 +338,19 @@ impl Store for HybridStore { let store = self.clone(); let concurrency_limiter = Arc::new(Semaphore::new(store.memory_spill_max_concurrency as usize)); - self.runtime_manager.default_runtime.spawn(async move { + self.runtime_manager.write_runtime.spawn(async move { while let Ok(message) = store.memory_spill_recv.recv().await { let await_root = await_tree_registry - .register(format!("hot->warm flush.")) + .register(format!("hot->warm flush. uid: {:#?}", &message.ctx.uid)) .await; // using acquire_owned(), refer to https://github.com/tokio-rs/tokio/issues/1998 - let concurrency_guarder = - concurrency_limiter.clone().acquire_owned().await.unwrap(); + let concurrency_guarder = concurrency_limiter + .clone() + .acquire_owned() + .instrument_await("waiting for the spill concurrent lock.") + .await + .unwrap(); TOTAL_MEMORY_SPILL_OPERATION.inc(); GAUGE_MEMORY_SPILL_OPERATION.inc(); @@ -358,6 +365,7 @@ impl Store for HybridStore { } match store_cloned .memory_spill_to_persistent_store(message.ctx.clone(), message.id) + .instrument_await("memory_spill_to_persistent_store.") .await { Ok(msg) => { @@ -768,8 +776,9 @@ mod tests { let reading_view_ctx = ReadingViewContext { uid: uid.clone(), reading_options: ReadingOptions::FILE_OFFSET_AND_LEN(offset, length as i64), - serialized_expected_task_ids_bitmap: Default::default(), + serialized_expected_task_ids_bitmap: None, }; + println!("reading. offset: {:?}. len: {:?}", offset, length); let read_data = store.get(reading_view_ctx).await.unwrap(); match read_data { ResponseData::Local(local_data) => { diff --git a/rust/experimental/server/src/store/local/disk.rs b/rust/experimental/server/src/store/local/disk.rs new file mode 100644 index 0000000000..a0e16af740 --- /dev/null +++ b/rust/experimental/server/src/store/local/disk.rs @@ -0,0 +1,372 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::runtime::manager::RuntimeManager; +use anyhow::{anyhow, Result}; +use await_tree::InstrumentAwait; +use bytes::{Bytes, BytesMut}; +use futures::AsyncWriteExt; +use log::{error, info, warn}; +use opendal::services::Fs; +use opendal::Operator; +use std::io::SeekFrom; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tokio::io::{AsyncReadExt, AsyncSeekExt}; +use tokio::sync::Semaphore; + +pub struct LocalDiskConfig { + pub(crate) high_watermark: f32, + pub(crate) low_watermark: f32, + pub(crate) max_concurrency: i32, +} + +impl LocalDiskConfig { + pub fn create_mocked_config() -> Self { + LocalDiskConfig { + high_watermark: 1.0, + low_watermark: 0.6, + max_concurrency: 20, + } + } +} + +impl Default for LocalDiskConfig { + fn default() -> Self { + LocalDiskConfig { + high_watermark: 0.8, + low_watermark: 0.6, + max_concurrency: 40, + } + } +} + +pub struct LocalDisk { + pub(crate) root: String, + operator: Operator, + concurrency_limiter: Semaphore, + is_corrupted: AtomicBool, + is_healthy: AtomicBool, + config: LocalDiskConfig, +} + +impl LocalDisk { + pub fn new( + root: String, + config: LocalDiskConfig, + runtime_manager: RuntimeManager, + ) -> Arc { + let mut builder = Fs::default(); + builder.root(&root); + let operator: Operator = Operator::new(builder).unwrap().finish(); + + let instance = LocalDisk { + root, + operator, + concurrency_limiter: Semaphore::new(config.max_concurrency as usize), + is_corrupted: AtomicBool::new(false), + is_healthy: AtomicBool::new(true), + config, + }; + let instance = Arc::new(instance); + + let runtime = runtime_manager.default_runtime.clone(); + let cloned = instance.clone(); + runtime.spawn(async { + info!("Starting the disk healthy check, root: {}", &cloned.root); + LocalDisk::loop_check_disk(cloned).await; + }); + + instance + } + + async fn write_read_check(local_disk: Arc) -> Result<()> { + let temp_path = "corruption_check.file"; + // cleanup remaining files before checking. + local_disk.delete(temp_path).await?; + + let written_data = Bytes::copy_from_slice(b"file corruption check"); + local_disk.write(written_data.clone(), temp_path).await?; + let read_data = local_disk.read(temp_path, 0, None).await?; + local_disk.delete(temp_path).await?; + + if written_data != read_data { + let msg = format!( + "The local disk has been corrupted. path: {}. expected: {:?}, actual: {:?}", + &local_disk.root, &written_data, &read_data + ); + Err(anyhow!(msg)) + } else { + Ok(()) + } + } + + async fn loop_check_disk(local_disk: Arc) { + loop { + tokio::time::sleep(Duration::from_secs(10)).await; + + if local_disk.is_corrupted().unwrap() { + return; + } + + let check_succeed: Result<()> = LocalDisk::write_read_check(local_disk.clone()).await; + if check_succeed.is_err() { + local_disk.mark_corrupted(); + error!( + "Errors on checking local disk corruption. err: {:#?}", + check_succeed.err() + ); + } + + // check the capacity + let used_ratio = local_disk.get_disk_used_ratio(); + if used_ratio.is_err() { + error!( + "Errors on getting the used ratio of the disk capacity. err: {:?}", + used_ratio.err() + ); + continue; + } + + let used_ratio = used_ratio.unwrap(); + if local_disk.is_healthy().unwrap() + && used_ratio > local_disk.config.high_watermark as f64 + { + warn!("Disk={} has been unhealthy.", &local_disk.root); + local_disk.mark_unhealthy(); + continue; + } + + if !local_disk.is_healthy().unwrap() + && used_ratio < local_disk.config.low_watermark as f64 + { + warn!("Disk={} has been healthy.", &local_disk.root); + local_disk.mark_healthy(); + continue; + } + } + } + + pub async fn create_dir(&self, dir: &str) -> Result<()> { + self.operator.create_dir(dir).await?; + Ok(()) + } + + // this will ensure the data flushed into the file + async fn write(&self, data: Bytes, path: &str) -> Result<()> { + self.operator.write(path, data).await?; + Ok(()) + } + + pub async fn append(&self, data: Bytes, path: &str) -> Result<()> { + let _concurrency_guarder = self + .concurrency_limiter + .acquire() + .instrument_await("meet the concurrency limiter") + .await?; + + let mut writer = self + .operator + .writer_with(path) + .append(true) + .instrument_await("creating the writer...") + .await?; + // we must use the write_all to ensure the buffer consumed by the OS. + // Please see the detail: https://doc.rust-lang.org/std/io/trait.Write.html#method.write_all + writer + .write_all(&*data) + .instrument_await("writing the data into buffer...") + .await?; + writer + .flush() + .instrument_await("committing the data into file...") + .await?; + + Ok(()) + } + + pub async fn get_file_len(&self, path: &str) -> Result { + match self.operator.stat(path).await { + Ok(meta) => Ok(meta.content_length() as i64), + Err(_) => Ok(0), + } + } + + pub async fn read(&self, path: &str, offset: i64, length: Option) -> Result { + if length.is_none() { + return Ok(Bytes::from(self.operator.read(path).await?)); + } + + let mut reader = self.operator.reader(path).await?; + reader.seek(SeekFrom::Start(offset as u64)).await?; + + let mut buffer = vec![0; length.unwrap() as usize]; + reader.read_exact(buffer.as_mut()).await?; + + let mut bytes_buffer = BytesMut::new(); + bytes_buffer.extend_from_slice(&*buffer); + Ok(bytes_buffer.freeze()) + } + + pub async fn delete(&self, path: &str) -> Result<()> { + self.operator.remove_all(path).await?; + Ok(()) + } + + fn mark_corrupted(&self) { + self.is_corrupted.store(true, Ordering::SeqCst); + } + + fn mark_unhealthy(&self) { + self.is_healthy.store(false, Ordering::SeqCst); + } + + fn mark_healthy(&self) { + self.is_healthy.store(true, Ordering::SeqCst); + } + + pub fn is_corrupted(&self) -> Result { + Ok(self.is_corrupted.load(Ordering::SeqCst)) + } + + pub fn is_healthy(&self) -> Result { + Ok(self.is_healthy.load(Ordering::SeqCst)) + } + + fn get_disk_used_ratio(&self) -> Result { + // Get the total and available space in bytes + let available_space = fs2::available_space(&self.root)?; + let total_space = fs2::total_space(&self.root)?; + Ok(1.0 - (available_space as f64 / total_space as f64)) + } +} + +#[cfg(test)] +mod tests { + use crate::runtime::manager::RuntimeManager; + use crate::store::local::disk::{LocalDisk, LocalDiskConfig}; + use bytes::Bytes; + use std::time::Duration; + + #[test] + fn test_local_disk_delete_operation() { + let temp_dir = tempdir::TempDir::new("test_local_disk_delete_operation-dir").unwrap(); + let temp_path = temp_dir.path().to_str().unwrap().to_string(); + + println!("init the path: {}", &temp_path); + + let runtime: RuntimeManager = Default::default(); + let local_disk = LocalDisk::new( + temp_path.clone(), + LocalDiskConfig::default(), + runtime.clone(), + ); + + let data = b"hello!"; + runtime.wait(local_disk.create_dir("a/")).unwrap(); + runtime + .wait(local_disk.append(Bytes::copy_from_slice(data), "a/b")) + .unwrap(); + + assert_eq!( + true, + runtime + .wait(tokio::fs::try_exists(format!( + "{}/{}", + &temp_path, + "a/b".to_string() + ))) + .unwrap() + ); + + runtime + .wait(local_disk.delete("a/")) + .expect("TODO: panic message"); + assert_eq!( + false, + runtime + .wait(tokio::fs::try_exists(format!( + "{}/{}", + &temp_path, + "a/b".to_string() + ))) + .unwrap() + ); + } + + #[test] + fn local_disk_corruption_healthy_check() { + let temp_dir = tempdir::TempDir::new("test_directory").unwrap(); + let temp_path = temp_dir.path().to_str().unwrap().to_string(); + + let local_disk = LocalDisk::new( + temp_path.clone(), + LocalDiskConfig::create_mocked_config(), + Default::default(), + ); + + awaitility::at_most(Duration::from_secs(10)).until(|| local_disk.is_healthy().unwrap()); + assert_eq!(false, local_disk.is_corrupted().unwrap()); + } + + #[test] + fn local_disk_test() { + let temp_dir = tempdir::TempDir::new("test_directory").unwrap(); + let temp_path = temp_dir.path().to_str().unwrap().to_string(); + + let runtime: RuntimeManager = Default::default(); + let local_disk = LocalDisk::new( + temp_path.clone(), + LocalDiskConfig::default(), + runtime.clone(), + ); + + let data = b"Hello, World!"; + + let relative_path = "app-id/test_file.txt"; + + runtime.wait(local_disk.create_dir("app-id/")).unwrap(); + + for _ in 0..2 { + let write_result = + runtime.wait(local_disk.append(Bytes::copy_from_slice(data), relative_path)); + assert!(write_result.is_ok()); + } + + let read_result = runtime.wait(local_disk.read(relative_path, 0, Some(data.len() as i64))); + assert!(read_result.is_ok()); + let read_data = read_result.unwrap(); + let expected = b"Hello, World!"; + assert_eq!(read_data.as_ref(), expected); + + // read the middle word + let read_result = runtime.wait(local_disk.read( + relative_path, + data.len() as i64, + Some(data.len() as i64), + )); + assert_eq!(read_result.unwrap().as_ref(), expected); + + // read all words + let read_result = runtime.wait(local_disk.read(relative_path, 0, None)); + let expected = b"Hello, World!Hello, World!"; + assert_eq!(read_result.unwrap().as_ref(), expected); + + temp_dir.close().unwrap(); + } +} diff --git a/rust/experimental/server/src/store/local/mod.rs b/rust/experimental/server/src/store/local/mod.rs new file mode 100644 index 0000000000..63ef41a5bb --- /dev/null +++ b/rust/experimental/server/src/store/local/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod disk; diff --git a/rust/experimental/server/src/store/localfile.rs b/rust/experimental/server/src/store/localfile.rs index e527d8cdb1..cc2a881f60 100644 --- a/rust/experimental/server/src/store/localfile.rs +++ b/rust/experimental/server/src/store/localfile.rs @@ -28,38 +28,42 @@ use crate::store::{ LocalDataIndex, PartitionedLocalData, Persistent, RequireBufferResponse, ResponseData, ResponseDataIndex, Store, }; +use std::ops::Deref; +use std::path::Path; use anyhow::Result; use async_trait::async_trait; use await_tree::InstrumentAwait; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::{BufMut, BytesMut}; use dashmap::DashMap; -use log::{debug, error, info, warn}; +use log::{debug, error, warn}; use crate::runtime::manager::RuntimeManager; -use std::io::SeekFrom; -use std::path::Path; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::Arc; -use std::time::Duration; -use tokio::fs::OpenOptions; -use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; -use tokio::sync::{RwLock, Semaphore}; - -fn create_directory_if_not_exists(dir_path: &str) { - if !std::fs::metadata(dir_path).is_ok() { - std::fs::create_dir_all(dir_path).expect("Errors on creating dirs."); + +use crate::store::local::disk::{LocalDisk, LocalDiskConfig}; + +struct LockedObj { + disk: Arc, + pointer: AtomicI64, +} + +impl From> for LockedObj { + fn from(value: Arc) -> Self { + Self { + disk: value.clone(), + pointer: Default::default(), + } } } pub struct LocalFileStore { local_disks: Vec>, - partition_written_disk_map: DashMap>>>, - partition_file_locks: DashMap>>, healthy_check_min_disks: i32, - runtime_manager: RuntimeManager, + partition_locks: DashMap>, } impl Persistent for LocalFileStore {} @@ -81,10 +85,9 @@ impl LocalFileStore { } LocalFileStore { local_disks: local_disk_instances, - partition_written_disk_map: DashMap::new(), - partition_file_locks: DashMap::new(), healthy_check_min_disks: 1, runtime_manager, + partition_locks: Default::default(), } } @@ -101,10 +104,9 @@ impl LocalFileStore { } LocalFileStore { local_disks: local_disk_instances, - partition_written_disk_map: DashMap::new(), - partition_file_locks: DashMap::new(), healthy_check_min_disks: localfile_config.healthy_check_min_disks.unwrap_or(1), runtime_manager, + partition_locks: Default::default(), } } @@ -129,69 +131,6 @@ impl LocalFileStore { ) } - fn get_app_all_partitions(&self, app_id: &str) -> Vec<(i32, i32)> { - let stage_entry = self.partition_written_disk_map.get(app_id); - if stage_entry.is_none() { - return vec![]; - } - - let stages = stage_entry.unwrap(); - let mut partition_ids = vec![]; - for stage_item in stages.iter() { - let (shuffle_id, partitions) = stage_item.pair(); - for partition_item in partitions.iter() { - let (partition_id, _) = partition_item.pair(); - partition_ids.push((*shuffle_id, *partition_id)); - } - } - - partition_ids - } - - fn delete_app(&self, app_id: &str) -> Result<()> { - self.partition_written_disk_map.remove(app_id); - Ok(()) - } - - fn get_owned_disk(&self, uid: PartitionedUId) -> Option> { - let app_id = uid.app_id; - let shuffle_id = uid.shuffle_id; - let partition_id = uid.partition_id; - - let shuffle_entry = self - .partition_written_disk_map - .entry(app_id) - .or_insert_with(|| DashMap::new()); - let partition_entry = shuffle_entry - .entry(shuffle_id) - .or_insert_with(|| DashMap::new()); - - partition_entry - .get(&partition_id) - .map(|v| v.value().clone()) - } - - async fn get_or_create_owned_disk(&self, uid: PartitionedUId) -> Result> { - let uid_ref = &uid.clone(); - let app_id = uid.app_id; - let shuffle_id = uid.shuffle_id; - let partition_id = uid.partition_id; - - let shuffle_entry = self - .partition_written_disk_map - .entry(app_id) - .or_insert_with(|| DashMap::new()); - let partition_entry = shuffle_entry - .entry(shuffle_id) - .or_insert_with(|| DashMap::new()); - let local_disk = partition_entry - .entry(partition_id) - .or_insert(self.select_disk(uid_ref).await?) - .clone(); - - Ok(local_disk) - } - fn healthy_check(&self) -> Result { let mut available = 0; for local_disk in &self.local_disks { @@ -207,7 +146,7 @@ impl LocalFileStore { Ok(available > self.healthy_check_min_disks) } - async fn select_disk(&self, uid: &PartitionedUId) -> Result, WorkerError> { + fn select_disk(&self, uid: &PartitionedUId) -> Result, WorkerError> { let hash_value = PartitionedUId::get_hash(uid); let mut candidates = vec![]; @@ -244,36 +183,33 @@ impl Store for LocalFileStore { } let uid = ctx.uid; - let _pid = uid.partition_id; let (data_file_path, index_file_path) = LocalFileStore::gen_relative_path_for_partition(&uid); - let local_disk = self.get_or_create_owned_disk(uid.clone()).await?; - - if local_disk.is_corrupted()? { - return Err(WorkerError::PARTIAL_DATA_LOST( - local_disk.base_path.to_string(), - )); - } - let lock_cloned = self - .partition_file_locks + let mut parent_dir_is_created = false; + let locked_obj = self + .partition_locks .entry(data_file_path.clone()) - .or_insert_with(|| Arc::new(RwLock::new(()))) + .or_insert_with(|| { + parent_dir_is_created = true; + Arc::new(LockedObj::from(self.select_disk(&uid).unwrap())) + }) .clone(); - let _lock_guard = lock_cloned - .write() - .instrument_await(format!( - "localfile partition file lock. path: {}", - &data_file_path - )) - .await; - // write index file and data file - // todo: split multiple pieces - let mut next_offset = local_disk - .get_file_len(data_file_path.clone()) - .instrument_await(format!("getting the file len. path: {}", &data_file_path)) - .await?; + let local_disk = &locked_obj.disk; + let mut next_offset = locked_obj.pointer.load(Ordering::SeqCst); + + if local_disk.is_corrupted()? { + return Err(WorkerError::PARTIAL_DATA_LOST(local_disk.root.to_string())); + } + + if !parent_dir_is_created { + if let Some(path) = Path::new(&data_file_path).parent() { + local_disk + .create_dir(format!("{:?}/", path.to_str().unwrap()).as_str()) + .await?; + } + } let mut index_bytes_holder = BytesMut::new(); let mut data_bytes_holder = BytesMut::new(); @@ -296,25 +232,30 @@ impl Store for LocalFileStore { index_bytes_holder.put_i64(task_attempt_id); let data = block.data; - // if get_crc(&data) != crc { - // error!("The crc value is not the same. partition id: {}, block id: {}", pid, block_id); - // } data_bytes_holder.extend_from_slice(&data); next_offset += length as i64; } local_disk - .write(data_bytes_holder.freeze(), data_file_path.clone()) - .instrument_await(format!("localfile writing data. path: {}", data_file_path)) + .append(data_bytes_holder.freeze(), &data_file_path) + .instrument_await(format!("localfile writing data. path: {}", &data_file_path)) .await?; local_disk - .write(index_bytes_holder.freeze(), index_file_path.clone()) - .instrument_await(format!("localfile writing index. path: {}", data_file_path)) + .append(index_bytes_holder.freeze(), &index_file_path) + .instrument_await(format!( + "localfile writing index. path: {}", + &index_file_path + )) .await?; TOTAL_LOCALFILE_USED.inc_by(total_size as u64); + locked_obj + .deref() + .pointer + .store(next_offset, Ordering::SeqCst); + Ok(()) } @@ -333,21 +274,10 @@ impl Store for LocalFileStore { } let (data_file_path, _) = LocalFileStore::gen_relative_path_for_partition(&uid); - let lock_cloned = self - .partition_file_locks - .entry(data_file_path.clone()) - .or_insert_with(|| Arc::new(RwLock::new(()))) - .clone(); - let _lock_guard = lock_cloned - .read() - .instrument_await("getting file read lock") - .await; - - let local_disk: Option> = self.get_owned_disk(uid.clone()); - if local_disk.is_none() { + if !self.partition_locks.contains_key(&data_file_path) { warn!( - "This should not happen of local disk not found for [{:?}]", + "There is no cached data in localfile store for [{:?}]", &uid ); return Ok(ResponseData::Local(PartitionedLocalData { @@ -355,17 +285,26 @@ impl Store for LocalFileStore { })); } - let local_disk = local_disk.unwrap(); + let locked_object = self + .partition_locks + .entry(data_file_path.clone()) + .or_insert_with(|| Arc::new(LockedObj::from(self.select_disk(&uid).unwrap()))) + .clone(); + + let local_disk = &locked_object.disk; if local_disk.is_corrupted()? { return Err(WorkerError::LOCAL_DISK_OWNED_BY_PARTITION_CORRUPTED( - local_disk.base_path.to_string(), + local_disk.root.to_string(), )); } let data = local_disk - .read(data_file_path, offset, Some(len)) - .instrument_await("getting data from localfile") + .read(&data_file_path, offset, Some(len)) + .instrument_await(format!( + "getting data from localfile: {:?}", + &data_file_path + )) .await?; Ok(ResponseData::Local(PartitionedLocalData { data })) } @@ -378,21 +317,9 @@ impl Store for LocalFileStore { let (data_file_path, index_file_path) = LocalFileStore::gen_relative_path_for_partition(&uid); - let lock_cloned = self - .partition_file_locks - .entry(data_file_path.clone()) - .or_insert_with(|| Arc::new(RwLock::new(()))) - .clone(); - let _lock_guard = lock_cloned - .read() - .instrument_await("waiting file lock to read index data") - .await; - - let local_disk: Option> = self.get_owned_disk(uid.clone()); - - if local_disk.is_none() { + if !self.partition_locks.contains_key(&data_file_path) { warn!( - "This should not happen of local disk not found for [{:?}]", + "There is no cached data in localfile store for [{:?}]", &uid ); return Ok(Local(LocalDataIndex { @@ -401,21 +328,29 @@ impl Store for LocalFileStore { })); } - let local_disk = local_disk.unwrap(); + let locked_object = self + .partition_locks + .entry(data_file_path.clone()) + .or_insert_with(|| Arc::new(LockedObj::from(self.select_disk(&uid).unwrap()))) + .clone(); + let local_disk = &locked_object.disk; if local_disk.is_corrupted()? { return Err(WorkerError::LOCAL_DISK_OWNED_BY_PARTITION_CORRUPTED( - local_disk.base_path.to_string(), + local_disk.root.to_string(), )); } let index_data_result = local_disk - .read(index_file_path, 0, None) - .instrument_await("reading index data from file") + .read(&index_file_path, 0, None) + .instrument_await(format!( + "reading index data from file: {:?}", + &index_file_path + )) .await?; let len = local_disk - .get_file_len(data_file_path) - .instrument_await("getting file len from file") + .get_file_len(&data_file_path) + .instrument_await(format!("getting file len from file: {:?}", &data_file_path)) .await?; Ok(Local(LocalDataIndex { index_data: index_data_result, @@ -423,17 +358,6 @@ impl Store for LocalFileStore { })) } - async fn require_buffer( - &self, - _ctx: RequireBufferContext, - ) -> Result { - todo!() - } - - async fn release_buffer(&self, _ctx: ReleaseBufferContext) -> Result { - todo!() - } - async fn purge(&self, ctx: PurgeDataContext) -> Result<()> { let app_id = ctx.app_id; let shuffle_id_option = ctx.shuffle_id; @@ -445,28 +369,18 @@ impl Store for LocalFileStore { for local_disk_ref in &self.local_disks { let disk = local_disk_ref.clone(); - disk.delete(data_relative_dir_path.to_string()).await?; + disk.delete(&data_relative_dir_path).await?; } - if shuffle_id_option.is_none() { - let all_partition_ids = self.get_app_all_partitions(&app_id); - if all_partition_ids.is_empty() { - return Ok(()); - } - - for (shuffle_id, partition_id) in all_partition_ids.into_iter() { - // delete lock - let uid = PartitionedUId { - app_id: app_id.clone(), - shuffle_id, - partition_id, - }; - let (data_file_path, _) = LocalFileStore::gen_relative_path_for_partition(&uid); - self.partition_file_locks.remove(&data_file_path); - } + let keys_to_delete: Vec<_> = self + .partition_locks + .iter() + .filter(|entry| entry.key().starts_with(&data_relative_dir_path)) + .map(|entry| entry.key().to_string()) + .collect(); - // delete disk mapping - self.delete_app(&app_id)?; + for key in keys_to_delete { + self.partition_locks.remove(&key); } Ok(()) @@ -475,280 +389,16 @@ impl Store for LocalFileStore { async fn is_healthy(&self) -> Result { self.healthy_check() } -} - -struct LocalDiskConfig { - high_watermark: f32, - low_watermark: f32, - max_concurrency: i32, -} - -impl LocalDiskConfig { - fn create_mocked_config() -> Self { - LocalDiskConfig { - high_watermark: 1.0, - low_watermark: 0.6, - max_concurrency: 20, - } - } -} - -impl Default for LocalDiskConfig { - fn default() -> Self { - LocalDiskConfig { - high_watermark: 0.8, - low_watermark: 0.6, - max_concurrency: 40, - } - } -} - -struct LocalDisk { - base_path: String, - concurrency_limiter: Semaphore, - is_corrupted: AtomicBool, - is_healthy: AtomicBool, - config: LocalDiskConfig, -} - -impl LocalDisk { - fn new(path: String, config: LocalDiskConfig, runtime_manager: RuntimeManager) -> Arc { - create_directory_if_not_exists(&path); - let instance = LocalDisk { - base_path: path, - concurrency_limiter: Semaphore::new(config.max_concurrency as usize), - is_corrupted: AtomicBool::new(false), - is_healthy: AtomicBool::new(true), - config, - }; - let instance = Arc::new(instance); - - let runtime = runtime_manager.default_runtime.clone(); - let cloned = instance.clone(); - runtime.spawn(async { - info!( - "Starting the disk healthy checking, base path: {}", - &cloned.base_path - ); - LocalDisk::loop_check_disk(cloned).await; - }); - - instance - } - - async fn write_read_check(local_disk: Arc) -> Result<()> { - let temp_path = format!("{}/{}", &local_disk.base_path, "corruption_check.file"); - let data = Bytes::copy_from_slice(b"file corruption check"); - { - let mut file = OpenOptions::new() - .write(true) - .create(true) - .open(&temp_path) - .await?; - file.write_all(&data).await?; - file.flush().await?; - } - - let mut read_data = Vec::new(); - { - let mut file = tokio::fs::File::open(&temp_path).await?; - file.read_to_end(&mut read_data).await?; - - tokio::fs::remove_file(&temp_path).await?; - } - - if data != Bytes::copy_from_slice(&read_data) { - local_disk.mark_corrupted(); - error!( - "The local disk has been corrupted. path: {}", - &local_disk.base_path - ); - } - - Ok(()) - } - - async fn loop_check_disk(local_disk: Arc) { - loop { - tokio::time::sleep(Duration::from_secs(10)).await; - - if local_disk.is_corrupted().unwrap() { - return; - } - - let check_succeed: Result<()> = LocalDisk::write_read_check(local_disk.clone()).await; - if check_succeed.is_err() { - local_disk.mark_corrupted(); - error!( - "Errors on checking local disk corruption. err: {:#?}", - check_succeed.err() - ); - } - - // check the capacity - let used_ratio = local_disk.get_disk_used_ratio(); - if used_ratio.is_err() { - error!( - "Errors on getting the used ratio of the disk capacity. err: {:?}", - used_ratio.err() - ); - continue; - } - - let used_ratio = used_ratio.unwrap(); - if local_disk.is_healthy().unwrap() - && used_ratio > local_disk.config.high_watermark as f64 - { - warn!("Disk={} has been unhealthy.", &local_disk.base_path); - local_disk.mark_unhealthy(); - continue; - } - - if !local_disk.is_healthy().unwrap() - && used_ratio < local_disk.config.low_watermark as f64 - { - warn!("Disk={} has been healthy.", &local_disk.base_path); - local_disk.mark_healthy(); - continue; - } - } - } - - fn append_path(&self, path: String) -> String { - format!("{}/{}", self.base_path.clone(), path) - } - - async fn write(&self, data: Bytes, relative_file_path: String) -> Result<()> { - let _concurrency_guarder = self - .concurrency_limiter - .acquire() - .instrument_await("meet the concurrency limiter") - .await?; - let absolute_path = self.append_path(relative_file_path.clone()); - let path = Path::new(&absolute_path); - - match path.parent() { - Some(parent) => { - if !parent.exists() { - create_directory_if_not_exists(parent.to_str().unwrap()) - } - } - _ => todo!(), - } - - debug!("data file: {}", &absolute_path); - - let mut output_file = OpenOptions::new() - .append(true) - .create(true) - .open(absolute_path) - .await?; - output_file.write_all(data.as_ref()).await?; - output_file.flush().await?; - - Ok(()) - } - async fn get_file_len(&self, relative_file_path: String) -> Result { - let file_path = self.append_path(relative_file_path); - - Ok( - match tokio::fs::metadata(file_path) - .instrument_await("getting metadata of path") - .await - { - Ok(metadata) => metadata.len() as i64, - _ => 0i64, - }, - ) - } - - async fn read( + async fn require_buffer( &self, - relative_file_path: String, - offset: i64, - length: Option, - ) -> Result { - let file_path = self.append_path(relative_file_path); - - let file = tokio::fs::File::open(&file_path) - .instrument_await(format!("opening file. path: {}", &file_path)) - .await?; - - let read_len = match length { - Some(len) => len, - _ => file - .metadata() - .instrument_await(format!("getting file metadata. path: {}", &file_path)) - .await? - .len() - .try_into() - .unwrap(), - } as usize; - - let mut reader = tokio::io::BufReader::new(file); - let mut buffer = vec![0; read_len]; - reader - .seek(SeekFrom::Start(offset as u64)) - .instrument_await(format!( - "seeking file [{}:{}] of path: {}", - offset, read_len, &file_path - )) - .await?; - reader - .read_exact(buffer.as_mut()) - .instrument_await(format!( - "reading data of len: {} from path: {}", - read_len, &file_path - )) - .await?; - - let mut bytes_buffer = BytesMut::new(); - bytes_buffer.extend_from_slice(&*buffer); - Ok(bytes_buffer.freeze()) - } - - async fn delete(&self, relative_file_path: String) -> Result<()> { - let delete_path = self.append_path(relative_file_path); - if !tokio::fs::try_exists(&delete_path).await? { - info!("The path:{} does not exist, ignore purging.", &delete_path); - return Ok(()); - } - - let metadata = tokio::fs::metadata(&delete_path).await?; - if metadata.is_dir() { - tokio::fs::remove_dir_all(delete_path).await?; - } else { - tokio::fs::remove_file(delete_path).await?; - } - Ok(()) - } - - fn mark_corrupted(&self) { - self.is_corrupted.store(true, Ordering::SeqCst); - } - - fn mark_unhealthy(&self) { - self.is_healthy.store(false, Ordering::SeqCst); - } - - fn mark_healthy(&self) { - self.is_healthy.store(true, Ordering::SeqCst); - } - - fn is_corrupted(&self) -> Result { - Ok(self.is_corrupted.load(Ordering::SeqCst)) - } - - fn is_healthy(&self) -> Result { - Ok(self.is_healthy.load(Ordering::SeqCst)) + _ctx: RequireBufferContext, + ) -> Result { + todo!() } - fn get_disk_used_ratio(&self) -> Result { - // Get the total and available space in bytes - let available_space = fs2::available_space(&self.base_path)?; - let total_space = fs2::total_space(&self.base_path)?; - Ok(1.0 - (available_space as f64 / total_space as f64)) + async fn release_buffer(&self, _ctx: ReleaseBufferContext) -> Result { + todo!() } } @@ -758,17 +408,12 @@ mod test { PartitionedUId, PurgeDataContext, ReadingIndexViewContext, ReadingOptions, ReadingViewContext, WritingViewContext, }; - use crate::store::localfile::{LocalDisk, LocalDiskConfig, LocalFileStore}; + use crate::store::localfile::LocalFileStore; use crate::store::{PartitionedDataBlock, ResponseData, ResponseDataIndex, Store}; use bytes::{Buf, Bytes, BytesMut}; use log::info; - use crate::runtime::manager::RuntimeManager; - use std::io::Read; - use std::thread; - use std::time::Duration; - #[test] fn purge_test() -> anyhow::Result<()> { let temp_dir = tempdir::TempDir::new("test_local_store").unwrap(); @@ -840,10 +485,6 @@ mod test { false, runtime.wait(tokio::fs::try_exists(format!("{}/{}", &temp_path, &app_id)))? ); - assert!(!local_store - .partition_file_locks - .contains_key(&format!("{}/{}/{}/{}.data", &temp_path, &app_id, 0, 0))); - assert!(!local_store.partition_written_disk_map.contains_key(&app_id)); Ok(()) } @@ -968,109 +609,4 @@ mod test { temp_dir.close().unwrap(); } - - #[test] - fn test_local_disk_delete_operation() { - let temp_dir = tempdir::TempDir::new("test_local_disk_delete_operation-dir").unwrap(); - let temp_path = temp_dir.path().to_str().unwrap().to_string(); - - println!("init the path: {}", &temp_path); - - let runtime: RuntimeManager = Default::default(); - let local_disk = LocalDisk::new( - temp_path.clone(), - LocalDiskConfig::default(), - runtime.clone(), - ); - - let data = b"hello!"; - runtime - .wait(local_disk.write(Bytes::copy_from_slice(data), "a/b".to_string())) - .unwrap(); - - assert_eq!( - true, - runtime - .wait(tokio::fs::try_exists(format!( - "{}/{}", - &temp_path, - "a/b".to_string() - ))) - .unwrap() - ); - - runtime - .wait(local_disk.delete("a/".to_string())) - .expect("TODO: panic message"); - assert_eq!( - false, - runtime - .wait(tokio::fs::try_exists(format!( - "{}/{}", - &temp_path, - "a/b".to_string() - ))) - .unwrap() - ); - } - - #[test] - fn local_disk_corruption_healthy_check() { - let temp_dir = tempdir::TempDir::new("test_directory").unwrap(); - let temp_path = temp_dir.path().to_str().unwrap().to_string(); - - let local_disk = LocalDisk::new( - temp_path.clone(), - LocalDiskConfig::create_mocked_config(), - Default::default(), - ); - - thread::sleep(Duration::from_secs(12)); - assert_eq!(true, local_disk.is_healthy().unwrap()); - assert_eq!(false, local_disk.is_corrupted().unwrap()); - } - - #[test] - fn local_disk_test() { - let temp_dir = tempdir::TempDir::new("test_directory").unwrap(); - let temp_path = temp_dir.path().to_str().unwrap().to_string(); - - let runtime: RuntimeManager = Default::default(); - let local_disk = LocalDisk::new( - temp_path.clone(), - LocalDiskConfig::default(), - runtime.clone(), - ); - - let data = b"Hello, World!"; - - let relative_path = "app-id/test_file.txt"; - let write_result = - runtime.wait(local_disk.write(Bytes::copy_from_slice(data), relative_path.to_string())); - assert!(write_result.is_ok()); - - // test whether the content is written - let file_path = format!("{}/{}", local_disk.base_path, relative_path); - let mut file = std::fs::File::open(file_path).unwrap(); - let mut file_content = Vec::new(); - file.read_to_end(&mut file_content).unwrap(); - assert_eq!(file_content, data); - - // if the file has been created, append some content - let write_result = - runtime.wait(local_disk.write(Bytes::copy_from_slice(data), relative_path.to_string())); - assert!(write_result.is_ok()); - - let read_result = runtime.wait(local_disk.read( - relative_path.to_string(), - 0, - Some(data.len() as i64 * 2), - )); - assert!(read_result.is_ok()); - let read_data = read_result.unwrap(); - let expected = b"Hello, World!Hello, World!"; - assert_eq!(read_data.as_ref(), expected); - - temp_dir.close().unwrap(); - } } diff --git a/rust/experimental/server/src/store/mod.rs b/rust/experimental/server/src/store/mod.rs index f29c7bf564..caf00a0686 100644 --- a/rust/experimental/server/src/store/mod.rs +++ b/rust/experimental/server/src/store/mod.rs @@ -18,6 +18,7 @@ #[cfg(feature = "hdfs")] pub mod hdfs; pub mod hybrid; +pub mod local; pub mod localfile; pub mod mem; pub mod memory;