From 114b8d35cffcbe078b21b7a13f84f4e72680e5de Mon Sep 17 00:00:00 2001 From: Alexis Asseman Date: Fri, 13 Oct 2023 17:37:42 -0700 Subject: [PATCH] WIP Signed-off-by: Alexis Asseman --- ...77f92c832fb3ae514c27a3293cb899e5e246d.json | 29 + Cargo.lock | 844 ++++++++++++++++-- tap_agent/Cargo.toml | 4 + tap_agent/src/agent.rs | 6 +- tap_agent/src/config.rs | 24 + tap_agent/src/main.rs | 12 +- tap_agent/src/tap/escrow_adapter.rs | 8 +- tap_agent/src/tap/manager.rs | 148 ++- tap_agent/src/tap/managers.rs | 126 +-- 9 files changed, 1045 insertions(+), 156 deletions(-) create mode 100644 .sqlx/query-d7e47b7dd017573de0e46f6f86377f92c832fb3ae514c27a3293cb899e5e246d.json diff --git a/.sqlx/query-d7e47b7dd017573de0e46f6f86377f92c832fb3ae514c27a3293cb899e5e246d.json b/.sqlx/query-d7e47b7dd017573de0e46f6f86377f92c832fb3ae514c27a3293cb899e5e246d.json new file mode 100644 index 00000000..6041e759 --- /dev/null +++ b/.sqlx/query-d7e47b7dd017573de0e46f6f86377f92c832fb3ae514c27a3293cb899e5e246d.json @@ -0,0 +1,29 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT MAX(id), SUM(value)\n FROM scalar_tap_receipts\n WHERE allocation_id = $1 AND sender_address = $2\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "max", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "sum", + "type_info": "Numeric" + } + ], + "parameters": { + "Left": [ + "Bpchar", + "Bpchar" + ] + }, + "nullable": [ + null, + null + ] + }, + "hash": "d7e47b7dd017573de0e46f6f86377f92c832fb3ae514c27a3293cb899e5e246d" +} diff --git a/Cargo.lock b/Cargo.lock index 115b0550..7d7202ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,6 +61,15 @@ dependencies = [ "version_check", ] +[[package]] +name = "aho-corasick" +version = "0.7.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac" +dependencies = [ + "memchr", +] + [[package]] name = "aho-corasick" version = "1.0.1" @@ -70,6 +79,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "allocator-api2" version = "0.2.16" @@ -95,6 +119,26 @@ dependencies = [ "tiny-keccak", ] +[[package]] +name = "alloy-primitives" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0628ec0ba5b98b3370bb6be17b12f23bfce8ee4ad83823325a20546d9b03b78" +dependencies = [ + "alloy-rlp", + "bytes", + "cfg-if", + "const-hex", + "derive_more", + "hex-literal", + "itoa", + "proptest", + "rand 0.8.5", + "ruint", + "serde", + "tiny-keccak", +] + [[package]] name = "alloy-rlp" version = "0.3.2" @@ -117,7 +161,24 @@ dependencies = [ "proc-macro2", "quote", "syn 2.0.28", - "syn-solidity", + "syn-solidity 0.3.2", + "tiny-keccak", +] + +[[package]] +name = "alloy-sol-macro" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a98ad1696a2e17f010ae8e43e9f2a1e930ed176a8e3ff77acfeff6dfb07b42c" +dependencies = [ + "const-hex", + "dunce", + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn 2.0.28", + "syn-solidity 0.4.2", "tiny-keccak", ] @@ -127,8 +188,20 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eaa7c9a4354b1ff9f1c85adf22802af046e20e4bb55e19b9dc6ca8cbc6f7f4e5" dependencies = [ - "alloy-primitives", - "alloy-sol-macro", + "alloy-primitives 0.3.3", + "alloy-sol-macro 0.3.2", + "const-hex", + "serde", +] + +[[package]] +name = "alloy-sol-types" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98d7107bed88e8f09f0ddcc3335622d87bfb6821f3e0c7473329fb1cfad5e015" +dependencies = [ + "alloy-primitives 0.4.2", + "alloy-sol-macro 0.4.2", "const-hex", "serde", ] @@ -199,6 +272,130 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" +[[package]] +name = "ark-ff" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b3235cc41ee7a12aaaf2c575a2ad7b46713a8a50bda2fc3b003a04845c05dd6" +dependencies = [ + "ark-ff-asm 0.3.0", + "ark-ff-macros 0.3.0", + "ark-serialize 0.3.0", + "ark-std 0.3.0", + "derivative", + "num-bigint", + "num-traits", + "paste", + "rustc_version 0.3.3", + "zeroize", +] + +[[package]] +name = "ark-ff" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec847af850f44ad29048935519032c33da8aa03340876d351dfab5660d2966ba" +dependencies = [ + "ark-ff-asm 0.4.2", + "ark-ff-macros 0.4.2", + "ark-serialize 0.4.2", + "ark-std 0.4.0", + "derivative", + "digest 0.10.7", + "itertools 0.10.5", + "num-bigint", + "num-traits", + "paste", + "rustc_version 0.4.0", + "zeroize", +] + +[[package]] +name = "ark-ff-asm" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db02d390bf6643fb404d3d22d31aee1c4bc4459600aef9113833d17e786c6e44" +dependencies = [ + "quote", + "syn 1.0.109", +] + +[[package]] +name = "ark-ff-asm" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ed4aa4fe255d0bc6d79373f7e31d2ea147bcf486cba1be5ba7ea85abdb92348" +dependencies = [ + "quote", + "syn 1.0.109", +] + +[[package]] +name = "ark-ff-macros" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db2fd794a08ccb318058009eefdf15bcaaaaf6f8161eb3345f907222bac38b20" +dependencies = [ + "num-bigint", + "num-traits", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "ark-ff-macros" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7abe79b0e4288889c4574159ab790824d0033b9fdcb2a112a3182fac2e514565" +dependencies = [ + "num-bigint", + "num-traits", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "ark-serialize" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d6c2b318ee6e10f8c2853e73a83adc0ccb88995aa978d8a3408d492ab2ee671" +dependencies = [ + "ark-std 0.3.0", + "digest 0.9.0", +] + +[[package]] +name = "ark-serialize" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adb7b85a02b83d2f22f89bd5cac66c9c89474240cb6207cb1efc16d098e822a5" +dependencies = [ + "ark-std 0.4.0", + "digest 0.10.7", + "num-bigint", +] + +[[package]] +name = "ark-std" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1df2c09229cbc5a028b1d70e00fdb2acee28b1055dfb5ca73eea49c5a25c4e7c" +dependencies = [ + "num-traits", + "rand 0.8.5", +] + +[[package]] +name = "ark-std" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94893f1e0c6eeab764ade8dc4c0db24caf4fe7cbbaafc0eba0a9030f447b5185" +dependencies = [ + "num-traits", + "rand 0.8.5", +] + [[package]] name = "array-init" version = "0.0.4" @@ -256,6 +453,22 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-compression" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f658e2baef915ba0f26f1f7c42bfb8e12f532a01f449a090ded75ae7a07e9ba2" +dependencies = [ + "brotli", + "flate2", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "zstd 0.13.0", + "zstd-safe 7.0.0", +] + [[package]] name = "async-graphql" version = "4.0.16" @@ -296,7 +509,7 @@ checksum = "c91ac174c05670edffb720bc376b9d4c274c3d127ac08ed3d38144c9415502cd" dependencies = [ "async-graphql", "async-trait", - "axum", + "axum 0.5.17", "bytes", "futures-util", "http-body", @@ -386,7 +599,7 @@ checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c" dependencies = [ "futures", "pharos", - "rustc_version", + "rustc_version 0.4.0", ] [[package]] @@ -462,7 +675,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acee9fd5073ab6b045a275b3e709c163dd36c90685219cb21804a147b58dba43" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.2.9", "base64 0.13.1", "bitflags 1.3.2", "bytes", @@ -472,7 +685,7 @@ dependencies = [ "http-body", "hyper", "itoa", - "matchit", + "matchit 0.5.0", "memchr", "mime", "percent-encoding", @@ -480,7 +693,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "sha-1", + "sha-1 0.10.1", "sync_wrapper", "tokio", "tokio-tungstenite 0.17.2", @@ -490,6 +703,38 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core 0.3.4", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit 0.7.3", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", +] + [[package]] name = "axum-core" version = "0.2.9" @@ -506,6 +751,23 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.68" @@ -551,6 +813,15 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dabbe35f96fb9507f7330793dc490461b2962659ac5d427181e451a623751d1" +[[package]] +name = "beef" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a8241f3ebb85c056b509d4327ad0358fbbba6ffb340bf388f26350aeda225b1" +dependencies = [ + "serde", +] + [[package]] name = "bigdecimal" version = "0.3.1" @@ -686,6 +957,27 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "brotli" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "516074a47ef4bce09577a3b379392300159ce5b1ba2e501ff1c819950066100f" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da74e2b81409b1b743f8f0c62cc6254afefb8b8e50bbfe3735550f7aeefa3448" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bs58" version = "0.4.0" @@ -704,6 +996,16 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "bstr" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c79ad7fb2dd38f3dabd76b09c6a5a20c038fc0213ef1e9afd30eb777f120f019" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "bumpalo" version = "3.13.0" @@ -812,7 +1114,7 @@ checksum = "e7daec1a2a2129eeba1644b220b4647ec537b0b5d4bfd6876fcc5a540056b592" dependencies = [ "camino", "cargo-platform", - "semver", + "semver 1.0.17", "serde", "serde_json", "thiserror", @@ -1303,6 +1605,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "derivative" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "derive_more" version = "0.99.17" @@ -1312,7 +1625,7 @@ dependencies = [ "convert_case", "proc-macro2", "quote", - "rustc_version", + "rustc_version 0.4.0", "syn 1.0.109", ] @@ -1753,7 +2066,7 @@ checksum = "0e53451ea4a8128fbce33966da71132cf9e1040dcfd2a2084fd7733ada7b2045" dependencies = [ "ethers-core", "reqwest", - "semver", + "semver 1.0.17", "serde", "serde_json", "thiserror", @@ -1862,7 +2175,7 @@ dependencies = [ "path-slash", "rayon", "regex", - "semver", + "semver 1.0.17", "serde", "serde_json", "solang-parser", @@ -1927,6 +2240,17 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" +[[package]] +name = "fastrlp" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "139834ddba373bbdd213dffe02c8d110508dcf1726c2be27e8d1f7d7e1856418" +dependencies = [ + "arrayvec", + "auto_impl", + "bytes", +] + [[package]] name = "faux" version = "0.1.10" @@ -2243,6 +2567,19 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "globset" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "029d74589adefde59de1a0c4f4732695c32805624aec7b68d91503d4dba79afc" +dependencies = [ + "aho-corasick 0.7.20", + "bstr", + "fnv", + "log", + "regex", +] + [[package]] name = "gloo-timers" version = "0.2.6" @@ -2533,7 +2870,9 @@ checksum = "0646026eb1b3eea4cd9ba47912ea5ce9cc07713d105b1a14698f4e6433d348b7" dependencies = [ "http", "hyper", + "log", "rustls", + "rustls-native-certs", "tokio", "tokio-rustls", ] @@ -2615,7 +2954,7 @@ checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" name = "indexer-common" version = "0.1.0" dependencies = [ - "alloy-primitives", + "alloy-primitives 0.3.3", "anyhow", "arc-swap", "bs58 0.5.0", @@ -2641,8 +2980,8 @@ dependencies = [ name = "indexer_tap_agent" version = "0.1.0" dependencies = [ - "alloy-primitives", - "alloy-sol-types", + "alloy-primitives 0.3.3", + "alloy-sol-types 0.3.2", "anyhow", "async-trait", "clap", @@ -2653,6 +2992,7 @@ dependencies = [ "eventuals", "faux", "indexer-common", + "jsonrpsee 0.20.2", "lazy_static", "log", "reqwest", @@ -2660,9 +3000,12 @@ dependencies = [ "serde", "serde_json", "sqlx", + "tap_aggregator", "tap_core 0.5.1 (git+https://github.com/semiotic-ai/timeline-aggregation-protocol.git?branch=aasseman/rav_bulk_receipt_checks)", "thiserror", "tokio", + "tower", + "tower-http 0.4.4", "tracing", "tracing-subscriber", ] @@ -2724,63 +3067,226 @@ dependencies = [ ] [[package]] -name = "ipnet" -version = "2.7.2" +name = "ipnet" +version = "2.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12b6ee2129af8d4fb011108c73d99a1b83a85977f23b82460c0ae2e25bb4b57f" + +[[package]] +name = "is-terminal" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f" +dependencies = [ + "hermit-abi 0.3.1", + "io-lifetimes", + "rustix 0.37.19", + "windows-sys 0.48.0", +] + +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + +[[package]] +name = "itertools" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" +dependencies = [ + "either", +] + +[[package]] +name = "itoa" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" + +[[package]] +name = "jobserver" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "936cfd212a0155903bcbc060e316fb6cc7cbf2e1907329391ebadc1fe0ce77c2" +dependencies = [ + "libc", +] + +[[package]] +name = "js-sys" +version = "0.3.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f37a4a5928311ac501dee68b3c7613a1037d0edb30c8e5427bd832d55d1b790" +dependencies = [ + "wasm-bindgen", +] + +[[package]] +name = "jsonrpsee" +version = "0.18.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1822d18e4384a5e79d94dc9e4d1239cfa9fad24e55b44d2efeff5b394c9fece4" +dependencies = [ + "jsonrpsee-core 0.18.2", + "jsonrpsee-proc-macros 0.18.2", + "jsonrpsee-server", + "jsonrpsee-types 0.18.2", + "tracing", +] + +[[package]] +name = "jsonrpsee" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de902baa44bf34a58b1a4906f8b840d7d60dcec5f41fe08b4dbc14cf9efa821c" +dependencies = [ + "jsonrpsee-core 0.20.2", + "jsonrpsee-http-client", + "jsonrpsee-proc-macros 0.20.2", + "jsonrpsee-types 0.20.2", + "tracing", +] + +[[package]] +name = "jsonrpsee-core" +version = "0.18.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64c6832a55f662b5a6ecc844db24b8b9c387453f923de863062c60ce33d62b81" +dependencies = [ + "anyhow", + "async-trait", + "beef", + "futures-util", + "globset", + "hyper", + "jsonrpsee-types 0.18.2", + "parking_lot", + "rand 0.8.5", + "rustc-hash", + "serde", + "serde_json", + "soketto", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "jsonrpsee-core" +version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12b6ee2129af8d4fb011108c73d99a1b83a85977f23b82460c0ae2e25bb4b57f" +checksum = "51f45d37af23707750136379f6799e76ebfcf2d425ec4e36d0deb7921da5e65c" +dependencies = [ + "anyhow", + "async-trait", + "beef", + "futures-util", + "hyper", + "jsonrpsee-types 0.20.2", + "serde", + "serde_json", + "thiserror", + "tokio", + "tracing", +] [[package]] -name = "is-terminal" -version = "0.4.7" +name = "jsonrpsee-http-client" +version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f" +checksum = "02308562f2e8162a32f8d6c3dc19c29c858d5d478047c886a5c3c25b5f7fa868" dependencies = [ - "hermit-abi 0.3.1", - "io-lifetimes", - "rustix 0.37.19", - "windows-sys 0.48.0", + "async-trait", + "hyper", + "hyper-rustls", + "jsonrpsee-core 0.20.2", + "jsonrpsee-types 0.20.2", + "serde", + "serde_json", + "thiserror", + "tokio", + "tower", + "tracing", + "url", ] [[package]] -name = "itertools" -version = "0.10.5" +name = "jsonrpsee-proc-macros" +version = "0.18.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +checksum = "c6027ac0b197ce9543097d02a290f550ce1d9432bf301524b013053c0b75cc94" dependencies = [ - "either", + "heck", + "proc-macro-crate 1.3.1", + "proc-macro2", + "quote", + "syn 1.0.109", ] [[package]] -name = "itertools" -version = "0.11.0" +name = "jsonrpsee-proc-macros" +version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" +checksum = "f26b3675a943d083d0bf6e367ec755dccec56c41888afa13b191c1c4ff87c652" dependencies = [ - "either", + "heck", + "proc-macro-crate 1.3.1", + "proc-macro2", + "quote", + "syn 1.0.109", ] [[package]] -name = "itoa" -version = "1.0.6" +name = "jsonrpsee-server" +version = "0.18.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" +checksum = "4f06661d1a6b6e5b85469dc9c29acfbb9b3bb613797a6fd10a3ebb8a70754057" +dependencies = [ + "futures-util", + "hyper", + "jsonrpsee-core 0.18.2", + "jsonrpsee-types 0.18.2", + "serde", + "serde_json", + "soketto", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tracing", +] [[package]] -name = "jobserver" -version = "0.1.26" +name = "jsonrpsee-types" +version = "0.18.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "936cfd212a0155903bcbc060e316fb6cc7cbf2e1907329391ebadc1fe0ce77c2" +checksum = "6e5bf6c75ce2a4217421154adfc65a24d2b46e77286e59bba5d9fa6544ccc8f4" dependencies = [ - "libc", + "anyhow", + "beef", + "serde", + "serde_json", + "thiserror", + "tracing", ] [[package]] -name = "js-sys" -version = "0.3.63" +name = "jsonrpsee-types" +version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f37a4a5928311ac501dee68b3c7613a1037d0edb30c8e5427bd832d55d1b790" +checksum = "05eaff23af19f10ba6fbb76519bed6da4d3b9bbaef13d39b7c2b6c14e532d27e" dependencies = [ - "wasm-bindgen", + "anyhow", + "beef", + "serde", + "serde_json", + "thiserror", + "tracing", ] [[package]] @@ -2990,6 +3496,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "maybe-uninit" version = "2.0.0" @@ -3580,7 +4092,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414" dependencies = [ "futures", - "rustc_version", + "rustc_version 0.4.0", ] [[package]] @@ -4046,7 +4558,7 @@ version = "1.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" dependencies = [ - "aho-corasick", + "aho-corasick 1.0.1", "memchr", "regex-automata 0.3.8", "regex-syntax 0.7.5", @@ -4067,7 +4579,7 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" dependencies = [ - "aho-corasick", + "aho-corasick 1.0.1", "memchr", "regex-syntax 0.7.5", ] @@ -4262,7 +4774,7 @@ dependencies = [ "futures", "futures-timer", "rstest_macros 0.17.0", - "rustc_version", + "rustc_version 0.4.0", ] [[package]] @@ -4274,7 +4786,7 @@ dependencies = [ "futures", "futures-timer", "rstest_macros 0.18.2", - "rustc_version", + "rustc_version 0.4.0", ] [[package]] @@ -4286,7 +4798,7 @@ dependencies = [ "cfg-if", "proc-macro2", "quote", - "rustc_version", + "rustc_version 0.4.0", "syn 1.0.109", "unicode-ident", ] @@ -4303,7 +4815,7 @@ dependencies = [ "quote", "regex", "relative-path", - "rustc_version", + "rustc_version 0.4.0", "syn 2.0.28", "unicode-ident", ] @@ -4314,8 +4826,17 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95294d6e3a6192f3aabf91c38f56505a625aa495533442744185a36d75a790c4" dependencies = [ + "alloy-rlp", + "ark-ff 0.3.0", + "ark-ff 0.4.2", + "bytes", + "fastrlp", + "num-bigint", + "parity-scale-codec", + "primitive-types", "proptest", "rand 0.8.5", + "rlp", "ruint-macro", "serde", "valuable", @@ -4350,19 +4871,34 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc-hex" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e75f6a532d0fd9f7f13144f392b6ad56a32696bfcd9c78f797f16bbb6f072d6" +[[package]] +name = "rustc_version" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0dfe2087c51c460008730de8b57e6a320782fbfb312e1f4d520e6c6fae155ee" +dependencies = [ + "semver 0.11.0", +] + [[package]] name = "rustc_version" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" dependencies = [ - "semver", + "semver 1.0.17", ] [[package]] @@ -4404,6 +4940,18 @@ dependencies = [ "sct", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "1.0.2" @@ -4597,6 +5145,15 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6" +dependencies = [ + "semver-parser", +] + [[package]] name = "semver" version = "1.0.17" @@ -4606,6 +5163,15 @@ dependencies = [ "serde", ] +[[package]] +name = "semver-parser" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0bef5b7f9e0df16536d3961cfb6e84331c065b4066afb39768d0e319411f7" +dependencies = [ + "pest", +] + [[package]] name = "send_wrapper" version = "0.4.0" @@ -4660,6 +5226,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4beec8bce849d58d06238cb50db2e1c417cfeafa4c63f692b15c82b7c80f8335" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_qs" version = "0.8.5" @@ -4696,13 +5272,13 @@ dependencies = [ name = "service" version = "0.1.0" dependencies = [ - "alloy-primitives", - "alloy-sol-types", + "alloy-primitives 0.3.3", + "alloy-sol-types 0.3.2", "anyhow", "async-graphql", "async-graphql-axum", "autometrics", - "axum", + "axum 0.5.17", "cargo-husky", "clap", "confy", @@ -4729,12 +5305,25 @@ dependencies = [ "tokio", "toml 0.7.4", "tower", - "tower-http 0.4.0", + "tower-http 0.4.4", "tracing", "tracing-subscriber", "wiremock", ] +[[package]] +name = "sha-1" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6" +dependencies = [ + "block-buffer 0.9.0", + "cfg-if", + "cpufeatures", + "digest 0.9.0", + "opaque-debug", +] + [[package]] name = "sha-1" version = "0.10.1" @@ -4902,6 +5491,22 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "soketto" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d1c5305e39e09653383c2c7244f2f78b3bcae37cf50c64cb4789c9f5096ec2" +dependencies = [ + "base64 0.13.1", + "bytes", + "futures", + "http", + "httparse", + "log", + "rand 0.8.5", + "sha-1 0.9.8", +] + [[package]] name = "solang-parser" version = "0.3.2" @@ -5199,6 +5804,9 @@ name = "strum" version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f" +dependencies = [ + "strum_macros 0.24.3", +] [[package]] name = "strum" @@ -5252,7 +5860,7 @@ dependencies = [ "hex", "once_cell", "reqwest", - "semver", + "semver 1.0.17", "serde", "serde_json", "sha2 0.10.7", @@ -5295,6 +5903,18 @@ dependencies = [ "syn 2.0.28", ] +[[package]] +name = "syn-solidity" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86b837ef12ab88835251726eb12237655e61ec8dc8a280085d1961cdc3dfd047" +dependencies = [ + "paste", + "proc-macro2", + "quote", + "syn 2.0.28", +] + [[package]] name = "sync_wrapper" version = "0.1.2" @@ -5307,14 +5927,42 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "tap_aggregator" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e29a877040a076bcc3c3a442d2bef48456d9840b5db7c82512d65d249284a71" +dependencies = [ + "alloy-primitives 0.4.2", + "alloy-sol-types 0.4.2", + "anyhow", + "axum 0.6.20", + "clap", + "ethereum-types", + "ethers-core", + "ethers-signers", + "futures-util", + "jsonrpsee 0.18.2", + "lazy_static", + "log", + "prometheus", + "ruint", + "serde", + "serde_json", + "strum 0.24.1", + "tap_core 0.6.0", + "tokio", + "tracing-subscriber", +] + [[package]] name = "tap_core" version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2ef687d224045e494e4779d6d80cd9eca46f0c204c413d2b081e015241fa5597" dependencies = [ - "alloy-primitives", - "alloy-sol-types", + "alloy-primitives 0.3.3", + "alloy-sol-types 0.3.2", "anyhow", "async-trait", "ethereum-types", @@ -5337,8 +5985,33 @@ name = "tap_core" version = "0.5.1" source = "git+https://github.com/semiotic-ai/timeline-aggregation-protocol.git?branch=aasseman/rav_bulk_receipt_checks#26242a2bf56f125f2ff8628d7ba92acbc1e25208" dependencies = [ - "alloy-primitives", - "alloy-sol-types", + "alloy-primitives 0.3.3", + "alloy-sol-types 0.3.2", + "anyhow", + "async-trait", + "ethereum-types", + "ethers", + "ethers-contract", + "ethers-contract-derive", + "ethers-core", + "rand 0.8.5", + "rand_core 0.6.4", + "rstest 0.17.0", + "serde", + "strum 0.24.1", + "strum_macros 0.24.3", + "thiserror", + "tokio", +] + +[[package]] +name = "tap_core" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6de9bc4538c13ca369b1ff35d31be8eaeb955bed3805ce64d8fd2aee4f0ec0c7" +dependencies = [ + "alloy-primitives 0.4.2", + "alloy-sol-types 0.4.2", "anyhow", "async-trait", "ethereum-types", @@ -5665,11 +6338,12 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.4.0" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d1d42a9b3f3ec46ba828e8d376aec14592ea199f70a06a548587ecd1c4ab658" +checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" dependencies = [ - "bitflags 1.3.2", + "async-compression", + "bitflags 2.4.0", "bytes", "futures-core", "futures-util", @@ -5677,6 +6351,8 @@ dependencies = [ "http-body", "http-range-header", "pin-project-lite", + "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -5799,7 +6475,7 @@ dependencies = [ "httparse", "log", "rand 0.8.5", - "sha-1", + "sha-1 0.10.1", "thiserror", "url", "utf-8", @@ -6341,7 +7017,7 @@ dependencies = [ "js-sys", "log", "pharos", - "rustc_version", + "rustc_version 0.4.0", "send_wrapper 0.6.0", "thiserror", "wasm-bindgen", @@ -6369,6 +7045,20 @@ name = "zeroize" version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9" +dependencies = [ + "zeroize_derive", +] + +[[package]] +name = "zeroize_derive" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.28", +] [[package]] name = "zip" @@ -6387,7 +7077,7 @@ dependencies = [ "pbkdf2 0.11.0", "sha1", "time", - "zstd", + "zstd 0.11.2+zstd.1.5.2", ] [[package]] @@ -6396,7 +7086,16 @@ version = "0.11.2+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20cc960326ece64f010d2d2107537f26dc589a6573a316bd5b1dba685fa5fde4" dependencies = [ - "zstd-safe", + "zstd-safe 5.0.2+zstd.1.5.2", +] + +[[package]] +name = "zstd" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110" +dependencies = [ + "zstd-safe 7.0.0", ] [[package]] @@ -6409,6 +7108,15 @@ dependencies = [ "zstd-sys", ] +[[package]] +name = "zstd-safe" +version = "7.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43747c7422e2924c11144d5229878b98180ef8b06cca4ab5af37afc8a8d8ea3e" +dependencies = [ + "zstd-sys", +] + [[package]] name = "zstd-sys" version = "2.0.8+zstd.1.5.5" diff --git a/tap_agent/Cargo.toml b/tap_agent/Cargo.toml index f69c0654..e6a96aff 100644 --- a/tap_agent/Cargo.toml +++ b/tap_agent/Cargo.toml @@ -19,15 +19,19 @@ dotenvy = "0.15.7" ethereum-types = "0.14.1" eventuals = "0.6.7" indexer-common = { version = "0.1.0", path = "../common" } +jsonrpsee = { version = "0.20.2", features = ["http-client", "macros"] } lazy_static = "1.4.0" log = "0.4.19" reqwest = "0.11.20" serde = "1.0.188" serde_json = "1.0.104" sqlx = { version = "0.7.1", features = ["postgres", "runtime-tokio", "bigdecimal", "rust_decimal"] } +tap_aggregator = "0.1.6" tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol.git", branch = "aasseman/rav_bulk_receipt_checks", version = "0.5.1" } thiserror = "1.0.44" tokio = { version = "1.29.1", features = ["rt"] } +tower = "0.4.13" +tower-http = { version = "0.4.4", features = ["compression-full", "decompression-full", "set-header"] } tracing = "0.1.37" tracing-subscriber = { version = "0.3", features = [ "env-filter", diff --git a/tap_agent/src/agent.rs b/tap_agent/src/agent.rs index 470a5612..b04fa10b 100644 --- a/tap_agent/src/agent.rs +++ b/tap_agent/src/agent.rs @@ -4,7 +4,7 @@ use indexer_common::prelude::{ AllocationMonitor, EscrowMonitor, GraphNodeInstance, NetworkSubgraph, }; -pub async fn start_agent(config: config::Cli) { +pub async fn start_agent(config: &'static config::Cli) { let pgpool = database::connect(&config.postgres).await; // We expect this to be called only once and to be used for the lifetime of the @@ -37,12 +37,10 @@ pub async fn start_agent(config: config::Cli) { .await .unwrap(); let _managers = managers::TapManagers::new( + config, pgpool, allocation_monitor, escrow_monitor, - config.escrow_subgraph.escrow_subgraph_deployment, - config.escrow_subgraph.escrow_syncing_interval, - config.ethereum.indexer_address, // TODO: chain id config 0, // TODO: tap eip712 version config diff --git a/tap_agent/src/config.rs b/tap_agent/src/config.rs index 8de8a27e..a1443560 100644 --- a/tap_agent/src/config.rs +++ b/tap_agent/src/config.rs @@ -25,6 +25,8 @@ pub struct Cli { pub network_subgraph: NetworkSubgraph, #[command(flatten)] pub escrow_subgraph: EscrowSubgraph, + #[command(flatten)] + pub tap: TAP, #[arg( short, value_name = "config", @@ -268,6 +270,28 @@ pub struct EscrowSubgraph { pub escrow_syncing_interval: u64, } +#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] +#[group(required = true, multiple = true)] +pub struct TAP { + #[clap( + long, + value_name = "rav-request-trigger-value", + env = "RAV_REQUEST_TRIGGER_VALUE", + help = "Value of unaggregated fees that triggers a RAV request (in GRT).", + default_value_t = 10 + )] + pub rav_request_trigger_value: u64, + #[clap( + long, + value_name = "rav-request-timestamp-buffer", + env = "RAV_REQUEST_TIMESTAMP_BUFFER", + help = "Buffer (in ns) to add between the current time and the timestamp of the \ + last unaggregated fee when triggering a RAV request.", + default_value_t = 1000 + )] + pub rav_request_timestamp_buffer_ns: u64, +} + /// Sets up tracing, allows log level to be set from the environment variables fn init_tracing(format: String) -> Result<(), SetGlobalDefaultError> { let filter = EnvFilter::from_default_env(); diff --git a/tap_agent/src/main.rs b/tap_agent/src/main.rs index 78603f57..8fec3923 100644 --- a/tap_agent/src/main.rs +++ b/tap_agent/src/main.rs @@ -1,6 +1,7 @@ // Copyright 2023-, GraphOps and Semiotic Labs. SPDX-License-Identifier: Apache-2.0 use crate::config::Cli; use anyhow::Result; +use lazy_static::lazy_static; use log::{debug, info}; use tokio::signal::unix::{signal, SignalKind}; @@ -9,17 +10,22 @@ mod config; mod database; mod tap; +lazy_static!{ + pub static ref CONFIG: Cli = Cli::args(); +} + #[tokio::main] async fn main() -> Result<()> { // Parse basic configurations, also initializes logging. - let config = Cli::args(); - debug!("Config: {:?}", config); + lazy_static::initialize(&CONFIG); + debug!("Config: {:?}", *CONFIG); // START THE SERVER Have tokio wait for SIGTERM or SIGINT. let mut signal_sigint = signal(SignalKind::interrupt())?; let mut signal_sigterm = signal(SignalKind::terminate())?; - let _agent = agent::start_agent(config).await; + agent::start_agent(&CONFIG).await; + debug!("Agent started."); tokio::select! { _ = signal_sigint.recv() => debug !("Received SIGINT."), diff --git a/tap_agent/src/tap/escrow_adapter.rs b/tap_agent/src/tap/escrow_adapter.rs index ba190089..3a6a8b7b 100644 --- a/tap_agent/src/tap/escrow_adapter.rs +++ b/tap_agent/src/tap/escrow_adapter.rs @@ -51,8 +51,12 @@ impl EscrowAdapterTrait for EscrowAdapter { .to_string(), })?; let balance: u128 = balance.try_into().map_err(|_| AdapterError::AdapterError { - error: format!("Gateway {} escrow balance is too large to fit in u128, \ - could not get available escrow.", sender).to_string(), + error: format!( + "Gateway {} escrow balance is too large to fit in u128, \ + could not get available escrow.", + sender + ) + .to_string(), })?; let fees = self .sender_pending_fees diff --git a/tap_agent/src/tap/manager.rs b/tap_agent/src/tap/manager.rs index c0d3fe12..1c2bfdf8 100644 --- a/tap_agent/src/tap/manager.rs +++ b/tap_agent/src/tap/manager.rs @@ -1,14 +1,28 @@ use super::managers::NewReceiptNotification; -use crate::tap::{ - escrow_adapter::EscrowAdapter, rav_storage_adapter::RAVStorageAdapter, - receipt_checks_adapter::ReceiptChecksAdapter, receipt_storage_adapter::ReceiptStorageAdapter, +use crate::{ + config::{self, TAP}, + tap::{ + escrow_adapter::EscrowAdapter, rav_storage_adapter::RAVStorageAdapter, + receipt_checks_adapter::ReceiptChecksAdapter, + receipt_storage_adapter::ReceiptStorageAdapter, + }, }; use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; use indexer_common::prelude::{EscrowMonitor, GraphNodeInstance}; +use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params}; use log::error; use sqlx::PgPool; -use tap_core::tap_receipt::get_full_list_of_checks; +use tap_aggregator::jsonrpsee_helpers::JsonRpcResponse; +use tap_core::{ + eip_712_signed_message::EIP712SignedMessage, + receipt_aggregate_voucher::ReceiptAggregateVoucher, tap_manager::RAVRequest, + tap_receipt::get_full_list_of_checks, +}; +use tower_http::{ + compression::CompressionLayer, + decompression::{Decompression, DecompressionLayer}, +}; type TapManager = tap_core::tap_manager::Manager< EscrowAdapter, @@ -18,20 +32,23 @@ type TapManager = tap_core::tap_manager::Manager< >; pub struct Manager { + config: &'static config::Cli, + pgpool: PgPool, tap_manager: TapManager, + allocation_id: Address, + sender: Address, unaggregated_fees: u128, unaggregated_fees_last_id: u64, + rav_requester_task: Option>, } impl Manager { pub fn new( + config: &'static config::Cli, allocation_id: Address, sender: Address, escrow_monitor: EscrowMonitor, pgpool: PgPool, - escrow_subgraph_deployment: String, - escrow_subgraph_polling_interval: u64, - indexer_address: Address, tap_eip712_domain_separator: Eip712Domain, graph_node_instance: GraphNodeInstance, ) -> Self { @@ -43,10 +60,10 @@ impl Manager { allocation_id, escrow_monitor.clone(), graph_node_instance, - escrow_subgraph_deployment, - escrow_subgraph_polling_interval, + config.escrow_subgraph.escrow_subgraph_deployment.clone(), + config.escrow_subgraph.escrow_syncing_interval, sender, - indexer_address, + config.ethereum.indexer_address, ); let receipt_storage_adapter = ReceiptStorageAdapter::new(pgpool.clone(), allocation_id, sender); @@ -61,35 +78,130 @@ impl Manager { 0, ); Self { + config, + pgpool, tap_manager, + allocation_id, + sender, unaggregated_fees: 0, unaggregated_fees_last_id: 0, + rav_requester_task: None, } } pub fn handle_new_receipt_notification( &mut self, - new_receipt_notifiacation: NewReceiptNotification, + new_receipt_notification: NewReceiptNotification, ) { // Else we already processed that receipt, most likely from pulling the receipts // from the database. - if new_receipt_notifiacation.id > self.unaggregated_fees_last_id { + if new_receipt_notification.id > self.unaggregated_fees_last_id { self.unaggregated_fees = self .unaggregated_fees - .checked_add(new_receipt_notifiacation.value) + .checked_add(new_receipt_notification.value) .unwrap_or_else(|| { // This should never happen, but if it does, we want to know about it. error!( "Overflow when adding receipt value {} to total unaggregated fees {} for \ - allocation {} and sender {}. Setting total unaggregated fees to u128::MAX.", - new_receipt_notifiacation.value, + allocation {} and sender {}. Setting total unaggregated fees to u128::MAX.", + new_receipt_notification.value, self.unaggregated_fees, - new_receipt_notifiacation.allocation_id, - new_receipt_notifiacation.sender_address + new_receipt_notification.allocation_id, + new_receipt_notification.sender_address ); u128::MAX }); - self.unaggregated_fees_last_id = new_receipt_notifiacation.id; + self.unaggregated_fees_last_id = new_receipt_notification.id; + } + } + + /// Delete obsolete receipts in the DB w.r.t. the last RAV in DB, then update the tap manager + /// with the latest unaggregated fees from the database. + pub async fn update_unaggregated_fees(&mut self) -> Result<(), anyhow::Error> { + self.tap_manager.remove_obsolete_receipts().await?; + + let res = sqlx::query!( + r#" + SELECT MAX(id), SUM(value) + FROM scalar_tap_receipts + WHERE allocation_id = $1 AND sender_address = $2 + "#, + self.allocation_id + .to_string() + .strip_prefix("0x") + .unwrap() + .to_owned(), + self.sender + .to_string() + .strip_prefix("0x") + .unwrap() + .to_owned() + ) + .fetch_optional(&self.pgpool) + .await?; + + match res { + Some(res) => { + self.unaggregated_fees_last_id = res.max.unwrap_or(0).try_into()?; + self.unaggregated_fees = res.sum.unwrap_or(0.into()).to_string().parse::()?; + } + None => { + self.unaggregated_fees_last_id = 0; + self.unaggregated_fees = 0; + } + } + + Ok(()) + } + + async fn rav_requester( + manager: TapManager, + rav_request_timestamp_buffer_ns: u64, + sender_aggregator_endpoint: &str, + ) { + let RAVRequest { + valid_receipts, + previous_rav, + invalid_receipts, + expected_rav, + } = manager + .create_rav_request(rav_request_timestamp_buffer_ns) + .await + .unwrap(); + + // TODO: Request compression and response decompression. Also a fancy user agent? + let client = HttpClientBuilder::default() + .build(sender_aggregator_endpoint) + .unwrap(); + + let response: JsonRpcResponse> = client + .request( + "aggregate_receipts", + rpc_params!( + "0.0", // TODO: Set the version in a smarter place. + valid_receipts, + previous_rav + ), + ) + .await + .unwrap(); + + if let Some(warnings) = response.warnings { + for warning in warnings { + error!("Warning from sender's TAP aggregator: {:?}", warning); + } + } + + manager.verify_and_store_rav(expected_rav, response.data).await.unwrap(); + } +} + +// Destructor +impl Drop for Manager { + fn drop(&mut self) { + // Cancel the rav_requester task. + if let Some(rav_requester_task) = self.rav_requester_task.take() { + rav_requester_task.abort(); } } } diff --git a/tap_agent/src/tap/managers.rs b/tap_agent/src/tap/managers.rs index f425de61..4f5b43d8 100644 --- a/tap_agent/src/tap/managers.rs +++ b/tap_agent/src/tap/managers.rs @@ -1,3 +1,5 @@ +use crate::config; + use super::manager::Manager; use alloy_primitives::Address; use alloy_sol_types::{eip712_domain, Eip712Domain}; @@ -6,7 +8,7 @@ use log::warn; use serde::Deserialize; use sqlx::{postgres::PgListener, PgPool}; use std::{collections::HashMap, str::FromStr, sync::Arc}; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; #[derive(Deserialize)] pub struct NewReceiptNotification { @@ -18,14 +20,12 @@ pub struct NewReceiptNotification { } pub struct TapManagers { + config: &'static config::Cli, pgpool: PgPool, /// Map of (allocation_id, sender_address) to Manager. managers: Arc>>>, allocation_monitor: AllocationMonitor, escrow_monitor: EscrowMonitor, - escrow_subgraph_deployment: String, - escrow_subgraph_polling_interval: u64, - indexer_address: Address, tap_eip712_domain_separator: Eip712Domain, graph_node_instance: GraphNodeInstance, new_receipts_watcher: Option>, @@ -33,12 +33,10 @@ pub struct TapManagers { impl TapManagers { pub async fn new( + config: &'static config::Cli, pgpool: PgPool, allocation_monitor: AllocationMonitor, escrow_monitor: EscrowMonitor, - escrow_subgraph_deployment: String, - escrow_subgraph_polling_interval: u64, - indexer_address: Address, tap_eip712_chain_id: u64, tap_eip712_version: String, tap_eip712_verifying_contract: Address, @@ -51,48 +49,45 @@ impl TapManagers { verifying_contract: tap_eip712_verifying_contract, }; let mut tap_manager = Self { + config, pgpool, managers: Arc::new(RwLock::new(HashMap::new())), allocation_monitor, escrow_monitor, - escrow_subgraph_deployment: escrow_subgraph_deployment.clone(), - escrow_subgraph_polling_interval, - indexer_address, tap_eip712_domain_separator, graph_node_instance: graph_node_instance.clone(), new_receipts_watcher: None, }; - // Create managers for all currently eligible (allocation, sender) - let eligible_allocations: Vec
= tap_manager - .allocation_monitor - .get_eligible_allocations() - .await - .keys() - .copied() - .collect(); - let senders: Vec
= tap_manager - .escrow_monitor - .get_accounts() - .await - .keys() - .copied() - .collect(); + // Create scope to limit the lifetime of the `managers_write` write lock. Otherwise + // `tap_manager` remains borrowed at the end of the function. { + // Create managers for all currently eligible (allocation, sender) + let eligible_allocations: Vec
= tap_manager + .allocation_monitor + .get_eligible_allocations() + .await + .keys() + .copied() + .collect(); + let senders: Vec
= tap_manager + .escrow_monitor + .get_accounts() + .await + .keys() + .copied() + .collect(); let mut managers_write = tap_manager.managers.write().await; for allocation_id in eligible_allocations { for sender in &senders { managers_write.insert( (allocation_id, *sender), - // tap_manager.new_manager(allocation_id, *sender), RwLock::new(Manager::new( + config, allocation_id, *sender, tap_manager.escrow_monitor.clone(), tap_manager.pgpool.clone(), - escrow_subgraph_deployment.clone(), - escrow_subgraph_polling_interval, - indexer_address, tap_manager.tap_eip712_domain_separator.clone(), graph_node_instance.clone(), )), @@ -124,41 +119,64 @@ impl TapManagers { managers_write.entry((allocation_id, sender)) { e.insert(RwLock::new(Manager::new( + config, allocation_id, sender, tap_manager.escrow_monitor.clone(), tap_manager.pgpool.clone(), - escrow_subgraph_deployment.clone(), - escrow_subgraph_polling_interval, - indexer_address, tap_manager.tap_eip712_domain_separator.clone(), graph_node_instance.clone(), ))); } }); + + // Listen to pg_notify events. We start it before updating the unaggregated_fees for + // all managers, so that we don't miss any receipts. PG will buffer the notifications + // until we start consuming them with `new_receipts_watcher`. + let listener = Arc::new(Mutex::new( + PgListener::connect_with(&tap_manager.pgpool.clone()) + .await + .unwrap(), + )); + listener + .lock() + .await + .listen("scalar_tap_receipt_notification") + .await + .expect( + "should be able to subscribe to Postgres Notify events on the channel \ + 'scalar_tap_receipt_notification'", + ); + + // Update the unaggregated_fees for all managers by pulling the receipts from the + // database. + for manager in managers_write.values() { + manager + .write() + .await + .update_unaggregated_fees() + .await + .expect("should be able to update unaggregated_fees"); + } + + // Start the new_receipts_watcher task. + tap_manager.new_receipts_watcher = Some(tokio::spawn(Self::new_receipts_watcher( + listener, + tap_manager.managers.clone(), + ))); } - tap_manager.new_receipts_watcher = Some(tokio::spawn(Self::new_receipts_watcher( - tap_manager.pgpool.clone(), - tap_manager.managers.clone(), - ))); + tap_manager } async fn new_receipts_watcher( - pgpool: PgPool, + pglistener: Arc>, managers: Arc>>>, ) { - // Listen to pg_notify events - let mut listener = PgListener::connect_with(&pgpool).await.unwrap(); - listener - .listen("scalar_tap_receipt_notification") - .await - .expect( - "should be able to subscribe to Postgres Notify events on the channel \ - 'scalar_tap_receipt_notification'", - ); + let mut pglistener = pglistener.lock().await; + loop { - let pg_notification = listener.try_recv().await.expect( + let pg_notification = pglistener.try_recv().await.expect( "should be able to receive Postgres Notify events on the channel \ 'scalar_tap_receipt_notification'", ); @@ -187,20 +205,6 @@ impl TapManagers { } } } - - fn new_manager(&self, allocation_id: Address, sender: Address) -> Manager { - Manager::new( - allocation_id, - sender, - self.escrow_monitor.clone(), - self.pgpool.clone(), - self.escrow_subgraph_deployment.clone(), - self.escrow_subgraph_polling_interval, - self.indexer_address, - self.tap_eip712_domain_separator.clone(), - self.graph_node_instance.clone(), - ) - } } // destructor