Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
LemonHX committed Apr 6, 2022
1 parent abb263c commit 1d6fb3b
Showing 1 changed file with 33 additions and 9 deletions.
42 changes: 33 additions & 9 deletions rpc/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::chain_watcher::{ChainWatcher, StartSubscribe, WatchBlock, WatchTxn};
use crate::pubsub_client::PubSubClient;
pub use crate::remote_state_reader::{RemoteStateReader, StateRootOption};
use actix::{Addr, System};
use actix::{Addr, Arbiter, System};
use anyhow::anyhow;
use bcs_ext::BCSCodec;
use futures::channel::oneshot;
Expand Down Expand Up @@ -89,7 +89,8 @@ pub struct RpcClient {
provider: ConnectionProvider,
chain_watcher: Addr<ChainWatcher>,
//hold the watch thread handle.
watcher_handle: JoinHandle<()>,
watcher_thread: JoinHandle<()>,
watcher_thread_exit_sender: oneshot::Sender<()>,
}

struct ConnectionProvider {
Expand Down Expand Up @@ -134,12 +135,25 @@ impl RpcClient {
let provider = ConnectionProvider::new(conn_source, Runtime::new()?);
let inner: RpcClientInner = provider.get_rpc_channel().map_err(map_err)?.into(); //Self::create_client_inner(conn_source.clone()).map_err(map_err)?;
let pubsub_client = inner.pubsub_client.clone();
let (handle_exit_sender, handle_exit_receiver) = oneshot::channel::<()>();
let handle = std::thread::spawn(move || {
let sys = System::new("client-actix-system");
let watcher = ChainWatcher::launch();

tx.send(watcher).unwrap();
let _ = sys.run();
let _sys = System::with_tokio_rt(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.on_thread_stop(|| println!("client-actix-system thread stopped"))
.thread_name("client-actix-system")
.build()
.expect("failed to create tokio runtime for client-actix-system")
});
let arbiter = Arbiter::new();
let spawn_result = arbiter.spawn(async {
let watcher = ChainWatcher::launch();
tx.send(watcher).unwrap();
});
assert!(spawn_result);
futures::executor::block_on(async {
let _ = handle_exit_receiver.await;
});
});
let watcher = futures::executor::block_on(rx).expect("Init chain watcher fail.");
watcher.do_send(StartSubscribe {
Expand All @@ -149,7 +163,8 @@ impl RpcClient {
inner: Mutex::new(Some(inner)),
provider,
chain_watcher: watcher,
watcher_handle: handle,
watcher_thread: handle,
watcher_thread_exit_sender: handle_exit_sender,
})
}

Expand Down Expand Up @@ -282,6 +297,11 @@ impl RpcClient {
.map_err(map_err)
}

pub fn submit_hex_transaction(&self, txn: String) -> anyhow::Result<HashValue> {
self.call_rpc_blocking(|inner| inner.txpool_client.submit_hex_transaction(txn))
.map_err(map_err)
}

pub fn get_pending_txn_by_hash(
&self,
txn_hash: HashValue,
Expand Down Expand Up @@ -1011,7 +1031,11 @@ impl RpcClient {
if let Err(e) = self.chain_watcher.try_send(chain_watcher::StopWatcher) {
error!("Try to stop chain watcher error: {:?}", e);
}
if let Err(e) = self.watcher_handle.join() {

if let Err(e) = {
self.watcher_thread_exit_sender.send(()).unwrap();
self.watcher_thread.join()
} {
error!("Wait chain watcher thread stop error: {:?}", e);
}
}
Expand Down

0 comments on commit 1d6fb3b

Please sign in to comment.