Update dependencies to get rid of the yanked deps #4994

Merged (10 commits) on Dec 15, 2023
243 changes: 184 additions & 59 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions Cargo.toml
@@ -120,12 +120,12 @@ fnv = "1"
fs2 = "0.4"
futures = "0.3"
hex = "0.4"
hyper = "0.14"
hyper = "1"
itertools = "0.10"
lazy_static = "1"
libsecp256k1 = "0.7"
log = "0.4"
lru = "0.7"
lru = "0.12"
maplit = "1"
num_cpus = "1"
parking_lot = "0.12"
@@ -143,7 +143,7 @@ rusqlite = { version = "0.28", features = ["bundled"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_repr = "0.1"
serde_yaml = "0.8"
serde_yaml = "0.9"
sha2 = "0.9"
slog = { version = "2", features = ["max_level_trace", "release_max_level_trace", "nested-values"] }
slog-async = "2"
@@ -220,7 +220,7 @@ swap_or_not_shuffle = { path = "consensus/swap_or_not_shuffle" }
task_executor = { path = "common/task_executor" }
types = { path = "consensus/types" }
unused_port = { path = "common/unused_port" }
validator_client = { path = "validator_client/" }
validator_client = { path = "validator_client" }
validator_dir = { path = "common/validator_dir" }
warp_utils = { path = "common/warp_utils" }
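Most of the source changes in this PR follow from the `lru` 0.7 → 0.12 bump: `LruCache::new` now takes a `NonZeroUsize` capacity and `cap()` returns one, which is why `NonZeroUsize` constants and `.get()` calls appear throughout the diffs below. A minimal standalone sketch of the 0.12 API (illustrative values, not Lighthouse code):

```rust
use lru::LruCache;
use std::num::NonZeroUsize;

fn main() {
    // lru 0.12: the capacity argument is a NonZeroUsize rather than a plain usize.
    let capacity = NonZeroUsize::new(16).expect("capacity must be non-zero");
    let mut cache: LruCache<u64, &str> = LruCache::new(capacity);

    cache.put(1, "one");

    // cap() also returns NonZeroUsize now, hence the `.get()` calls added in this diff.
    assert_eq!(cache.cap().get(), 16);
    assert_eq!(cache.len(), 1);
}
```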

4 changes: 3 additions & 1 deletion beacon_node/beacon_chain/src/beacon_proposer_cache.rs
@@ -14,13 +14,15 @@ use lru::LruCache;
use smallvec::SmallVec;
use state_processing::state_advance::partial_state_advance;
use std::cmp::Ordering;
use std::num::NonZeroUsize;
use types::non_zero_usize::new_non_zero_usize;
use types::{
BeaconState, BeaconStateError, ChainSpec, CloneConfig, Epoch, EthSpec, Fork, Hash256, Slot,
Unsigned,
};

/// The number of sets of proposer indices that should be cached.
const CACHE_SIZE: usize = 16;
const CACHE_SIZE: NonZeroUsize = new_non_zero_usize(16);

/// This value is fairly unimportant, it's used to avoid heap allocations. The result of it being
/// incorrect is non-substantial from a consensus perspective (and probably also from a
7 changes: 5 additions & 2 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -17,6 +17,7 @@ use slog::{debug, error, Logger};
use slot_clock::SlotClock;
use std::fmt;
use std::fmt::Debug;
use std::num::NonZeroUsize;
use std::sync::Arc;
use task_executor::TaskExecutor;
use types::beacon_block_body::{KzgCommitmentOpts, KzgCommitments};
@@ -32,15 +33,17 @@ mod processing_cache;
mod state_lru_cache;

pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
use types::non_zero_usize::new_non_zero_usize;

/// The LRU Cache stores `PendingComponents` which can store up to
/// `MAX_BLOBS_PER_BLOCK = 6` blobs each. A `BlobSidecar` is 0.131256 MB. So
/// the maximum size of a `PendingComponents` is ~ 0.787536 MB. Setting this
/// to 1024 means the maximum size of the cache is ~ 0.8 GB. But the cache
/// will target a size of less than 75% of capacity.
pub const OVERFLOW_LRU_CAPACITY: usize = 1024;
pub const OVERFLOW_LRU_CAPACITY: NonZeroUsize = new_non_zero_usize(1024);
/// Until tree-states is implemented, we can't store very many states in memory :(
pub const STATE_LRU_CAPACITY: usize = 2;
pub const STATE_LRU_CAPACITY_NON_ZERO: NonZeroUsize = new_non_zero_usize(2);
pub const STATE_LRU_CAPACITY: usize = STATE_LRU_CAPACITY_NON_ZERO.get();

/// This includes a cache for any blocks or blobs that have been received over gossip or RPC
/// and are awaiting more components before they can be imported. Additionally the
beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
@@ -42,6 +42,7 @@ use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use ssz_types::{FixedVector, VariableList};
use std::num::NonZeroUsize;
use std::{collections::HashSet, sync::Arc};
use types::blob_sidecar::BlobIdentifier;
use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256};
@@ -288,7 +289,7 @@ struct Critical<T: BeaconChainTypes> {
}

impl<T: BeaconChainTypes> Critical<T> {
pub fn new(capacity: usize) -> Self {
pub fn new(capacity: NonZeroUsize) -> Self {
Self {
in_memory: LruCache::new(capacity),
store_keys: HashSet::new(),
@@ -329,7 +330,7 @@ impl<T: BeaconChainTypes> Critical<T> {
pending_components: PendingComponents<T::EthSpec>,
overflow_store: &OverflowStore<T>,
) -> Result<(), AvailabilityCheckError> {
if self.in_memory.len() == self.in_memory.cap() {
if self.in_memory.len() == self.in_memory.cap().get() {
// cache will overflow, must write lru entry to disk
if let Some((lru_key, lru_value)) = self.in_memory.pop_lru() {
overflow_store.persist_pending_components(lru_key, lru_value)?;
@@ -377,12 +378,12 @@ pub struct OverflowLRUCache<T: BeaconChainTypes> {
/// Mutex to guard maintenance methods which move data between disk and memory
maintenance_lock: Mutex<()>,
/// The capacity of the LRU cache
capacity: usize,
capacity: NonZeroUsize,
}

impl<T: BeaconChainTypes> OverflowLRUCache<T> {
pub fn new(
capacity: usize,
capacity: NonZeroUsize,
beacon_store: BeaconStore<T>,
spec: ChainSpec,
) -> Result<Self, AvailabilityCheckError> {
@@ -514,7 +515,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
/// maintain the cache
pub fn do_maintenance(&self, cutoff_epoch: Epoch) -> Result<(), AvailabilityCheckError> {
// ensure memory usage is below threshold
let threshold = self.capacity * 3 / 4;
let threshold = self.capacity.get() * 3 / 4;
self.maintain_threshold(threshold, cutoff_epoch)?;
// clean up any keys on the disk that shouldn't be there
self.prune_disk(cutoff_epoch)?;
@@ -753,6 +754,7 @@ mod test {
use std::ops::AddAssign;
use store::{HotColdDB, ItemStore, LevelDB, StoreConfig};
use tempfile::{tempdir, TempDir};
use types::non_zero_usize::new_non_zero_usize;
use types::{ChainSpec, ExecPayload, MinimalEthSpec};

const LOW_VALIDATOR_COUNT: usize = 32;
@@ -974,8 +976,9 @@ mod test {
let harness = get_deneb_chain(log.clone(), &chain_db_path).await;
let spec = harness.spec.clone();
let test_store = harness.chain.store.clone();
let capacity_non_zero = new_non_zero_usize(capacity);
let cache = Arc::new(
OverflowLRUCache::<T>::new(capacity, test_store, spec.clone())
OverflowLRUCache::<T>::new(capacity_non_zero, test_store, spec.clone())
.expect("should create cache"),
);
(harness, cache, chain_db_path)
@@ -1477,7 +1480,7 @@ mod test {

// create a new cache with the same store
let recovered_cache = OverflowLRUCache::<T>::new(
capacity,
new_non_zero_usize(capacity),
harness.chain.store.clone(),
harness.chain.spec.clone(),
)
beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs
@@ -1,7 +1,7 @@
use crate::block_verification_types::AsBlock;
use crate::{
block_verification_types::BlockImportData,
data_availability_checker::{AvailabilityCheckError, STATE_LRU_CAPACITY},
data_availability_checker::{AvailabilityCheckError, STATE_LRU_CAPACITY_NON_ZERO},
eth1_finalization_cache::Eth1FinalizationData,
AvailabilityPendingExecutedBlock, BeaconChainTypes, BeaconStore, PayloadVerificationOutcome,
};
@@ -61,7 +61,7 @@ pub struct StateLRUCache<T: BeaconChainTypes> {
impl<T: BeaconChainTypes> StateLRUCache<T> {
pub fn new(store: BeaconStore<T>, spec: ChainSpec) -> Self {
Self {
states: RwLock::new(LruCache::new(STATE_LRU_CAPACITY)),
states: RwLock::new(LruCache::new(STATE_LRU_CAPACITY_NON_ZERO)),
store,
spec,
}
8 changes: 5 additions & 3 deletions beacon_node/beacon_chain/src/pre_finalization_cache.rs
@@ -3,11 +3,13 @@ use itertools::process_results;
use lru::LruCache;
use parking_lot::Mutex;
use slog::debug;
use std::num::NonZeroUsize;
use std::time::Duration;
use types::non_zero_usize::new_non_zero_usize;
use types::Hash256;

const BLOCK_ROOT_CACHE_LIMIT: usize = 512;
const LOOKUP_LIMIT: usize = 8;
const BLOCK_ROOT_CACHE_LIMIT: NonZeroUsize = new_non_zero_usize(512);
const LOOKUP_LIMIT: NonZeroUsize = new_non_zero_usize(8);
const METRICS_TIMEOUT: Duration = Duration::from_millis(100);

/// Cache for rejecting attestations to blocks from before finalization.
@@ -78,7 +80,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// 3. Check the network with a single block lookup.
cache.in_progress_lookups.put(block_root, ());
if cache.in_progress_lookups.len() == LOOKUP_LIMIT {
if cache.in_progress_lookups.len() == LOOKUP_LIMIT.get() {
// NOTE: we expect this to occur sometimes if a lot of blocks that we look up fail to be
// imported for reasons other than being pre-finalization. The cache will eventually
// self-repair in this case by replacing old entries with new ones until all the failed
4 changes: 3 additions & 1 deletion beacon_node/execution_layer/src/engines.rs
@@ -8,17 +8,19 @@ use crate::HttpJsonRpc;
use lru::LruCache;
use slog::{debug, error, info, warn, Logger};
use std::future::Future;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::{watch, Mutex, RwLock};
use tokio_stream::wrappers::WatchStream;
use types::non_zero_usize::new_non_zero_usize;
use types::ExecutionBlockHash;

/// The number of payload IDs that will be stored for each `Engine`.
///
/// Since the size of each value is small (~800 bytes) a large number is used for safety.
const PAYLOAD_ID_LRU_CACHE_SIZE: usize = 512;
const PAYLOAD_ID_LRU_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(512);
const CACHED_ENGINE_CAPABILITIES_AGE_LIMIT: Duration = Duration::from_secs(900); // 15 minutes

/// Stores the remembered state of an engine.
4 changes: 3 additions & 1 deletion beacon_node/execution_layer/src/lib.rs
@@ -29,6 +29,7 @@ use std::collections::HashMap;
use std::fmt;
use std::future::Future;
use std::io::Write;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
@@ -42,6 +43,7 @@ use tokio_stream::wrappers::WatchStream;
use tree_hash::TreeHash;
use types::beacon_block_body::KzgCommitments;
use types::builder_bid::BuilderBid;
use types::non_zero_usize::new_non_zero_usize;
use types::payload::BlockProductionVersion;
use types::{
AbstractExecPayload, BlobsList, ExecutionPayloadDeneb, KzgProofs, SignedBlindedBeaconBlock,
@@ -68,7 +70,7 @@ pub const DEFAULT_JWT_FILE: &str = "jwt.hex";

/// Each time the `ExecutionLayer` retrieves a block from an execution node, it stores that block
/// in an LRU cache to avoid redundant lookups. This is the size of that cache.
const EXECUTION_BLOCKS_LRU_CACHE_SIZE: usize = 128;
const EXECUTION_BLOCKS_LRU_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(128);

/// A fee recipient address for use during block production. Only used as a very last resort if
/// there is no address provided by the user.
4 changes: 3 additions & 1 deletion beacon_node/execution_layer/src/payload_cache.rs
@@ -1,10 +1,12 @@
use eth2::types::FullPayloadContents;
use lru::LruCache;
use parking_lot::Mutex;
use std::num::NonZeroUsize;
use tree_hash::TreeHash;
use types::non_zero_usize::new_non_zero_usize;
use types::{EthSpec, Hash256};

pub const DEFAULT_PAYLOAD_CACHE_SIZE: usize = 10;
pub const DEFAULT_PAYLOAD_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(10);

/// A cache mapping execution payloads by tree hash roots.
pub struct PayloadCache<T: EthSpec> {
14 changes: 5 additions & 9 deletions beacon_node/http_api/src/block_rewards.rs
@@ -3,13 +3,13 @@ use eth2::lighthouse::{BlockReward, BlockRewardsQuery};
use lru::LruCache;
use slog::{debug, warn, Logger};
use state_processing::BlockReplayer;
use std::num::NonZeroUsize;
use std::sync::Arc;
use types::beacon_block::BlindedBeaconBlock;
use warp_utils::reject::{
beacon_chain_error, beacon_state_error, custom_bad_request, custom_server_error,
};
use types::non_zero_usize::new_non_zero_usize;
use warp_utils::reject::{beacon_chain_error, beacon_state_error, custom_bad_request};

const STATE_CACHE_SIZE: usize = 2;
const STATE_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(2);

/// Fetch block rewards for blocks from the canonical chain.
pub fn get_block_rewards<T: BeaconChainTypes>(
@@ -164,11 +164,7 @@ pub fn compute_block_rewards<T: BeaconChainTypes>(
.build_all_committee_caches(&chain.spec)
.map_err(beacon_state_error)?;

state_cache
.get_or_insert((parent_root, block.slot()), || state)
.ok_or_else(|| {
custom_server_error("LRU cache insert should always succeed".into())
})?
state_cache.get_or_insert((parent_root, block.slot()), || state)
};

// Compute block reward.
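The dropped `ok_or_else` above reflects another `lru` 0.12 change: `get_or_insert` now returns `&V` directly instead of `Option<&V>`, so the "insert should always succeed" error path no longer exists. A small sketch of the assumed 0.12 behaviour (keys and values are illustrative):

```rust
use lru::LruCache;
use std::num::NonZeroUsize;

fn main() {
    let mut cache: LruCache<&str, u32> =
        LruCache::new(NonZeroUsize::new(2).expect("capacity must be non-zero"));

    // lru 0.12: get_or_insert always yields a reference, so no Option/error handling is needed.
    let value = cache.get_or_insert("state", || 42);
    assert_eq!(*value, 42);
}
```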
6 changes: 5 additions & 1 deletion beacon_node/lighthouse_network/src/discovery/mod.rs
@@ -35,6 +35,7 @@ pub use libp2p::{
use lru::LruCache;
use slog::{crit, debug, error, info, trace, warn};
use ssz::Encode;
use std::num::NonZeroUsize;
use std::{
collections::{HashMap, VecDeque},
net::{IpAddr, SocketAddr},
@@ -49,6 +50,7 @@ use types::{EnrForkId, EthSpec};

mod subnet_predicate;
pub use subnet_predicate::subnet_predicate;
use types::non_zero_usize::new_non_zero_usize;

/// Local ENR storage filename.
pub const ENR_FILENAME: &str = "enr.dat";
@@ -70,6 +72,8 @@ const MAX_SUBNETS_IN_QUERY: usize = 3;
pub const FIND_NODE_QUERY_CLOSEST_PEERS: usize = 16;
/// The threshold for updating `min_ttl` on a connected peer.
const DURATION_DIFFERENCE: Duration = Duration::from_millis(1);
/// The capacity of the Discovery ENR cache.
const ENR_CACHE_CAPACITY: NonZeroUsize = new_non_zero_usize(50);

/// A query has completed. This result contains a mapping of discovered peer IDs to the `min_ttl`
/// of the peer if it is specified.
@@ -318,7 +322,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
};

Ok(Self {
cached_enrs: LruCache::new(50),
cached_enrs: LruCache::new(ENR_CACHE_CAPACITY),
network_globals,
find_peer_active: false,
queued_queries: VecDeque::with_capacity(10),
10 changes: 6 additions & 4 deletions beacon_node/store/src/config.rs
@@ -2,12 +2,14 @@ use crate::{DBColumn, Error, StoreItem};
use serde::{Deserialize, Serialize};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use std::num::NonZeroUsize;
use types::non_zero_usize::new_non_zero_usize;
use types::{EthSpec, MinimalEthSpec};

pub const PREV_DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 8192;
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5;
pub const DEFAULT_HISTORIC_STATE_CACHE_SIZE: usize = 1;
pub const DEFAULT_BLOCK_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(5);
pub const DEFAULT_HISTORIC_STATE_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(1);
pub const DEFAULT_EPOCHS_PER_BLOB_PRUNE: u64 = 1;
pub const DEFAULT_BLOB_PUNE_MARGIN_EPOCHS: u64 = 0;

@@ -19,9 +21,9 @@ pub struct StoreConfig {
/// Flag indicating whether the `slots_per_restore_point` was set explicitly by the user.
pub slots_per_restore_point_set_explicitly: bool,
/// Maximum number of blocks to store in the in-memory block cache.
pub block_cache_size: usize,
pub block_cache_size: NonZeroUsize,
/// Maximum number of states from freezer database to store in the in-memory state cache.
pub historic_state_cache_size: usize,
pub historic_state_cache_size: NonZeroUsize,
/// Whether to compact the database on initialization.
pub compact_on_init: bool,
/// Whether to compact the database during database pruning.
3 changes: 2 additions & 1 deletion beacon_node/store/src/hot_cold_store.rs
@@ -35,6 +35,7 @@ use state_processing::{
use std::cmp::min;
use std::convert::TryInto;
use std::marker::PhantomData;
use std::num::NonZeroUsize;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
@@ -85,7 +86,7 @@ struct BlockCache<E: EthSpec> {
}

impl<E: EthSpec> BlockCache<E> {
pub fn new(size: usize) -> Self {
pub fn new(size: NonZeroUsize) -> Self {
Self {
block_cache: LruCache::new(size),
blob_cache: LruCache::new(size),
3 changes: 2 additions & 1 deletion common/account_utils/src/validator_definitions.rs
@@ -367,7 +367,8 @@ impl ValidatorDefinitions {
pub fn save<P: AsRef<Path>>(&self, validators_dir: P) -> Result<(), Error> {
let config_path = validators_dir.as_ref().join(CONFIG_FILENAME);
let temp_path = validators_dir.as_ref().join(CONFIG_TEMP_FILENAME);
let bytes = serde_yaml::to_vec(self).map_err(Error::UnableToEncodeFile)?;
let mut bytes = vec![];
serde_yaml::to_writer(&mut bytes, self).map_err(Error::UnableToEncodeFile)?;

write_file_via_temporary(&config_path, &temp_path, &bytes)
.map_err(Error::UnableToWriteFile)?;
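`serde_yaml` 0.9 removed `to_vec`, so the code above serializes through `to_writer` into an in-memory buffer instead. A rough standalone equivalent (the `ExampleConfig` struct is a hypothetical stand-in, not the real `ValidatorDefinitions` type):

```rust
use serde::Serialize;

#[derive(Serialize)]
struct ExampleConfig {
    enabled: bool,
    description: String,
}

fn main() -> Result<(), serde_yaml::Error> {
    let config = ExampleConfig {
        enabled: true,
        description: "demo".into(),
    };

    // serde_yaml 0.9: write into a Vec<u8> via to_writer; to_vec no longer exists.
    let mut bytes = Vec::new();
    serde_yaml::to_writer(&mut bytes, &config)?;
    assert!(!bytes.is_empty());
    Ok(())
}
```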
1 change: 1 addition & 0 deletions consensus/types/src/lib.rs
@@ -100,6 +100,7 @@ pub mod sqlite;

pub mod blob_sidecar;
pub mod light_client_header;
pub mod non_zero_usize;

use ethereum_types::{H160, H256};

8 changes: 8 additions & 0 deletions consensus/types/src/non_zero_usize.rs
@@ -0,0 +1,8 @@
use std::num::NonZeroUsize;

pub const fn new_non_zero_usize(x: usize) -> NonZeroUsize {
match NonZeroUsize::new(x) {
Some(n) => n,
None => panic!("Expected a non zero usize."),
}
}
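This helper exists so cache capacities can stay `const`: `NonZeroUsize::new` returns an `Option`, and hitting the `panic!` arm during const evaluation turns a zero capacity into a compile-time error rather than a runtime one. A short usage sketch (the constant name is illustrative):

```rust
use std::num::NonZeroUsize;

// Copied from the helper added above.
pub const fn new_non_zero_usize(x: usize) -> NonZeroUsize {
    match NonZeroUsize::new(x) {
        Some(n) => n,
        None => panic!("Expected a non zero usize."),
    }
}

// Evaluated at compile time; `new_non_zero_usize(0)` here would fail the build.
const EXAMPLE_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(16);

fn main() {
    assert_eq!(EXAMPLE_CACHE_SIZE.get(), 16);
}
```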
15 changes: 12 additions & 3 deletions lighthouse/tests/beacon_node.rs
@@ -18,6 +18,7 @@ use std::str::FromStr;
use std::string::ToString;
use std::time::Duration;
use tempfile::TempDir;
use types::non_zero_usize::new_non_zero_usize;
use types::{
Address, Checkpoint, Epoch, ExecutionBlockHash, ForkName, Hash256, MainnetEthSpec,
ProgressiveBalancesMode,
@@ -1769,14 +1770,19 @@ fn block_cache_size_flag() {
CommandLineTest::new()
.flag("block-cache-size", Some("4"))
.run_with_zero_port()
.with_config(|config| assert_eq!(config.store.block_cache_size, 4_usize));
.with_config(|config| assert_eq!(config.store.block_cache_size, new_non_zero_usize(4)));
}
#[test]
fn historic_state_cache_size_flag() {
CommandLineTest::new()
.flag("historic-state-cache-size", Some("4"))
.run_with_zero_port()
.with_config(|config| assert_eq!(config.store.historic_state_cache_size, 4_usize));
.with_config(|config| {
assert_eq!(
config.store.historic_state_cache_size,
new_non_zero_usize(4)
)
});
}
#[test]
fn historic_state_cache_size_default() {
@@ -2001,7 +2007,10 @@ fn slasher_attestation_cache_size_flag() {
.slasher
.as_ref()
.expect("Unable to parse Slasher config");
assert_eq!(slasher_config.attestation_root_cache_size, 10000);
assert_eq!(
slasher_config.attestation_root_cache_size,
new_non_zero_usize(10000)
);
});
}
#[test]
6 changes: 4 additions & 2 deletions slasher/src/config.rs
@@ -1,7 +1,9 @@
use crate::Error;
use serde::{Deserialize, Serialize};
use std::num::NonZeroUsize;
use std::path::PathBuf;
use strum::{Display, EnumString, EnumVariantNames};
use types::non_zero_usize::new_non_zero_usize;
use types::{Epoch, EthSpec, IndexedAttestation};

pub const DEFAULT_CHUNK_SIZE: usize = 16;
@@ -10,7 +12,7 @@ pub const DEFAULT_HISTORY_LENGTH: usize = 4096;
pub const DEFAULT_UPDATE_PERIOD: u64 = 12;
pub const DEFAULT_SLOT_OFFSET: f64 = 10.5;
pub const DEFAULT_MAX_DB_SIZE: usize = 256 * 1024; // 256 GiB
pub const DEFAULT_ATTESTATION_ROOT_CACHE_SIZE: usize = 100_000;
pub const DEFAULT_ATTESTATION_ROOT_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(100_000);
pub const DEFAULT_BROADCAST: bool = false;

#[cfg(all(feature = "mdbx", not(feature = "lmdb")))]
@@ -38,7 +40,7 @@ pub struct Config {
/// Maximum size of the database in megabytes.
pub max_db_size_mbs: usize,
/// Maximum size of the in-memory cache for attestation roots.
pub attestation_root_cache_size: usize,
pub attestation_root_cache_size: NonZeroUsize,
/// Whether to broadcast slashings found to the network.
pub broadcast: bool,
/// Database backend to use.
5 changes: 2 additions & 3 deletions watch/Cargo.toml
@@ -21,7 +21,7 @@ types = { workspace = true }
eth2 = { workspace = true }
beacon_node = { workspace = true }
tokio = { workspace = true }
axum = "0.6.18"
axum = "0.7"
hyper = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
@@ -41,8 +41,7 @@ tokio-postgres = "0.7.5"
http_api = { workspace = true }
beacon_chain = { workspace = true }
network = { workspace = true }
# TODO: update to 0.15 when released: https://github.com/testcontainers/testcontainers-rs/issues/497
testcontainers = { git = "https://github.com/testcontainers/testcontainers-rs/", rev = "0f2c9851" }
testcontainers = "0.15"
unused_port = { workspace = true }
task_executor = { workspace = true }
logging = { workspace = true }
10 changes: 3 additions & 7 deletions watch/src/cli.rs
@@ -1,6 +1,5 @@
use crate::{config::Config, logger, server, updater};
use clap::{App, Arg};
use tokio::sync::oneshot;

pub const SERVE: &str = "serve";
pub const RUN_UPDATER: &str = "run-updater";
@@ -44,12 +43,9 @@ pub async fn run() -> Result<(), String> {
(RUN_UPDATER, Some(_)) => updater::run_updater(config)
.await
.map_err(|e| format!("Failure: {:?}", e)),
(SERVE, Some(_)) => {
let (_shutdown_tx, shutdown_rx) = oneshot::channel();
server::serve(config, shutdown_rx)
.await
.map_err(|e| format!("Failure: {:?}", e))
}
(SERVE, Some(_)) => server::serve(config)
.await
.map_err(|e| format!("Failure: {:?}", e)),
_ => Err("Unsupported subcommand. See --help".into()),
}
}
8 changes: 8 additions & 0 deletions watch/src/server/error.rs
@@ -3,12 +3,14 @@ use axum::Error as AxumError;
use axum::{http::StatusCode, response::IntoResponse, Json};
use hyper::Error as HyperError;
use serde_json::json;
use std::io::Error as IoError;

#[derive(Debug)]
pub enum Error {
Axum(AxumError),
Hyper(HyperError),
Database(DbError),
IoError(IoError),
BadRequest,
NotFound,
Other(String),
@@ -43,6 +45,12 @@ impl From<DbError> for Error {
}
}

impl From<IoError> for Error {
fn from(e: IoError) -> Self {
Error::IoError(e)
}
}

impl From<String> for Error {
fn from(e: String) -> Self {
Error::Other(e)
25 changes: 9 additions & 16 deletions watch/src/server/mod.rs
@@ -11,9 +11,8 @@ use axum::{
};
use eth2::types::ErrorMessage;
use log::info;
use std::future::Future;
use std::net::SocketAddr;
use tokio::sync::oneshot;
use std::future::{Future, IntoFuture};
use std::net::{SocketAddr, TcpListener};

pub use config::Config;
pub use error::Error;
@@ -22,7 +21,7 @@ mod config;
mod error;
mod handler;

pub async fn serve(config: FullConfig, shutdown: oneshot::Receiver<()>) -> Result<(), Error> {
pub async fn serve(config: FullConfig) -> Result<(), Error> {
let db = database::build_connection_pool(&config.database)?;
let (_, slots_per_epoch) = database::get_active_config(&mut database::get_connection(&db)?)?
.ok_or_else(|| {
@@ -32,9 +31,7 @@ pub async fn serve(config: FullConfig, shutdown: oneshot::Receiver<()>) -> Resul
)
})?;

let server = start_server(&config, slots_per_epoch as u64, db, async {
let _ = shutdown.await;
})?;
let server = start_server(&config, slots_per_epoch as u64, db)?;

server.await?;

@@ -61,8 +58,7 @@ pub fn start_server(
config: &FullConfig,
slots_per_epoch: u64,
pool: PgPool,
shutdown: impl Future<Output = ()> + Send + Sync + 'static,
) -> Result<impl Future<Output = Result<(), hyper::Error>> + 'static, Error> {
) -> Result<impl Future<Output = Result<(), std::io::Error>> + 'static, Error> {
let mut routes = Router::new()
.route("/v1/slots", get(handler::get_slots_by_range))
.route("/v1/slots/:slot", get(handler::get_slot))
@@ -108,16 +104,13 @@ pub fn start_server(
.layer(Extension(slots_per_epoch));

let addr = SocketAddr::new(config.server.listen_addr, config.server.listen_port);

let server = axum::Server::try_bind(&addr)?.serve(app.into_make_service());

let server = server.with_graceful_shutdown(async {
shutdown.await;
});
let listener = TcpListener::bind(addr)?;
listener.set_nonblocking(true)?;
let serve = axum::serve(tokio::net::TcpListener::from_std(listener)?, app);

info!("HTTP server listening on {}", addr);

Ok(server)
Ok(serve.into_future())
}

// The default route indicating that no available routes matched the request.
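The server changes track the axum 0.6 → 0.7 migration: `axum::Server` (the re-exported hyper builder) is gone, and `axum::serve` takes a `tokio::net::TcpListener` plus the router, yielding a future through `IntoFuture`. The code above binds a `std` listener because `start_server` is synchronous; a minimal async sketch of the same pattern (route and address are illustrative) looks roughly like:

```rust
use axum::{routing::get, Router};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let app = Router::new().route("/v1/health", get(|| async { "ok" }));

    // axum 0.7: bind a tokio listener and hand it to axum::serve;
    // graceful shutdown would be added via Serve::with_graceful_shutdown.
    let listener = TcpListener::bind("127.0.0.1:3000").await?;
    axum::serve(listener, app).await
}
```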
9 changes: 1 addition & 8 deletions watch/tests/tests.rs
@@ -17,7 +17,6 @@ use std::env;
use std::net::SocketAddr;
use std::time::Duration;
use testcontainers::{clients::Cli, core::WaitFor, Image, RunnableImage};
use tokio::sync::oneshot;
use tokio::{runtime, task::JoinHandle};
use tokio_postgres::{config::Config as PostgresConfig, Client, NoTls};
use types::{Hash256, MainnetEthSpec, Slot};
@@ -188,11 +187,7 @@ impl TesterBuilder {
/*
* Spawn a Watch HTTP API.
*/
let (_watch_shutdown_tx, watch_shutdown_rx) = oneshot::channel();
let watch_server = start_server(&self.config, SLOTS_PER_EPOCH, pool, async {
let _ = watch_shutdown_rx.await;
})
.unwrap();
let watch_server = start_server(&self.config, SLOTS_PER_EPOCH, pool).unwrap();
tokio::spawn(watch_server);

let addr = SocketAddr::new(
@@ -228,7 +223,6 @@ impl TesterBuilder {
config: self.config,
updater,
_bn_network_rx: self._bn_network_rx,
_watch_shutdown_tx,
}
}
async fn initialize_database(&self) -> PgPool {
@@ -245,7 +239,6 @@ struct Tester {
pub config: Config,
pub updater: UpdateHandler<E>,
_bn_network_rx: NetworkReceivers<E>,
_watch_shutdown_tx: oneshot::Sender<()>,
}

impl Tester {