From 2ee8411fa70b4c1070c326133e07dae50751c4bd Mon Sep 17 00:00:00 2001 From: Sergey Minaev Date: Fri, 26 May 2017 18:06:42 +0300 Subject: [PATCH] Use try_log! macro in pool service. --- src/services/pool/mod.rs | 160 ++++++++++++++++++++------------------- 1 file changed, 81 insertions(+), 79 deletions(-) diff --git a/src/services/pool/mod.rs b/src/services/pool/mod.rs index c8dc2eaf20..817e7e4d1f 100644 --- a/src/services/pool/mod.rs +++ b/src/services/pool/mod.rs @@ -61,8 +61,8 @@ struct TransactionHandler { impl PoolWorkerHandler { fn process_msg(&mut self, raw_msg: &String, src_ind: usize) -> Result, PoolError> { - let msg = Message::from_raw_str(raw_msg) - .map_err(PoolError::from_displayable_as_invalid_data)?; + let msg = try_log!(Message::from_raw_str(raw_msg) + .map_err(PoolError::from_displayable_as_invalid_data)); match self { &mut PoolWorkerHandler::CatchupHandler(ref mut ch) => ch.process_msg(msg, raw_msg, src_ind), &mut PoolWorkerHandler::TransactionHandler(ref mut ch) => ch.process_msg(msg, raw_msg, src_ind), @@ -142,11 +142,11 @@ impl TransactionHandler { fn try_send_request(&mut self, cmd: &str, cmd_id: i32) -> Result<(), PoolError> { info!("cmd {:?}", cmd); - let request: Value = serde_json::from_str(cmd) - .map_err(PoolError::from_displayable_as_invalid_data)?; - let request_id: u64 = request["reqId"] + let request: Value = try_log!(serde_json::from_str(cmd) + .map_err(PoolError::from_displayable_as_invalid_data)); + let request_id: u64 = try_log!(request["reqId"] .as_u64() - .ok_or(PoolError::InvalidData("Invalid request: missed requestId field".to_string()))?; + .ok_or(PoolError::InvalidData("Invalid request: missed requestId field".to_string()))); if self.pending_commands.contains_key(&request_id) { self.pending_commands.get_mut(&request_id).unwrap().cmd_ids.push(cmd_id); } else { @@ -158,7 +158,7 @@ impl TransactionHandler { self.pending_commands.insert(request_id, pc); for node in &self.nodes { let node: &RemoteNode = node; - node.send_str(cmd)?; + try_log!(node.send_str(cmd)); } } Ok(()) @@ -174,12 +174,12 @@ impl TransactionHandler { for (_, pending_cmd) in &self.pending_commands { let pending_cmd: &CommandProcess = pending_cmd; for cmd_id in &pending_cmd.cmd_ids { - CommandExecutor::instance() + try_log!(CommandExecutor::instance() .send(Command::Ledger(LedgerCommand::SubmitAck( cmd_id.clone(), Err(PoolError::Terminate)))) .map_err(|err| { PoolError::InvalidState("Can't send ACK cmd".to_string()) - })?; + })); } } Ok(()) @@ -200,7 +200,7 @@ impl Default for TransactionHandler { impl PoolWorker { fn connect_to_known_nodes(&mut self, merkle_tree: Option<&MerkleTree>) -> Result<(), PoolError> { - let merkle_tree: MerkleTree = merkle_tree.map(|x| { x.clone() }) + let merkle_tree: MerkleTree = try_log!(merkle_tree.map(|x| { x.clone() }) .or_else(|| { match self.handler { //TODO default self.handler.get_default_mt() -> Result @@ -208,16 +208,16 @@ impl PoolWorker { PoolWorkerHandler::TransactionHandler(_) => None } }) - .ok_or(PoolError::InvalidState("Expect catchup state".to_string()))?; + .ok_or(PoolError::InvalidState("Expect catchup state".to_string()))); let ctx: zmq::Context = zmq::Context::new(); for gen_txn in &merkle_tree { - let gen_txn: GenTransaction = GenTransaction::from_json(gen_txn) + let gen_txn: GenTransaction = try_log!(GenTransaction::from_json(gen_txn) .map_err(|e| { PoolError::InvalidState(format!("MerkleTree contains invalid data {}", e)) - })?; - let mut rn: RemoteNode = RemoteNode::new(&gen_txn)?; - rn.connect(&ctx)?; - rn.send_str("pi")?; + })); + let mut rn: RemoteNode = try_log!(RemoteNode::new(&gen_txn)); + try_log!(rn.connect(&ctx)); + try_log!(rn.send_str("pi")); self.handler.nodes_mut().push(rn); } self.handler.set_f(PoolWorker::get_f(merkle_tree.count())); //TODO set cnt to connect @@ -226,19 +226,19 @@ impl PoolWorker { fn init_catchup(&mut self) -> Result<(), PoolError> { let catchup_handler = CatchupHandler { - merkle_tree: PoolWorker::_restore_merkle_tree(self.name.as_str())?, + merkle_tree: try_log!(PoolWorker::_restore_merkle_tree(self.name.as_str())), open_cmd_id: self.open_cmd_id, pool_id: self.pool_id, ..Default::default() }; self.handler = PoolWorkerHandler::CatchupHandler(catchup_handler); - self.connect_to_known_nodes(None)?; + try_log!(self.connect_to_known_nodes(None)); Ok(()) } pub fn run(&mut self) -> Result<(), PoolError> { self._run().or_else(|err: PoolError| { - self.handler.flush_requests(Err(PoolError::Terminate))?; + try_log!(self.handler.flush_requests(Err(PoolError::Terminate))); match err { PoolError::Terminate => Ok(()), _ => Err(err), @@ -247,14 +247,14 @@ impl PoolWorker { } fn _run(&mut self) -> Result<(), PoolError> { - self.init_catchup()?; //TODO consider error as PoolOpen error + try_log!(self.init_catchup()); //TODO consider error as PoolOpen error loop { trace!("zmq poll loop >>"); - let actions = self.poll_zmq()?; + let actions = try_log!(self.poll_zmq()); - self.process_actions(actions)?; + try_log!(self.process_actions(actions)); trace!("zmq poll loop <<"); } @@ -267,20 +267,20 @@ impl PoolWorker { return Err(PoolError::Terminate); } &ZMQLoopAction::MessageToProcess(ref msg) => { - if let Some(new_mt) = self.handler.process_msg(&msg.message, msg.node_idx)? { - self.handler.flush_requests(Ok(()))?; + if let Some(new_mt) = try_log!(self.handler.process_msg(&msg.message, msg.node_idx)) { + try_log!(self.handler.flush_requests(Ok(()))); self.handler = PoolWorkerHandler::TransactionHandler(Default::default()); - self.connect_to_known_nodes(Some(&new_mt))?; + try_log!(self.connect_to_known_nodes(Some(&new_mt))); } } &ZMQLoopAction::RequestToSend(ref req) => { - self.handler.send_request(req.request.as_str(), req.id).or_else(|err| { + try_log!(self.handler.send_request(req.request.as_str(), req.id).or_else(|err| { CommandExecutor::instance() .send(Command::Ledger(LedgerCommand::SubmitAck(req.id, Err(err)))) .map_err(|err| { PoolError::InvalidState("Can't send ACK cmd".to_string()) }) - })?; + })); } } } @@ -290,13 +290,13 @@ impl PoolWorker { fn poll_zmq(&mut self) -> Result, PoolError> { let mut actions: Vec = Vec::new(); - let mut poll_items = self.get_zmq_poll_items()?; - let r = zmq::poll(poll_items.as_mut_slice(), -1)?; + let mut poll_items = try_log!(self.get_zmq_poll_items()); + let r = try_log!(zmq::poll(poll_items.as_mut_slice(), -1)); trace!("zmq poll {:?}", r); for i in 0..self.handler.nodes().len() { if poll_items[1 + i].is_readable() { - if let Some(msg) = self.handler.nodes()[i].recv_msg()? { + if let Some(msg) = try_log!(self.handler.nodes()[i].recv_msg()) { actions.push(ZMQLoopAction::MessageToProcess(MessageToProcess { node_idx: i, message: msg, @@ -305,10 +305,10 @@ impl PoolWorker { } } if poll_items[0].is_readable() { - let cmd = self.cmd_sock.recv_multipart(zmq::DONTWAIT)?; + let cmd = try_log!(self.cmd_sock.recv_multipart(zmq::DONTWAIT)); trace!("cmd {:?}", cmd); - let cmd_s = String::from_utf8(cmd[0].clone()) - .map_err(PoolError::from_displayable_as_invalid_data)?; + let cmd_s = try_log!(String::from_utf8(cmd[0].clone()) + .map_err(PoolError::from_displayable_as_invalid_data)); if "exit".eq(cmd_s.as_str()) { actions.push(ZMQLoopAction::Terminate); } else { @@ -325,9 +325,9 @@ impl PoolWorker { let mut poll_items: Vec = Vec::new(); poll_items.push(self.cmd_sock.as_poll_item(zmq::POLLIN)); for ref node in self.handler.nodes() { - let s: &zmq::Socket = node.zsock.as_ref() + let s: &zmq::Socket = try_log!(node.zsock.as_ref() .ok_or(PoolError::InvalidState( - "Try to poll from ZMQ socket for unconnected RemoteNode".to_string()))?; + "Try to poll from ZMQ socket for unconnected RemoteNode".to_string()))); poll_items.push(s.as_poll_item(zmq::POLLIN)); } Ok(poll_items) @@ -336,15 +336,15 @@ impl PoolWorker { fn _restore_merkle_tree(pool_name: &str) -> Result { let mut p = EnvironmentUtils::pool_path(pool_name); - let mut mt = MerkleTree::from_vec(Vec::new())?; + let mut mt = try_log!(MerkleTree::from_vec(Vec::new())); //TODO firstly try to deserialize merkle tree p.push(pool_name); p.set_extension("txn"); - let f = fs::File::open(p)?; + let f = try_log!(fs::File::open(p)); let reader = io::BufReader::new(&f); for line in reader.lines() { - let line: String = line?; - mt.append(line)?; + let line: String = try_log!(line); + try_log!(mt.append(line)); } Ok(mt) } @@ -360,13 +360,13 @@ impl PoolWorker { impl Pool { pub fn new(name: &str, cmd_id: i32) -> Result { let zmq_ctx = zmq::Context::new(); - let recv_cmd_sock = zmq_ctx.socket(zmq::SocketType::PAIR)?; - let send_cmd_sock = zmq_ctx.socket(zmq::SocketType::PAIR)?; + let recv_cmd_sock = try_log!(zmq_ctx.socket(zmq::SocketType::PAIR)); + let send_cmd_sock = try_log!(zmq_ctx.socket(zmq::SocketType::PAIR)); let inproc_sock_name: String = format!("inproc://pool_{}", name); - recv_cmd_sock.bind(inproc_sock_name.as_str())?; + try_log!(recv_cmd_sock.bind(inproc_sock_name.as_str())); - send_cmd_sock.connect(inproc_sock_name.as_str())?; + try_log!(send_cmd_sock.connect(inproc_sock_name.as_str())); let pool_id = SequenceUtils::get_next_id(); let mut pool_worker: PoolWorker = PoolWorker { cmd_sock: recv_cmd_sock, @@ -395,7 +395,7 @@ impl Pool { pub fn send_tx(&self, cmd_id: i32, json: &str) -> Result<(), PoolError> { let mut buf = [0u8; 4]; LittleEndian::write_i32(&mut buf, cmd_id); - Ok(self.cmd_sock.send_multipart(&[json.as_bytes(), &buf], zmq::DONTWAIT)?) + Ok(try_log!(self.cmd_sock.send_multipart(&[json.as_bytes(), &buf], zmq::DONTWAIT))) } } @@ -428,8 +428,8 @@ impl Debug for RemoteNode { impl RemoteNode { fn new(txn: &GenTransaction) -> Result { - let public_key = txn.dest.as_str().from_base58() - .map_err(|e| { PoolError::InvalidData("Invalid field dest in genesis transaction".to_string()) })?; + let public_key = try_log!(txn.dest.as_str().from_base58() + .map_err(|e| { PoolError::InvalidData("Invalid field dest in genesis transaction".to_string()) })); Ok(RemoteNode { verify_key: ED25519::pk_to_curve25519(&public_key), public_key: public_key, @@ -440,17 +440,16 @@ impl RemoteNode { } fn connect(&mut self, ctx: &zmq::Context) -> Result<(), PoolError> { - let key_pair = zmq::CurveKeyPair::new()?; - let s = ctx.socket(zmq::SocketType::DEALER)?; - s.set_identity(key_pair.public_key.as_bytes())?; - s.set_curve_secretkey(key_pair.secret_key.as_str())?; - s.set_curve_publickey(key_pair.public_key.as_str())?; - s.set_curve_serverkey( - zmq::z85_encode(self.verify_key.as_slice()) - .map_err(|err| { PoolError::InvalidData("Can't encode server key as z85".to_string()) })? - .as_str())?; - s.set_linger(0)?; //TODO set correct timeout - s.connect(self.zaddr.as_str())?; + let key_pair = try_log!(zmq::CurveKeyPair::new()); + let s = try_log!(ctx.socket(zmq::SocketType::DEALER)); + try_log!(s.set_identity(key_pair.public_key.as_bytes())); + try_log!(s.set_curve_secretkey(key_pair.secret_key.as_str())); + try_log!(s.set_curve_publickey(key_pair.public_key.as_str())); + let server_key = try_log!(zmq::z85_encode(self.verify_key.as_slice()) + .map_err(|err| { PoolError::InvalidData("Can't encode server key as z85".to_string()) })); + try_log!(s.set_curve_serverkey(server_key.as_str())); + try_log!(s.set_linger(0)); //TODO set correct timeout + try_log!(s.connect(self.zaddr.as_str())); self.zsock = Some(s); Ok(()) } @@ -461,9 +460,12 @@ impl RemoteNode { PoolError::Io(io::Error::from(io::ErrorKind::InvalidData)) } } - let msg: String = self.zsock.as_ref() - .ok_or(PoolError::InvalidState("Try to receive msg for unconnected RemoteNode".to_string()))? - .recv_string(zmq::DONTWAIT)??; + let msg: String = try_log!( + try_log!( + try_log!(self.zsock.as_ref().ok_or(PoolError::InvalidState("Try to receive msg for unconnected RemoteNode".to_string())) + ).recv_string(zmq::DONTWAIT) + ).map_err(|err|{PoolError::InvalidData(format!("{:?}", err))}) + ); info!(target: "RemoteNode_recv_msg", "{} {}", self.name, msg); Ok(Some(msg)) @@ -471,14 +473,14 @@ impl RemoteNode { fn send_str(&self, str: &str) -> Result<(), PoolError> { info!("Sending {:?}", str); - self.zsock.as_ref() - .ok_or(PoolError::InvalidState("Try to send str for unconnected RemoteNode".to_string()))? - .send_str(str, zmq::DONTWAIT)?; + try_log!(try_log!(self.zsock.as_ref() + .ok_or(PoolError::InvalidState("Try to send str for unconnected RemoteNode".to_string()))) + .send_str(str, zmq::DONTWAIT)); Ok(()) } fn send_msg(&self, msg: &Message) -> Result<(), PoolError> { - self.send_str(msg.to_json().map_err(PoolError::from_displayable_as_invalid_data)?.as_str()) + self.send_str(try_log!(msg.to_json().map_err(PoolError::from_displayable_as_invalid_data)).as_str()) } } @@ -492,8 +494,8 @@ impl PoolService { pub fn create(&self, name: &str, config: Option<&str>) -> Result<(), PoolError> { let mut path = EnvironmentUtils::pool_path(name); let pool_config = match config { - Some(config) => PoolConfig::from_json(config) - .map_err(PoolError::from_displayable_as_invalid_config)?, + Some(config) => try_log!(PoolConfig::from_json(config) + .map_err(PoolError::from_displayable_as_invalid_config)), None => PoolConfig::default_for_name(name) }; @@ -501,19 +503,19 @@ impl PoolService { return Err(PoolError::NotCreated("Already created".to_string())); } - fs::create_dir_all(path.as_path())?; + try_log!(fs::create_dir_all(path.as_path())); path.push(name); path.set_extension("txn"); - fs::copy(&pool_config.genesis_txn, path.as_path())?; + try_log!(fs::copy(&pool_config.genesis_txn, path.as_path())); path.pop(); path.push("config"); path.set_extension("json"); - let mut f: fs::File = fs::File::create(path.as_path())?; - f.write(pool_config.to_json() - .map_err(PoolError::from_displayable_as_invalid_config)?.as_bytes())?; - f.flush()?; + let mut f: fs::File = try_log!(fs::File::create(path.as_path())); + try_log!(f.write(try_log!(pool_config.to_json() + .map_err(PoolError::from_displayable_as_invalid_config)).as_bytes())); + try_log!(f.flush()); // TODO probably create another one file pool.json with pool description, // but now there is no info to save (except name witch equal to directory) @@ -526,7 +528,7 @@ impl PoolService { } pub fn open(&self, name: &str, config: Option<&str>) -> Result { - for pool in self.pools.try_borrow()?.values() { + for pool in try_log!(self.pools.try_borrow()).values() { if name.eq(pool.name.as_str()) { //TODO change error return Err(PoolError::InvalidHandle("Already opened".to_string())); @@ -534,18 +536,18 @@ impl PoolService { } let cmd_id: i32 = SequenceUtils::get_next_id(); - let new_pool = Pool::new(name, cmd_id)?; + let new_pool = try_log!(Pool::new(name, cmd_id)); //FIXME process config: check None (use default), transfer to Pool instance - self.pools.try_borrow_mut()?.insert(new_pool.id, new_pool); + try_log!(self.pools.try_borrow_mut()).insert(new_pool.id, new_pool); return Ok(cmd_id); } pub fn send_tx(&self, handle: i32, json: &str) -> Result { let cmd_id: i32 = SequenceUtils::get_next_id(); - self.pools.try_borrow()? - .get(&handle).ok_or(PoolError::InvalidHandle("No pool with requested handle".to_string()))? - .send_tx(cmd_id, json)?; + try_log!(try_log!(try_log!(self.pools.try_borrow()) + .get(&handle).ok_or(PoolError::InvalidHandle("No pool with requested handle".to_string()))) + .send_tx(cmd_id, json)); Ok(cmd_id) } @@ -558,7 +560,7 @@ impl PoolService { } pub fn get_pool_name(&self, handle: i32) -> Result { - self.pools.try_borrow()?.get(&handle).map_or( + try_log!(self.pools.try_borrow()).get(&handle).map_or( Err(PoolError::InvalidHandle("Doesn't exists".to_string())), |pool: &Pool| Ok(pool.name.clone())) }