Skip to content

Commit

Permalink
Use try_log! macro in pool service.
Browse files Browse the repository at this point in the history
  • Loading branch information
jovfer committed May 26, 2017
1 parent 7f3a8f3 commit 2ee8411
Showing 1 changed file with 81 additions and 79 deletions.
160 changes: 81 additions & 79 deletions src/services/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ struct TransactionHandler {

impl PoolWorkerHandler {
fn process_msg(&mut self, raw_msg: &String, src_ind: usize) -> Result<Option<MerkleTree>, PoolError> {
let msg = Message::from_raw_str(raw_msg)
.map_err(PoolError::from_displayable_as_invalid_data)?;
let msg = try_log!(Message::from_raw_str(raw_msg)
.map_err(PoolError::from_displayable_as_invalid_data));
match self {
&mut PoolWorkerHandler::CatchupHandler(ref mut ch) => ch.process_msg(msg, raw_msg, src_ind),
&mut PoolWorkerHandler::TransactionHandler(ref mut ch) => ch.process_msg(msg, raw_msg, src_ind),
Expand Down Expand Up @@ -142,11 +142,11 @@ impl TransactionHandler {

fn try_send_request(&mut self, cmd: &str, cmd_id: i32) -> Result<(), PoolError> {
info!("cmd {:?}", cmd);
let request: Value = serde_json::from_str(cmd)
.map_err(PoolError::from_displayable_as_invalid_data)?;
let request_id: u64 = request["reqId"]
let request: Value = try_log!(serde_json::from_str(cmd)
.map_err(PoolError::from_displayable_as_invalid_data));
let request_id: u64 = try_log!(request["reqId"]
.as_u64()
.ok_or(PoolError::InvalidData("Invalid request: missed requestId field".to_string()))?;
.ok_or(PoolError::InvalidData("Invalid request: missed requestId field".to_string())));
if self.pending_commands.contains_key(&request_id) {
self.pending_commands.get_mut(&request_id).unwrap().cmd_ids.push(cmd_id);
} else {
Expand All @@ -158,7 +158,7 @@ impl TransactionHandler {
self.pending_commands.insert(request_id, pc);
for node in &self.nodes {
let node: &RemoteNode = node;
node.send_str(cmd)?;
try_log!(node.send_str(cmd));
}
}
Ok(())
Expand All @@ -174,12 +174,12 @@ impl TransactionHandler {
for (_, pending_cmd) in &self.pending_commands {
let pending_cmd: &CommandProcess = pending_cmd;
for cmd_id in &pending_cmd.cmd_ids {
CommandExecutor::instance()
try_log!(CommandExecutor::instance()
.send(Command::Ledger(LedgerCommand::SubmitAck(
cmd_id.clone(), Err(PoolError::Terminate))))
.map_err(|err| {
PoolError::InvalidState("Can't send ACK cmd".to_string())
})?;
}));
}
}
Ok(())
Expand All @@ -200,24 +200,24 @@ impl Default for TransactionHandler {

impl PoolWorker {
fn connect_to_known_nodes(&mut self, merkle_tree: Option<&MerkleTree>) -> Result<(), PoolError> {
let merkle_tree: MerkleTree = merkle_tree.map(|x| { x.clone() })
let merkle_tree: MerkleTree = try_log!(merkle_tree.map(|x| { x.clone() })
.or_else(|| {
match self.handler {
//TODO default self.handler.get_default_mt() -> Result<MerkleTree, PoolError>
PoolWorkerHandler::CatchupHandler(ref ch) => Some(ch.merkle_tree.clone()),
PoolWorkerHandler::TransactionHandler(_) => None
}
})
.ok_or(PoolError::InvalidState("Expect catchup state".to_string()))?;
.ok_or(PoolError::InvalidState("Expect catchup state".to_string())));
let ctx: zmq::Context = zmq::Context::new();
for gen_txn in &merkle_tree {
let gen_txn: GenTransaction = GenTransaction::from_json(gen_txn)
let gen_txn: GenTransaction = try_log!(GenTransaction::from_json(gen_txn)
.map_err(|e| {
PoolError::InvalidState(format!("MerkleTree contains invalid data {}", e))
})?;
let mut rn: RemoteNode = RemoteNode::new(&gen_txn)?;
rn.connect(&ctx)?;
rn.send_str("pi")?;
}));
let mut rn: RemoteNode = try_log!(RemoteNode::new(&gen_txn));
try_log!(rn.connect(&ctx));
try_log!(rn.send_str("pi"));
self.handler.nodes_mut().push(rn);
}
self.handler.set_f(PoolWorker::get_f(merkle_tree.count())); //TODO set cnt to connect
Expand All @@ -226,19 +226,19 @@ impl PoolWorker {

fn init_catchup(&mut self) -> Result<(), PoolError> {
let catchup_handler = CatchupHandler {
merkle_tree: PoolWorker::_restore_merkle_tree(self.name.as_str())?,
merkle_tree: try_log!(PoolWorker::_restore_merkle_tree(self.name.as_str())),
open_cmd_id: self.open_cmd_id,
pool_id: self.pool_id,
..Default::default()
};
self.handler = PoolWorkerHandler::CatchupHandler(catchup_handler);
self.connect_to_known_nodes(None)?;
try_log!(self.connect_to_known_nodes(None));
Ok(())
}

pub fn run(&mut self) -> Result<(), PoolError> {
self._run().or_else(|err: PoolError| {
self.handler.flush_requests(Err(PoolError::Terminate))?;
try_log!(self.handler.flush_requests(Err(PoolError::Terminate)));
match err {
PoolError::Terminate => Ok(()),
_ => Err(err),
Expand All @@ -247,14 +247,14 @@ impl PoolWorker {
}

fn _run(&mut self) -> Result<(), PoolError> {
self.init_catchup()?; //TODO consider error as PoolOpen error
try_log!(self.init_catchup()); //TODO consider error as PoolOpen error

loop {
trace!("zmq poll loop >>");

let actions = self.poll_zmq()?;
let actions = try_log!(self.poll_zmq());

self.process_actions(actions)?;
try_log!(self.process_actions(actions));

trace!("zmq poll loop <<");
}
Expand All @@ -267,20 +267,20 @@ impl PoolWorker {
return Err(PoolError::Terminate);
}
&ZMQLoopAction::MessageToProcess(ref msg) => {
if let Some(new_mt) = self.handler.process_msg(&msg.message, msg.node_idx)? {
self.handler.flush_requests(Ok(()))?;
if let Some(new_mt) = try_log!(self.handler.process_msg(&msg.message, msg.node_idx)) {
try_log!(self.handler.flush_requests(Ok(())));
self.handler = PoolWorkerHandler::TransactionHandler(Default::default());
self.connect_to_known_nodes(Some(&new_mt))?;
try_log!(self.connect_to_known_nodes(Some(&new_mt)));
}
}
&ZMQLoopAction::RequestToSend(ref req) => {
self.handler.send_request(req.request.as_str(), req.id).or_else(|err| {
try_log!(self.handler.send_request(req.request.as_str(), req.id).or_else(|err| {
CommandExecutor::instance()
.send(Command::Ledger(LedgerCommand::SubmitAck(req.id, Err(err))))
.map_err(|err| {
PoolError::InvalidState("Can't send ACK cmd".to_string())
})
})?;
}));
}
}
}
Expand All @@ -290,13 +290,13 @@ impl PoolWorker {
fn poll_zmq(&mut self) -> Result<Vec<ZMQLoopAction>, PoolError> {
let mut actions: Vec<ZMQLoopAction> = Vec::new();

let mut poll_items = self.get_zmq_poll_items()?;
let r = zmq::poll(poll_items.as_mut_slice(), -1)?;
let mut poll_items = try_log!(self.get_zmq_poll_items());
let r = try_log!(zmq::poll(poll_items.as_mut_slice(), -1));
trace!("zmq poll {:?}", r);

for i in 0..self.handler.nodes().len() {
if poll_items[1 + i].is_readable() {
if let Some(msg) = self.handler.nodes()[i].recv_msg()? {
if let Some(msg) = try_log!(self.handler.nodes()[i].recv_msg()) {
actions.push(ZMQLoopAction::MessageToProcess(MessageToProcess {
node_idx: i,
message: msg,
Expand All @@ -305,10 +305,10 @@ impl PoolWorker {
}
}
if poll_items[0].is_readable() {
let cmd = self.cmd_sock.recv_multipart(zmq::DONTWAIT)?;
let cmd = try_log!(self.cmd_sock.recv_multipart(zmq::DONTWAIT));
trace!("cmd {:?}", cmd);
let cmd_s = String::from_utf8(cmd[0].clone())
.map_err(PoolError::from_displayable_as_invalid_data)?;
let cmd_s = try_log!(String::from_utf8(cmd[0].clone())
.map_err(PoolError::from_displayable_as_invalid_data));
if "exit".eq(cmd_s.as_str()) {
actions.push(ZMQLoopAction::Terminate);
} else {
Expand All @@ -325,9 +325,9 @@ impl PoolWorker {
let mut poll_items: Vec<zmq::PollItem> = Vec::new();
poll_items.push(self.cmd_sock.as_poll_item(zmq::POLLIN));
for ref node in self.handler.nodes() {
let s: &zmq::Socket = node.zsock.as_ref()
let s: &zmq::Socket = try_log!(node.zsock.as_ref()
.ok_or(PoolError::InvalidState(
"Try to poll from ZMQ socket for unconnected RemoteNode".to_string()))?;
"Try to poll from ZMQ socket for unconnected RemoteNode".to_string())));
poll_items.push(s.as_poll_item(zmq::POLLIN));
}
Ok(poll_items)
Expand All @@ -336,15 +336,15 @@ impl PoolWorker {

fn _restore_merkle_tree(pool_name: &str) -> Result<MerkleTree, PoolError> {
let mut p = EnvironmentUtils::pool_path(pool_name);
let mut mt = MerkleTree::from_vec(Vec::new())?;
let mut mt = try_log!(MerkleTree::from_vec(Vec::new()));
//TODO firstly try to deserialize merkle tree
p.push(pool_name);
p.set_extension("txn");
let f = fs::File::open(p)?;
let f = try_log!(fs::File::open(p));
let reader = io::BufReader::new(&f);
for line in reader.lines() {
let line: String = line?;
mt.append(line)?;
let line: String = try_log!(line);
try_log!(mt.append(line));
}
Ok(mt)
}
Expand All @@ -360,13 +360,13 @@ impl PoolWorker {
impl Pool {
pub fn new(name: &str, cmd_id: i32) -> Result<Pool, PoolError> {
let zmq_ctx = zmq::Context::new();
let recv_cmd_sock = zmq_ctx.socket(zmq::SocketType::PAIR)?;
let send_cmd_sock = zmq_ctx.socket(zmq::SocketType::PAIR)?;
let recv_cmd_sock = try_log!(zmq_ctx.socket(zmq::SocketType::PAIR));
let send_cmd_sock = try_log!(zmq_ctx.socket(zmq::SocketType::PAIR));
let inproc_sock_name: String = format!("inproc://pool_{}", name);

recv_cmd_sock.bind(inproc_sock_name.as_str())?;
try_log!(recv_cmd_sock.bind(inproc_sock_name.as_str()));

send_cmd_sock.connect(inproc_sock_name.as_str())?;
try_log!(send_cmd_sock.connect(inproc_sock_name.as_str()));
let pool_id = SequenceUtils::get_next_id();
let mut pool_worker: PoolWorker = PoolWorker {
cmd_sock: recv_cmd_sock,
Expand Down Expand Up @@ -395,7 +395,7 @@ impl Pool {
pub fn send_tx(&self, cmd_id: i32, json: &str) -> Result<(), PoolError> {
let mut buf = [0u8; 4];
LittleEndian::write_i32(&mut buf, cmd_id);
Ok(self.cmd_sock.send_multipart(&[json.as_bytes(), &buf], zmq::DONTWAIT)?)
Ok(try_log!(self.cmd_sock.send_multipart(&[json.as_bytes(), &buf], zmq::DONTWAIT)))
}
}

Expand Down Expand Up @@ -428,8 +428,8 @@ impl Debug for RemoteNode {

impl RemoteNode {
fn new(txn: &GenTransaction) -> Result<RemoteNode, PoolError> {
let public_key = txn.dest.as_str().from_base58()
.map_err(|e| { PoolError::InvalidData("Invalid field dest in genesis transaction".to_string()) })?;
let public_key = try_log!(txn.dest.as_str().from_base58()
.map_err(|e| { PoolError::InvalidData("Invalid field dest in genesis transaction".to_string()) }));
Ok(RemoteNode {
verify_key: ED25519::pk_to_curve25519(&public_key),
public_key: public_key,
Expand All @@ -440,17 +440,16 @@ impl RemoteNode {
}

fn connect(&mut self, ctx: &zmq::Context) -> Result<(), PoolError> {
let key_pair = zmq::CurveKeyPair::new()?;
let s = ctx.socket(zmq::SocketType::DEALER)?;
s.set_identity(key_pair.public_key.as_bytes())?;
s.set_curve_secretkey(key_pair.secret_key.as_str())?;
s.set_curve_publickey(key_pair.public_key.as_str())?;
s.set_curve_serverkey(
zmq::z85_encode(self.verify_key.as_slice())
.map_err(|err| { PoolError::InvalidData("Can't encode server key as z85".to_string()) })?
.as_str())?;
s.set_linger(0)?; //TODO set correct timeout
s.connect(self.zaddr.as_str())?;
let key_pair = try_log!(zmq::CurveKeyPair::new());
let s = try_log!(ctx.socket(zmq::SocketType::DEALER));
try_log!(s.set_identity(key_pair.public_key.as_bytes()));
try_log!(s.set_curve_secretkey(key_pair.secret_key.as_str()));
try_log!(s.set_curve_publickey(key_pair.public_key.as_str()));
let server_key = try_log!(zmq::z85_encode(self.verify_key.as_slice())
.map_err(|err| { PoolError::InvalidData("Can't encode server key as z85".to_string()) }));
try_log!(s.set_curve_serverkey(server_key.as_str()));
try_log!(s.set_linger(0)); //TODO set correct timeout
try_log!(s.connect(self.zaddr.as_str()));
self.zsock = Some(s);
Ok(())
}
Expand All @@ -461,24 +460,27 @@ impl RemoteNode {
PoolError::Io(io::Error::from(io::ErrorKind::InvalidData))
}
}
let msg: String = self.zsock.as_ref()
.ok_or(PoolError::InvalidState("Try to receive msg for unconnected RemoteNode".to_string()))?
.recv_string(zmq::DONTWAIT)??;
let msg: String = try_log!(
try_log!(
try_log!(self.zsock.as_ref().ok_or(PoolError::InvalidState("Try to receive msg for unconnected RemoteNode".to_string()))
).recv_string(zmq::DONTWAIT)
).map_err(|err|{PoolError::InvalidData(format!("{:?}", err))})
);
info!(target: "RemoteNode_recv_msg", "{} {}", self.name, msg);

Ok(Some(msg))
}

fn send_str(&self, str: &str) -> Result<(), PoolError> {
info!("Sending {:?}", str);
self.zsock.as_ref()
.ok_or(PoolError::InvalidState("Try to send str for unconnected RemoteNode".to_string()))?
.send_str(str, zmq::DONTWAIT)?;
try_log!(try_log!(self.zsock.as_ref()
.ok_or(PoolError::InvalidState("Try to send str for unconnected RemoteNode".to_string())))
.send_str(str, zmq::DONTWAIT));
Ok(())
}

fn send_msg(&self, msg: &Message) -> Result<(), PoolError> {
self.send_str(msg.to_json().map_err(PoolError::from_displayable_as_invalid_data)?.as_str())
self.send_str(try_log!(msg.to_json().map_err(PoolError::from_displayable_as_invalid_data)).as_str())
}
}

Expand All @@ -492,28 +494,28 @@ impl PoolService {
pub fn create(&self, name: &str, config: Option<&str>) -> Result<(), PoolError> {
let mut path = EnvironmentUtils::pool_path(name);
let pool_config = match config {
Some(config) => PoolConfig::from_json(config)
.map_err(PoolError::from_displayable_as_invalid_config)?,
Some(config) => try_log!(PoolConfig::from_json(config)
.map_err(PoolError::from_displayable_as_invalid_config)),
None => PoolConfig::default_for_name(name)
};

if path.as_path().exists() {
return Err(PoolError::NotCreated("Already created".to_string()));
}

fs::create_dir_all(path.as_path())?;
try_log!(fs::create_dir_all(path.as_path()));

path.push(name);
path.set_extension("txn");
fs::copy(&pool_config.genesis_txn, path.as_path())?;
try_log!(fs::copy(&pool_config.genesis_txn, path.as_path()));
path.pop();

path.push("config");
path.set_extension("json");
let mut f: fs::File = fs::File::create(path.as_path())?;
f.write(pool_config.to_json()
.map_err(PoolError::from_displayable_as_invalid_config)?.as_bytes())?;
f.flush()?;
let mut f: fs::File = try_log!(fs::File::create(path.as_path()));
try_log!(f.write(try_log!(pool_config.to_json()
.map_err(PoolError::from_displayable_as_invalid_config)).as_bytes()));
try_log!(f.flush());

// TODO probably create another one file pool.json with pool description,
// but now there is no info to save (except name witch equal to directory)
Expand All @@ -526,26 +528,26 @@ impl PoolService {
}

pub fn open(&self, name: &str, config: Option<&str>) -> Result<i32, PoolError> {
for pool in self.pools.try_borrow()?.values() {
for pool in try_log!(self.pools.try_borrow()).values() {
if name.eq(pool.name.as_str()) {
//TODO change error
return Err(PoolError::InvalidHandle("Already opened".to_string()));
}
}

let cmd_id: i32 = SequenceUtils::get_next_id();
let new_pool = Pool::new(name, cmd_id)?;
let new_pool = try_log!(Pool::new(name, cmd_id));
//FIXME process config: check None (use default), transfer to Pool instance

self.pools.try_borrow_mut()?.insert(new_pool.id, new_pool);
try_log!(self.pools.try_borrow_mut()).insert(new_pool.id, new_pool);
return Ok(cmd_id);
}

pub fn send_tx(&self, handle: i32, json: &str) -> Result<i32, PoolError> {
let cmd_id: i32 = SequenceUtils::get_next_id();
self.pools.try_borrow()?
.get(&handle).ok_or(PoolError::InvalidHandle("No pool with requested handle".to_string()))?
.send_tx(cmd_id, json)?;
try_log!(try_log!(try_log!(self.pools.try_borrow())
.get(&handle).ok_or(PoolError::InvalidHandle("No pool with requested handle".to_string())))
.send_tx(cmd_id, json));
Ok(cmd_id)
}

Expand All @@ -558,7 +560,7 @@ impl PoolService {
}

pub fn get_pool_name(&self, handle: i32) -> Result<String, PoolError> {
self.pools.try_borrow()?.get(&handle).map_or(
try_log!(self.pools.try_borrow()).get(&handle).map_or(
Err(PoolError::InvalidHandle("Doesn't exists".to_string())),
|pool: &Pool| Ok(pool.name.clone()))
}
Expand Down

0 comments on commit 2ee8411

Please sign in to comment.