diff --git a/src/services/pool/mod.rs b/src/services/pool/mod.rs index 817e7e4d1f..c8dc2eaf20 100644 --- a/src/services/pool/mod.rs +++ b/src/services/pool/mod.rs @@ -61,8 +61,8 @@ struct TransactionHandler { impl PoolWorkerHandler { fn process_msg(&mut self, raw_msg: &String, src_ind: usize) -> Result, PoolError> { - let msg = try_log!(Message::from_raw_str(raw_msg) - .map_err(PoolError::from_displayable_as_invalid_data)); + let msg = Message::from_raw_str(raw_msg) + .map_err(PoolError::from_displayable_as_invalid_data)?; match self { &mut PoolWorkerHandler::CatchupHandler(ref mut ch) => ch.process_msg(msg, raw_msg, src_ind), &mut PoolWorkerHandler::TransactionHandler(ref mut ch) => ch.process_msg(msg, raw_msg, src_ind), @@ -142,11 +142,11 @@ impl TransactionHandler { fn try_send_request(&mut self, cmd: &str, cmd_id: i32) -> Result<(), PoolError> { info!("cmd {:?}", cmd); - let request: Value = try_log!(serde_json::from_str(cmd) - .map_err(PoolError::from_displayable_as_invalid_data)); - let request_id: u64 = try_log!(request["reqId"] + let request: Value = serde_json::from_str(cmd) + .map_err(PoolError::from_displayable_as_invalid_data)?; + let request_id: u64 = request["reqId"] .as_u64() - .ok_or(PoolError::InvalidData("Invalid request: missed requestId field".to_string()))); + .ok_or(PoolError::InvalidData("Invalid request: missed requestId field".to_string()))?; if self.pending_commands.contains_key(&request_id) { self.pending_commands.get_mut(&request_id).unwrap().cmd_ids.push(cmd_id); } else { @@ -158,7 +158,7 @@ impl TransactionHandler { self.pending_commands.insert(request_id, pc); for node in &self.nodes { let node: &RemoteNode = node; - try_log!(node.send_str(cmd)); + node.send_str(cmd)?; } } Ok(()) @@ -174,12 +174,12 @@ impl TransactionHandler { for (_, pending_cmd) in &self.pending_commands { let pending_cmd: &CommandProcess = pending_cmd; for cmd_id in &pending_cmd.cmd_ids { - try_log!(CommandExecutor::instance() + CommandExecutor::instance() .send(Command::Ledger(LedgerCommand::SubmitAck( cmd_id.clone(), Err(PoolError::Terminate)))) .map_err(|err| { PoolError::InvalidState("Can't send ACK cmd".to_string()) - })); + })?; } } Ok(()) @@ -200,7 +200,7 @@ impl Default for TransactionHandler { impl PoolWorker { fn connect_to_known_nodes(&mut self, merkle_tree: Option<&MerkleTree>) -> Result<(), PoolError> { - let merkle_tree: MerkleTree = try_log!(merkle_tree.map(|x| { x.clone() }) + let merkle_tree: MerkleTree = merkle_tree.map(|x| { x.clone() }) .or_else(|| { match self.handler { //TODO default self.handler.get_default_mt() -> Result @@ -208,16 +208,16 @@ impl PoolWorker { PoolWorkerHandler::TransactionHandler(_) => None } }) - .ok_or(PoolError::InvalidState("Expect catchup state".to_string()))); + .ok_or(PoolError::InvalidState("Expect catchup state".to_string()))?; let ctx: zmq::Context = zmq::Context::new(); for gen_txn in &merkle_tree { - let gen_txn: GenTransaction = try_log!(GenTransaction::from_json(gen_txn) + let gen_txn: GenTransaction = GenTransaction::from_json(gen_txn) .map_err(|e| { PoolError::InvalidState(format!("MerkleTree contains invalid data {}", e)) - })); - let mut rn: RemoteNode = try_log!(RemoteNode::new(&gen_txn)); - try_log!(rn.connect(&ctx)); - try_log!(rn.send_str("pi")); + })?; + let mut rn: RemoteNode = RemoteNode::new(&gen_txn)?; + rn.connect(&ctx)?; + rn.send_str("pi")?; self.handler.nodes_mut().push(rn); } self.handler.set_f(PoolWorker::get_f(merkle_tree.count())); //TODO set cnt to connect @@ -226,19 +226,19 @@ impl PoolWorker { fn init_catchup(&mut self) -> Result<(), PoolError> { let catchup_handler = CatchupHandler { - merkle_tree: try_log!(PoolWorker::_restore_merkle_tree(self.name.as_str())), + merkle_tree: PoolWorker::_restore_merkle_tree(self.name.as_str())?, open_cmd_id: self.open_cmd_id, pool_id: self.pool_id, ..Default::default() }; self.handler = PoolWorkerHandler::CatchupHandler(catchup_handler); - try_log!(self.connect_to_known_nodes(None)); + self.connect_to_known_nodes(None)?; Ok(()) } pub fn run(&mut self) -> Result<(), PoolError> { self._run().or_else(|err: PoolError| { - try_log!(self.handler.flush_requests(Err(PoolError::Terminate))); + self.handler.flush_requests(Err(PoolError::Terminate))?; match err { PoolError::Terminate => Ok(()), _ => Err(err), @@ -247,14 +247,14 @@ impl PoolWorker { } fn _run(&mut self) -> Result<(), PoolError> { - try_log!(self.init_catchup()); //TODO consider error as PoolOpen error + self.init_catchup()?; //TODO consider error as PoolOpen error loop { trace!("zmq poll loop >>"); - let actions = try_log!(self.poll_zmq()); + let actions = self.poll_zmq()?; - try_log!(self.process_actions(actions)); + self.process_actions(actions)?; trace!("zmq poll loop <<"); } @@ -267,20 +267,20 @@ impl PoolWorker { return Err(PoolError::Terminate); } &ZMQLoopAction::MessageToProcess(ref msg) => { - if let Some(new_mt) = try_log!(self.handler.process_msg(&msg.message, msg.node_idx)) { - try_log!(self.handler.flush_requests(Ok(()))); + if let Some(new_mt) = self.handler.process_msg(&msg.message, msg.node_idx)? { + self.handler.flush_requests(Ok(()))?; self.handler = PoolWorkerHandler::TransactionHandler(Default::default()); - try_log!(self.connect_to_known_nodes(Some(&new_mt))); + self.connect_to_known_nodes(Some(&new_mt))?; } } &ZMQLoopAction::RequestToSend(ref req) => { - try_log!(self.handler.send_request(req.request.as_str(), req.id).or_else(|err| { + self.handler.send_request(req.request.as_str(), req.id).or_else(|err| { CommandExecutor::instance() .send(Command::Ledger(LedgerCommand::SubmitAck(req.id, Err(err)))) .map_err(|err| { PoolError::InvalidState("Can't send ACK cmd".to_string()) }) - })); + })?; } } } @@ -290,13 +290,13 @@ impl PoolWorker { fn poll_zmq(&mut self) -> Result, PoolError> { let mut actions: Vec = Vec::new(); - let mut poll_items = try_log!(self.get_zmq_poll_items()); - let r = try_log!(zmq::poll(poll_items.as_mut_slice(), -1)); + let mut poll_items = self.get_zmq_poll_items()?; + let r = zmq::poll(poll_items.as_mut_slice(), -1)?; trace!("zmq poll {:?}", r); for i in 0..self.handler.nodes().len() { if poll_items[1 + i].is_readable() { - if let Some(msg) = try_log!(self.handler.nodes()[i].recv_msg()) { + if let Some(msg) = self.handler.nodes()[i].recv_msg()? { actions.push(ZMQLoopAction::MessageToProcess(MessageToProcess { node_idx: i, message: msg, @@ -305,10 +305,10 @@ impl PoolWorker { } } if poll_items[0].is_readable() { - let cmd = try_log!(self.cmd_sock.recv_multipart(zmq::DONTWAIT)); + let cmd = self.cmd_sock.recv_multipart(zmq::DONTWAIT)?; trace!("cmd {:?}", cmd); - let cmd_s = try_log!(String::from_utf8(cmd[0].clone()) - .map_err(PoolError::from_displayable_as_invalid_data)); + let cmd_s = String::from_utf8(cmd[0].clone()) + .map_err(PoolError::from_displayable_as_invalid_data)?; if "exit".eq(cmd_s.as_str()) { actions.push(ZMQLoopAction::Terminate); } else { @@ -325,9 +325,9 @@ impl PoolWorker { let mut poll_items: Vec = Vec::new(); poll_items.push(self.cmd_sock.as_poll_item(zmq::POLLIN)); for ref node in self.handler.nodes() { - let s: &zmq::Socket = try_log!(node.zsock.as_ref() + let s: &zmq::Socket = node.zsock.as_ref() .ok_or(PoolError::InvalidState( - "Try to poll from ZMQ socket for unconnected RemoteNode".to_string()))); + "Try to poll from ZMQ socket for unconnected RemoteNode".to_string()))?; poll_items.push(s.as_poll_item(zmq::POLLIN)); } Ok(poll_items) @@ -336,15 +336,15 @@ impl PoolWorker { fn _restore_merkle_tree(pool_name: &str) -> Result { let mut p = EnvironmentUtils::pool_path(pool_name); - let mut mt = try_log!(MerkleTree::from_vec(Vec::new())); + let mut mt = MerkleTree::from_vec(Vec::new())?; //TODO firstly try to deserialize merkle tree p.push(pool_name); p.set_extension("txn"); - let f = try_log!(fs::File::open(p)); + let f = fs::File::open(p)?; let reader = io::BufReader::new(&f); for line in reader.lines() { - let line: String = try_log!(line); - try_log!(mt.append(line)); + let line: String = line?; + mt.append(line)?; } Ok(mt) } @@ -360,13 +360,13 @@ impl PoolWorker { impl Pool { pub fn new(name: &str, cmd_id: i32) -> Result { let zmq_ctx = zmq::Context::new(); - let recv_cmd_sock = try_log!(zmq_ctx.socket(zmq::SocketType::PAIR)); - let send_cmd_sock = try_log!(zmq_ctx.socket(zmq::SocketType::PAIR)); + let recv_cmd_sock = zmq_ctx.socket(zmq::SocketType::PAIR)?; + let send_cmd_sock = zmq_ctx.socket(zmq::SocketType::PAIR)?; let inproc_sock_name: String = format!("inproc://pool_{}", name); - try_log!(recv_cmd_sock.bind(inproc_sock_name.as_str())); + recv_cmd_sock.bind(inproc_sock_name.as_str())?; - try_log!(send_cmd_sock.connect(inproc_sock_name.as_str())); + send_cmd_sock.connect(inproc_sock_name.as_str())?; let pool_id = SequenceUtils::get_next_id(); let mut pool_worker: PoolWorker = PoolWorker { cmd_sock: recv_cmd_sock, @@ -395,7 +395,7 @@ impl Pool { pub fn send_tx(&self, cmd_id: i32, json: &str) -> Result<(), PoolError> { let mut buf = [0u8; 4]; LittleEndian::write_i32(&mut buf, cmd_id); - Ok(try_log!(self.cmd_sock.send_multipart(&[json.as_bytes(), &buf], zmq::DONTWAIT))) + Ok(self.cmd_sock.send_multipart(&[json.as_bytes(), &buf], zmq::DONTWAIT)?) } } @@ -428,8 +428,8 @@ impl Debug for RemoteNode { impl RemoteNode { fn new(txn: &GenTransaction) -> Result { - let public_key = try_log!(txn.dest.as_str().from_base58() - .map_err(|e| { PoolError::InvalidData("Invalid field dest in genesis transaction".to_string()) })); + let public_key = txn.dest.as_str().from_base58() + .map_err(|e| { PoolError::InvalidData("Invalid field dest in genesis transaction".to_string()) })?; Ok(RemoteNode { verify_key: ED25519::pk_to_curve25519(&public_key), public_key: public_key, @@ -440,16 +440,17 @@ impl RemoteNode { } fn connect(&mut self, ctx: &zmq::Context) -> Result<(), PoolError> { - let key_pair = try_log!(zmq::CurveKeyPair::new()); - let s = try_log!(ctx.socket(zmq::SocketType::DEALER)); - try_log!(s.set_identity(key_pair.public_key.as_bytes())); - try_log!(s.set_curve_secretkey(key_pair.secret_key.as_str())); - try_log!(s.set_curve_publickey(key_pair.public_key.as_str())); - let server_key = try_log!(zmq::z85_encode(self.verify_key.as_slice()) - .map_err(|err| { PoolError::InvalidData("Can't encode server key as z85".to_string()) })); - try_log!(s.set_curve_serverkey(server_key.as_str())); - try_log!(s.set_linger(0)); //TODO set correct timeout - try_log!(s.connect(self.zaddr.as_str())); + let key_pair = zmq::CurveKeyPair::new()?; + let s = ctx.socket(zmq::SocketType::DEALER)?; + s.set_identity(key_pair.public_key.as_bytes())?; + s.set_curve_secretkey(key_pair.secret_key.as_str())?; + s.set_curve_publickey(key_pair.public_key.as_str())?; + s.set_curve_serverkey( + zmq::z85_encode(self.verify_key.as_slice()) + .map_err(|err| { PoolError::InvalidData("Can't encode server key as z85".to_string()) })? + .as_str())?; + s.set_linger(0)?; //TODO set correct timeout + s.connect(self.zaddr.as_str())?; self.zsock = Some(s); Ok(()) } @@ -460,12 +461,9 @@ impl RemoteNode { PoolError::Io(io::Error::from(io::ErrorKind::InvalidData)) } } - let msg: String = try_log!( - try_log!( - try_log!(self.zsock.as_ref().ok_or(PoolError::InvalidState("Try to receive msg for unconnected RemoteNode".to_string())) - ).recv_string(zmq::DONTWAIT) - ).map_err(|err|{PoolError::InvalidData(format!("{:?}", err))}) - ); + let msg: String = self.zsock.as_ref() + .ok_or(PoolError::InvalidState("Try to receive msg for unconnected RemoteNode".to_string()))? + .recv_string(zmq::DONTWAIT)??; info!(target: "RemoteNode_recv_msg", "{} {}", self.name, msg); Ok(Some(msg)) @@ -473,14 +471,14 @@ impl RemoteNode { fn send_str(&self, str: &str) -> Result<(), PoolError> { info!("Sending {:?}", str); - try_log!(try_log!(self.zsock.as_ref() - .ok_or(PoolError::InvalidState("Try to send str for unconnected RemoteNode".to_string()))) - .send_str(str, zmq::DONTWAIT)); + self.zsock.as_ref() + .ok_or(PoolError::InvalidState("Try to send str for unconnected RemoteNode".to_string()))? + .send_str(str, zmq::DONTWAIT)?; Ok(()) } fn send_msg(&self, msg: &Message) -> Result<(), PoolError> { - self.send_str(try_log!(msg.to_json().map_err(PoolError::from_displayable_as_invalid_data)).as_str()) + self.send_str(msg.to_json().map_err(PoolError::from_displayable_as_invalid_data)?.as_str()) } } @@ -494,8 +492,8 @@ impl PoolService { pub fn create(&self, name: &str, config: Option<&str>) -> Result<(), PoolError> { let mut path = EnvironmentUtils::pool_path(name); let pool_config = match config { - Some(config) => try_log!(PoolConfig::from_json(config) - .map_err(PoolError::from_displayable_as_invalid_config)), + Some(config) => PoolConfig::from_json(config) + .map_err(PoolError::from_displayable_as_invalid_config)?, None => PoolConfig::default_for_name(name) }; @@ -503,19 +501,19 @@ impl PoolService { return Err(PoolError::NotCreated("Already created".to_string())); } - try_log!(fs::create_dir_all(path.as_path())); + fs::create_dir_all(path.as_path())?; path.push(name); path.set_extension("txn"); - try_log!(fs::copy(&pool_config.genesis_txn, path.as_path())); + fs::copy(&pool_config.genesis_txn, path.as_path())?; path.pop(); path.push("config"); path.set_extension("json"); - let mut f: fs::File = try_log!(fs::File::create(path.as_path())); - try_log!(f.write(try_log!(pool_config.to_json() - .map_err(PoolError::from_displayable_as_invalid_config)).as_bytes())); - try_log!(f.flush()); + let mut f: fs::File = fs::File::create(path.as_path())?; + f.write(pool_config.to_json() + .map_err(PoolError::from_displayable_as_invalid_config)?.as_bytes())?; + f.flush()?; // TODO probably create another one file pool.json with pool description, // but now there is no info to save (except name witch equal to directory) @@ -528,7 +526,7 @@ impl PoolService { } pub fn open(&self, name: &str, config: Option<&str>) -> Result { - for pool in try_log!(self.pools.try_borrow()).values() { + for pool in self.pools.try_borrow()?.values() { if name.eq(pool.name.as_str()) { //TODO change error return Err(PoolError::InvalidHandle("Already opened".to_string())); @@ -536,18 +534,18 @@ impl PoolService { } let cmd_id: i32 = SequenceUtils::get_next_id(); - let new_pool = try_log!(Pool::new(name, cmd_id)); + let new_pool = Pool::new(name, cmd_id)?; //FIXME process config: check None (use default), transfer to Pool instance - try_log!(self.pools.try_borrow_mut()).insert(new_pool.id, new_pool); + self.pools.try_borrow_mut()?.insert(new_pool.id, new_pool); return Ok(cmd_id); } pub fn send_tx(&self, handle: i32, json: &str) -> Result { let cmd_id: i32 = SequenceUtils::get_next_id(); - try_log!(try_log!(try_log!(self.pools.try_borrow()) - .get(&handle).ok_or(PoolError::InvalidHandle("No pool with requested handle".to_string()))) - .send_tx(cmd_id, json)); + self.pools.try_borrow()? + .get(&handle).ok_or(PoolError::InvalidHandle("No pool with requested handle".to_string()))? + .send_tx(cmd_id, json)?; Ok(cmd_id) } @@ -560,7 +558,7 @@ impl PoolService { } pub fn get_pool_name(&self, handle: i32) -> Result { - try_log!(self.pools.try_borrow()).get(&handle).map_or( + self.pools.try_borrow()?.get(&handle).map_or( Err(PoolError::InvalidHandle("Doesn't exists".to_string())), |pool: &Pool| Ok(pool.name.clone())) }