Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Prevent sync restart if import queue full (#9381)
Browse files Browse the repository at this point in the history
  • Loading branch information
ascjones authored and andresilva committed Aug 27, 2018
1 parent acfad41 commit ddfa8e3
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 27 deletions.
36 changes: 11 additions & 25 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

use std::collections::{HashSet, BTreeMap, VecDeque};
use std::cmp;
use std::fmt;
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::sync::{Arc, Weak};
Expand Down Expand Up @@ -51,13 +50,16 @@ use client::{
};
use encoded;
use engines::{EthEngine, EpochTransition, ForkChoice};
use error::{ImportErrorKind, BlockImportErrorKind, ExecutionError, CallError, BlockError, ImportResult, Error as EthcoreError};
use error::{
ImportErrorKind, BlockImportErrorKind, ExecutionError, CallError, BlockError, ImportResult,
QueueError, QueueErrorKind, Error as EthcoreError
};
use vm::{EnvInfo, LastHashes};
use evm::Schedule;
use executive::{Executive, Executed, TransactOptions, contract_address};
use factory::{Factories, VmFactory};
use header::{BlockNumber, Header, ExtendedHeader};
use io::{IoChannel, IoError};
use io::IoChannel;
use log_entry::LocalizedLogEntry;
use miner::{Miner, MinerService};
use ethcore_miner::pool::VerifiedTransaction;
Expand Down Expand Up @@ -2079,7 +2081,7 @@ impl IoClient for Client {

let queued = self.queued_ancient_blocks.clone();
let lock = self.ancient_blocks_import_lock.clone();
match self.queue_ancient_blocks.queue(&self.io_channel.read(), 1, move |client| {
self.queue_ancient_blocks.queue(&self.io_channel.read(), 1, move |client| {
trace_time!("import_ancient_block");
// Make sure to hold the lock here to prevent importing out of order.
// We use separate lock, cause we don't want to block queueing.
Expand All @@ -2104,10 +2106,9 @@ impl IoClient for Client {
break;
}
}
}) {
Ok(_) => Ok(hash),
Err(e) => bail!(BlockImportErrorKind::Other(format!("{}", e))),
}
})?;

Ok(hash)
}

fn queue_consensus_message(&self, message: Bytes) {
Expand Down Expand Up @@ -2523,21 +2524,6 @@ mod tests {
}
}

#[derive(Debug)]
enum QueueError {
Channel(IoError),
Full(usize),
}

impl fmt::Display for QueueError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
QueueError::Channel(ref c) => fmt::Display::fmt(c, fmt),
QueueError::Full(limit) => write!(fmt, "The queue is full ({})", limit),
}
}
}

/// Queue some items to be processed by IO client.
struct IoChannelQueue {
currently_queued: Arc<AtomicUsize>,
Expand All @@ -2556,7 +2542,7 @@ impl IoChannelQueue {
F: Fn(&Client) + Send + Sync + 'static,
{
let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
ensure!(queue_size < self.limit, QueueError::Full(self.limit));
ensure!(queue_size < self.limit, QueueErrorKind::Full(self.limit));

let currently_queued = self.currently_queued.clone();
let result = channel.send(ClientIoMessage::execute(move |client| {
Expand All @@ -2569,7 +2555,7 @@ impl IoChannelQueue {
self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst);
Ok(())
},
Err(e) => Err(QueueError::Channel(e)),
Err(e) => bail!(QueueErrorKind::Channel(e)),
}
}
}
19 changes: 19 additions & 0 deletions ethcore/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,24 @@ impl error::Error for BlockError {
}
}

error_chain! {
types {
QueueError, QueueErrorKind, QueueErrorResultExt, QueueErrorResult;
}

errors {
#[doc = "Queue is full"]
Full(limit: usize) {
description("Queue is full")
display("The queue is full ({})", limit)
}
}

foreign_links {
Channel(IoError) #[doc = "Io channel error"];
}
}

error_chain! {
types {
ImportError, ImportErrorKind, ImportErrorResultExt, ImportErrorResult;
Expand Down Expand Up @@ -184,6 +202,7 @@ error_chain! {

links {
Import(ImportError, ImportErrorKind) #[doc = "Import error"];
Queue(QueueError, QueueErrorKind) #[doc = "Io channel queue error"];
}

foreign_links {
Expand Down
7 changes: 5 additions & 2 deletions ethcore/sync/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ use rlp::Rlp;
use ethcore::views::BlockView;
use ethcore::header::{BlockNumber, Header as BlockHeader};
use ethcore::client::{BlockStatus, BlockId, BlockImportError, BlockImportErrorKind};
use ethcore::block::Block;
use ethcore::error::{ImportErrorKind, BlockError};
use ethcore::error::{ImportErrorKind, QueueErrorKind, BlockError};
use sync_io::SyncIo;
use blocks::BlockCollection;

Expand Down Expand Up @@ -527,6 +526,10 @@ impl BlockDownloader {
debug!(target: "sync", "Block temporarily invalid, restarting sync");
break;
},
Err(BlockImportError(BlockImportErrorKind::Queue(QueueErrorKind::Full(limit)), _)) => {
debug!(target: "sync", "Block import queue full ({}), restarting sync", limit);
break;
},
Err(e) => {
debug!(target: "sync", "Bad block {:?} : {:?}", h, e);
bad = true;
Expand Down

0 comments on commit ddfa8e3

Please sign in to comment.