From 74b9597a6b906ac2948f1a82efc9e5d991ccbfd4 Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Thu, 5 Aug 2021 19:02:26 +0800 Subject: [PATCH 01/11] Add the session and pool. --- .../tests/transport_tokio_io.rs | 2 +- nebula_rust/Cargo.toml | 1 + nebula_rust/src/graph_client/connection.rs | 41 ++++++-- .../src/graph_client/connection_pool.rs | 97 ++++++++++++++++++- nebula_rust/src/graph_client/mod.rs | 3 + nebula_rust/src/graph_client/pool_config.rs | 56 ++++++++++- nebula_rust/src/graph_client/session.rs | 80 ++++++++++++++- nebula_rust/src/lib.rs | 1 + nebula_rust/src/value/data_set.rs | 30 ++++++ nebula_rust/src/value/mod.rs | 11 +++ nebula_rust/src/value/row.rs | 26 +++++ nebula_rust/tests/test_connection.rs | 6 +- nebula_rust/tests/test_connection_pool.rs | 40 ++++++++ nebula_rust/tests/test_session.rs | 32 ++++++ 14 files changed, 412 insertions(+), 14 deletions(-) create mode 100644 nebula_rust/src/value/data_set.rs create mode 100644 nebula_rust/src/value/mod.rs create mode 100644 nebula_rust/src/value/row.rs create mode 100644 nebula_rust/tests/test_connection_pool.rs create mode 100644 nebula_rust/tests/test_session.rs diff --git a/fbthrift-transport/tests/transport_tokio_io.rs b/fbthrift-transport/tests/transport_tokio_io.rs index 06121c7..ab4ae9e 100644 --- a/fbthrift-transport/tests/transport_tokio_io.rs +++ b/fbthrift-transport/tests/transport_tokio_io.rs @@ -23,8 +23,8 @@ mod transport_tokio_io_tests { task::JoinHandle, }; - use nebula_fbthrift_transport::AsyncTransport; use fbthrift_transport_response_handler::ResponseHandler; + use nebula_fbthrift_transport::AsyncTransport; #[derive(Clone)] pub struct FooResponseHandler; diff --git a/nebula_rust/Cargo.toml b/nebula_rust/Cargo.toml index 06604f6..a7e8e5e 100644 --- a/nebula_rust/Cargo.toml +++ b/nebula_rust/Cargo.toml @@ -22,6 +22,7 @@ tokio = { version = "1.8.2", features = ["full"] } fbthrift = { version = "0.0.2" } fbthrift-transport = { path = "../fbthrift-transport", package = "nebula-fbthrift-transport" , features = ["tokio_io"], version = "0.0.2" } bytes = { version = "0.5" } +futures = { version = "0.3.16" } [build-dependencies] diff --git a/nebula_rust/src/graph_client/connection.rs b/nebula_rust/src/graph_client/connection.rs index 0999519..10e19e0 100644 --- a/nebula_rust/src/graph_client/connection.rs +++ b/nebula_rust/src/graph_client/connection.rs @@ -13,18 +13,24 @@ use tokio::net::TcpStream; use crate::graph_client::transport_response_handler; +/// The simple abstraction of a connection to nebula graph server +#[derive(Default)] pub struct Connection { - client: client::GraphServiceImpl< - BinaryProtocol, - AsyncTransport, + // The option is used to construct a null connection + // which is used to give back the connection to pool from session + // So we could assume it's alway not null + client: Option< + client::GraphServiceImpl< + BinaryProtocol, + AsyncTransport, + >, >, } impl Connection { - /// Create connection with the specified [host:port] - pub async fn new(host: &str, port: i32) -> Result { - let addr = format!("{}:{}", host, port); - let stream = TcpStream::connect(addr).await?; + /// Create connection with the specified [host:port] address + pub async fn new_from_address(address: &str) -> Result { + let stream = TcpStream::connect(address).await?; let transport = AsyncTransport::new( stream, AsyncTransportConfiguration::new( @@ -32,11 +38,20 @@ impl Connection { ), ); Ok(Connection { - client: client::GraphServiceImpl::new(transport), + client: Some(client::GraphServiceImpl::new(transport)), }) } + /// Create connection with the specified [host:port] + pub async fn new(host: &str, port: i32) -> Result { + let address = format!("{}:{}", host, port); + Connection::new_from_address(&address).await + } + /// Authenticate by username and password + /// The returned error of `Result` only means the request/response status + /// The error from Nebula Graph is still in `error_code` field in response, so you need check it + /// to known wether authenticate succeeded pub async fn authenticate( &self, username: &str, @@ -44,6 +59,8 @@ impl Connection { ) -> std::result::Result { let result = self .client + .as_ref() + .unwrap() .authenticate( &username.to_string().into_bytes(), &password.to_string().into_bytes(), @@ -56,11 +73,12 @@ impl Connection { } /// Sign out the authentication by session id which got by authenticating previous + /// The returned error of `Result` only means the request/response status pub async fn signout( &self, session_id: i64, ) -> std::result::Result<(), common::types::ErrorCode> { - let result = self.client.signout(session_id).await; + let result = self.client.as_ref().unwrap().signout(session_id).await; if let Err(_) = result { return Err(common::types::ErrorCode::E_RPC_FAILURE); } @@ -68,6 +86,9 @@ impl Connection { } /// Execute the query with current session id which got by authenticating previous + /// The returned error of `Result` only means the request/response status + /// The error from Nebula Graph is still in `error_code` field in response, so you need check it + /// to known wether the query execute succeeded pub async fn execute( &self, session_id: i64, @@ -75,6 +96,8 @@ impl Connection { ) -> std::result::Result { let result = self .client + .as_ref() + .unwrap() .execute(session_id, &query.to_string().into_bytes()) .await; if let Err(_) = result { diff --git a/nebula_rust/src/graph_client/connection_pool.rs b/nebula_rust/src/graph_client/connection_pool.rs index c4e7abb..f6b730b 100644 --- a/nebula_rust/src/graph_client/connection_pool.rs +++ b/nebula_rust/src/graph_client/connection_pool.rs @@ -4,4 +4,99 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ -mod graph_client {}; +use crate::graph_client::connection::Connection; +use crate::graph_client::pool_config::PoolConfig; +use crate::graph_client::session::Session; + +/// The pool of connection to server, it's MT-safe to access. +pub struct ConnectionPool { + // The connections + conns: std::sync::Mutex>>, + // It should be immutable + config: PoolConfig, + // Address cursor + cursor: std::sync::atomic::AtomicUsize, +} + +impl ConnectionPool { + /// Construct pool by the configuration + pub async fn new(conf: &PoolConfig) -> Self { + let conns = std::collections::LinkedList::::new(); + let mut pool = ConnectionPool { + conns: std::sync::Mutex::new(std::cell::RefCell::new(conns)), + config: conf.clone(), + cursor: std::sync::atomic::AtomicUsize::new(0), + }; + pool.new_connection(pool.config.min_connection_pool_size) + .await; + pool + } + + /// Get a session authenticated by username and password + /// retry_connect means keep the connection available if true + pub async fn get_session( + &self, + username: &str, + password: &str, + retry_connect: bool, + ) -> std::result::Result, common::types::ErrorCode> { + let conn = self.conns.lock().unwrap().borrow_mut().pop_back(); + if let Some(conn) = conn { + let resp = conn.authenticate(username, password).await?; + if resp.error_code != common::types::ErrorCode::SUCCEEDED { + return Err(resp.error_code); + } + Ok(Session::new( + resp.session_id.unwrap(), + conn, + self, + username.to_string(), + password.to_string(), + if let Some(time_zone_name) = resp.time_zone_name { + std::str::from_utf8(&time_zone_name).unwrap().to_string() + } else { + String::new() + }, + resp.time_zone_offset_seconds.unwrap(), + retry_connect, + )) + } else { + Err(common::types::ErrorCode::E_UNKNOWN) + } + } + + /// Give back the connection to pool + #[inline] + pub fn give_back(&self, conn: Connection) { + self.conns.lock().unwrap().borrow_mut().push_back(conn); + } + + /// Get the count of connections + #[inline] + pub fn len(&self) -> usize { + self.conns.lock().unwrap().borrow().len() + } + + // Add new connection to pool + async fn new_connection(&mut self, inc: u32) { + assert!(inc != 0); + // TODO concurrent these + for _ in 0..inc { + let cursor = { self.cursor() }; + match Connection::new_from_address(&self.config.addresses[cursor]).await { + Ok(conn) => self.conns.lock().unwrap().borrow_mut().push_back(conn), + Err(_) => (), + }; + } + } + + fn cursor(&mut self) -> usize { + if self.cursor.load(std::sync::atomic::Ordering::Relaxed) >= self.config.addresses.len() { + self.cursor.store(0, std::sync::atomic::Ordering::Relaxed); + 0 + } else { + self.cursor + .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + } + } +} diff --git a/nebula_rust/src/graph_client/mod.rs b/nebula_rust/src/graph_client/mod.rs index 99cf0db..9aeaa93 100644 --- a/nebula_rust/src/graph_client/mod.rs +++ b/nebula_rust/src/graph_client/mod.rs @@ -5,4 +5,7 @@ */ pub mod connection; +pub mod connection_pool; +pub mod pool_config; +pub mod session; mod transport_response_handler; diff --git a/nebula_rust/src/graph_client/pool_config.rs b/nebula_rust/src/graph_client/pool_config.rs index 33c14ca..3e3de23 100644 --- a/nebula_rust/src/graph_client/pool_config.rs +++ b/nebula_rust/src/graph_client/pool_config.rs @@ -4,4 +4,58 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ -mod graph_client {}; +#[derive(Debug, Default, Clone)] +pub struct PoolConfig { + /// connection timeout in ms + pub timeout: u32, + pub idle_time: u32, + /// max limit count of connections in pool + pub max_connection_pool_size: u32, + /// min limit count of connections in pool, also the initial count if works well + pub min_connection_pool_size: u32, + /// address of graph server + pub addresses: std::vec::Vec, +} + +impl PoolConfig { + #[inline] + pub fn new() -> Self { + Self::default() + } + + #[inline] + pub fn timeout(&mut self, timeout: u32) -> &mut Self { + self.timeout = timeout; + self + } + + #[inline] + pub fn idle_time(&mut self, idle_time: u32) -> &mut Self { + self.idle_time = idle_time; + self + } + + #[inline] + pub fn max_connection_pool_size(&mut self, size: u32) -> &mut Self { + self.max_connection_pool_size = size; + self + } + + #[inline] + pub fn min_connection_pool_size(&mut self, size: u32) -> &mut Self { + self.min_connection_pool_size = size; + self + } + + #[inline] + pub fn addresses(&mut self, addresses: std::vec::Vec) -> &mut Self { + self.addresses = addresses; + self + } + + #[inline] + pub fn address(&mut self, address: String) -> &mut Self { + self.addresses.push(address); + self + } +} diff --git a/nebula_rust/src/graph_client/session.rs b/nebula_rust/src/graph_client/session.rs index c4e7abb..b215abc 100644 --- a/nebula_rust/src/graph_client/session.rs +++ b/nebula_rust/src/graph_client/session.rs @@ -4,4 +4,82 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ -mod graph_client {}; +use crate::graph_client::connection::Connection; +use crate::graph_client::connection_pool::ConnectionPool; + +pub struct Session<'a> { + session_id: i64, + conn: Connection, + pool: &'a ConnectionPool, + username: String, + password: String, + // empty means not a named timezone + time_zone_name: String, + // Offset to utc in seconds + offset_secs: i32, + // Keep connection if true + retry_connect: bool, +} + +impl<'a> Session<'a> { + pub fn new( + session_id: i64, + conn: Connection, + pool: &'a ConnectionPool, + username: String, + password: String, + time_zone_name: String, + offset_secs: i32, + retry_connect: bool, + ) -> Self { + Session { + session_id: session_id, + conn: conn, + pool: pool, + username: username, + password: password, + time_zone_name: time_zone_name, + offset_secs: offset_secs, + retry_connect: retry_connect, + } + } + + /// sign out the session + #[inline] + pub async fn signout(&self) -> std::result::Result<(), common::types::ErrorCode> { + self.conn.signout(self.session_id).await + } + + /// Execute the query in current session + /// The returned error of `Result` only means the request/response status + /// The error from Nebula Graph is still in `error_code` field in response, so you need check it + /// to known wether the query execute succeeded + #[inline] + pub async fn execute( + &self, + query: &str, + ) -> std::result::Result { + self.conn.execute(self.session_id, query).await + } + + /// Get the time zone name + #[inline] + pub fn time_zone_name(&self) -> &str { + &self.time_zone_name + } + + /// Get the time zone offset to UTC in seconds + #[inline] + pub fn offset_secs(&self) -> i32 { + self.offset_secs + } +} + +impl<'a> Drop for Session<'a> { + /// Drop session will sign out the session in server + /// and give back connection to pool + fn drop(&mut self) { + futures::executor::block_on(self.signout()); + self.pool.give_back(std::mem::take(&mut self.conn)); + } +} diff --git a/nebula_rust/src/lib.rs b/nebula_rust/src/lib.rs index e87dc26..5ef11a8 100644 --- a/nebula_rust/src/lib.rs +++ b/nebula_rust/src/lib.rs @@ -5,3 +5,4 @@ */ pub mod graph_client; +pub mod value; diff --git a/nebula_rust/src/value/data_set.rs b/nebula_rust/src/value/data_set.rs new file mode 100644 index 0000000..89dcfde --- /dev/null +++ b/nebula_rust/src/value/data_set.rs @@ -0,0 +1,30 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +use common::types::DataSet; + +pub trait DataSetValue { + /// Construct data set with name of columns + fn new(col_names: &[String]) -> Self; + + /// push one row into back of data set + fn push(&mut self, row: common::types::Row); +} + +impl DataSetValue for DataSet { + fn new(col_names: &[String]) -> Self { + let cols = col_names.to_vec(); + let cols_bytes = cols.into_iter().map(|s| s.as_bytes().to_vec()).collect(); + DataSet { + column_names: cols_bytes, + rows: vec![], + } + } + + fn push(&mut self, row: common::types::Row) { + self.rows.push(row); + } +} diff --git a/nebula_rust/src/value/mod.rs b/nebula_rust/src/value/mod.rs new file mode 100644 index 0000000..35723d2 --- /dev/null +++ b/nebula_rust/src/value/mod.rs @@ -0,0 +1,11 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + + +/// Some extension of the thrift value + +pub mod data_set; +pub mod row; diff --git a/nebula_rust/src/value/row.rs b/nebula_rust/src/value/row.rs new file mode 100644 index 0000000..87c9620 --- /dev/null +++ b/nebula_rust/src/value/row.rs @@ -0,0 +1,26 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +use common::types::Row; + +pub trait RowValue { + fn new(row: &[common::types::Value]) -> Self; + + fn len(&self) -> usize; +} + +impl RowValue for Row { + fn new(row: &[common::types::Value]) -> Self { + Row { + values: row.to_vec(), + } + } + + #[inline] + fn len(&self) -> usize { + self.values.len() + } +} diff --git a/nebula_rust/tests/test_connection.rs b/nebula_rust/tests/test_connection.rs index dd21ec0..f3a8729 100644 --- a/nebula_rust/tests/test_connection.rs +++ b/nebula_rust/tests/test_connection.rs @@ -9,6 +9,8 @@ extern crate nebula_rust; #[cfg(test)] mod test_connection { use nebula_rust::graph_client; + use nebula_rust::value::data_set::DataSetValue; + use nebula_rust::value::row::RowValue; #[tokio::test] async fn basic_op() { @@ -26,7 +28,9 @@ mod test_connection { assert!(result.is_ok()); let response = result.unwrap(); assert!(response.error_code == common::types::ErrorCode::SUCCEEDED); - println!("{:?}", response.data.unwrap()); + let mut dt = common::types::DataSet::new(&["1".to_string()]); + dt.push(common::types::Row::new(&[common::types::Value::iVal(1)])); + assert!(dt == response.data.unwrap()); let result = conn.signout(session_id).await; assert!(result.is_ok()); diff --git a/nebula_rust/tests/test_connection_pool.rs b/nebula_rust/tests/test_connection_pool.rs new file mode 100644 index 0000000..9a39bce --- /dev/null +++ b/nebula_rust/tests/test_connection_pool.rs @@ -0,0 +1,40 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +extern crate nebula_rust; + +#[cfg(test)] +mod test_connection { + use nebula_rust::graph_client; + use nebula_rust::value::data_set::DataSetValue; + use nebula_rust::value::row::RowValue; + + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn mt_safe() { + let mut conf = graph_client::pool_config::PoolConfig::new(); + conf.min_connection_pool_size(10) + .max_connection_pool_size(10) + .address("localhost:9669".to_string()); + let pool = graph_client::connection_pool::ConnectionPool::new(&conf).await; + + { + let mut futs = vec![]; + for _ in 0..10 { + futs.push(pool.get_session("root", "nebula", true)); + } + let sessions = futures::future::join_all(futs).await; + for session in &sessions { + let resp = session.as_ref().unwrap().execute("YIELD 1").await.unwrap(); + assert!(resp.error_code == common::types::ErrorCode::SUCCEEDED); + + let mut dt = common::types::DataSet::new(&["1".to_string()]); + dt.push(common::types::Row::new(&[common::types::Value::iVal(1)])); + assert!(dt == resp.data.unwrap()); + } + } + assert!(pool.len() == 10); + } +} diff --git a/nebula_rust/tests/test_session.rs b/nebula_rust/tests/test_session.rs new file mode 100644 index 0000000..8c63a46 --- /dev/null +++ b/nebula_rust/tests/test_session.rs @@ -0,0 +1,32 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +extern crate nebula_rust; + +#[cfg(test)] +mod test_connection { + use nebula_rust::graph_client; + use nebula_rust::value::data_set::DataSetValue; + use nebula_rust::value::row::RowValue; + + #[tokio::test] + async fn basic_op() { + let mut conf = graph_client::pool_config::PoolConfig::new(); + conf.min_connection_pool_size(2) + .max_connection_pool_size(10) + .address("localhost:9669".to_string()); + + let pool = graph_client::connection_pool::ConnectionPool::new(&conf).await; + let session = pool.get_session("root", "nebula", true).await.unwrap(); + + let resp = session.execute("YIELD 1").await.unwrap(); + assert!(resp.error_code == common::types::ErrorCode::SUCCEEDED); + + let mut dt = common::types::DataSet::new(&["1".to_string()]); + dt.push(common::types::Row::new(&[common::types::Value::iVal(1)])); + assert!(dt == resp.data.unwrap()); + } +} From a87e425bf81c6d46f3c38b2954b7ea076d825ce1 Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Fri, 6 Aug 2021 10:38:59 +0800 Subject: [PATCH 02/11] Comment the interior mutable. --- nebula_rust/src/graph_client/connection_pool.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/nebula_rust/src/graph_client/connection_pool.rs b/nebula_rust/src/graph_client/connection_pool.rs index f6b730b..4f81b25 100644 --- a/nebula_rust/src/graph_client/connection_pool.rs +++ b/nebula_rust/src/graph_client/connection_pool.rs @@ -11,6 +11,7 @@ use crate::graph_client::session::Session; /// The pool of connection to server, it's MT-safe to access. pub struct ConnectionPool { // The connections + // The interior mutable to enable could get multiple sessions in one scope conns: std::sync::Mutex>>, // It should be immutable config: PoolConfig, From e58472a69e2033c2e52141e0615fae32427be784 Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Fri, 6 Aug 2021 10:51:05 +0800 Subject: [PATCH 03/11] Add some methods for value type. --- nebula_rust/src/value/data_set.rs | 23 +++++++++++++++++++++++ nebula_rust/src/value/mod.rs | 2 -- nebula_rust/src/value/row.rs | 11 +++++++++++ 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/nebula_rust/src/value/data_set.rs b/nebula_rust/src/value/data_set.rs index 89dcfde..38c8cc1 100644 --- a/nebula_rust/src/value/data_set.rs +++ b/nebula_rust/src/value/data_set.rs @@ -10,8 +10,14 @@ pub trait DataSetValue { /// Construct data set with name of columns fn new(col_names: &[String]) -> Self; + /// Construct data set from vec of columns name + fn from_columns_name(col_names: std::vec::Vec) -> Self; + /// push one row into back of data set fn push(&mut self, row: common::types::Row); + + /// Get rows size + fn len(&self) -> usize; } impl DataSetValue for DataSet { @@ -24,7 +30,24 @@ impl DataSetValue for DataSet { } } + fn from_columns_name(col_names: std::vec::Vec) -> Self { + let cols_bytes = col_names + .into_iter() + .map(|s| s.as_bytes().to_vec()) + .collect(); + DataSet { + column_names: cols_bytes, + rows: vec![], + } + } + + #[inline] fn push(&mut self, row: common::types::Row) { self.rows.push(row); } + + #[inline] + fn len(&self) -> usize { + self.rows.len() + } } diff --git a/nebula_rust/src/value/mod.rs b/nebula_rust/src/value/mod.rs index 35723d2..513df73 100644 --- a/nebula_rust/src/value/mod.rs +++ b/nebula_rust/src/value/mod.rs @@ -4,8 +4,6 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ - /// Some extension of the thrift value - pub mod data_set; pub mod row; diff --git a/nebula_rust/src/value/row.rs b/nebula_rust/src/value/row.rs index 87c9620..4745567 100644 --- a/nebula_rust/src/value/row.rs +++ b/nebula_rust/src/value/row.rs @@ -7,18 +7,29 @@ use common::types::Row; pub trait RowValue { + /// Construct row by columns name fn new(row: &[common::types::Value]) -> Self; + /// Construct row by vec of column name + fn from_vec(row: std::vec::Vec) -> Self; + + /// Get row length fn len(&self) -> usize; } impl RowValue for Row { + #[inline] fn new(row: &[common::types::Value]) -> Self { Row { values: row.to_vec(), } } + #[inline] + fn from_vec(row: std::vec::Vec) -> Self { + Row { values: row } + } + #[inline] fn len(&self) -> usize { self.values.len() From bba2218e4caed4821394c7ea71cd5fadd13aca78 Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Fri, 6 Aug 2021 11:50:40 +0800 Subject: [PATCH 04/11] Try to fit the connection count. --- nebula_rust/src/graph_client/connection_pool.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/nebula_rust/src/graph_client/connection_pool.rs b/nebula_rust/src/graph_client/connection_pool.rs index 4f81b25..b16d0a8 100644 --- a/nebula_rust/src/graph_client/connection_pool.rs +++ b/nebula_rust/src/graph_client/connection_pool.rs @@ -79,15 +79,25 @@ impl ConnectionPool { } // Add new connection to pool + // inc is the count of new connection created, which shouldn't be zero + // the incremental count maybe can't fit when occurs error in connection creating async fn new_connection(&mut self, inc: u32) { assert!(inc != 0); // TODO concurrent these - for _ in 0..inc { + let mut count = 0; + let mut loop_count = 0; + let loop_limit = inc as usize * self.config.addresses.len(); + while count < inc { let cursor = { self.cursor() }; match Connection::new_from_address(&self.config.addresses[cursor]).await { - Ok(conn) => self.conns.lock().unwrap().borrow_mut().push_back(conn), + Ok(conn) => { self.conns.lock().unwrap().borrow_mut().push_back(conn); count += 1; }, Err(_) => (), }; + loop_count += 1; + if loop_count > loop_limit { + // Can't get so many connections, avoid dead loop + break; + } } } From d527e20828277e9c1acd390a8b3abf420af9fbf7 Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Fri, 6 Aug 2021 16:41:38 +0800 Subject: [PATCH 05/11] Support pool expansion. --- .../src/graph_client/connection_pool.rs | 57 +++++++++++++++---- nebula_rust/tests/test_connection_pool.rs | 11 +++- 2 files changed, 54 insertions(+), 14 deletions(-) diff --git a/nebula_rust/src/graph_client/connection_pool.rs b/nebula_rust/src/graph_client/connection_pool.rs index b16d0a8..1846657 100644 --- a/nebula_rust/src/graph_client/connection_pool.rs +++ b/nebula_rust/src/graph_client/connection_pool.rs @@ -10,24 +10,28 @@ use crate::graph_client::session::Session; /// The pool of connection to server, it's MT-safe to access. pub struct ConnectionPool { - // The connections - // The interior mutable to enable could get multiple sessions in one scope + /// The connections + /// The interior mutable to enable could get multiple sessions in one scope conns: std::sync::Mutex>>, - // It should be immutable + /// It should be immutable config: PoolConfig, - // Address cursor - cursor: std::sync::atomic::AtomicUsize, + /// Address cursor + cursor: std::cell::RefCell, + /// The total count of connections, contains which hold by session + conns_count: std::cell::RefCell, } impl ConnectionPool { /// Construct pool by the configuration pub async fn new(conf: &PoolConfig) -> Self { let conns = std::collections::LinkedList::::new(); - let mut pool = ConnectionPool { + let pool = ConnectionPool { conns: std::sync::Mutex::new(std::cell::RefCell::new(conns)), config: conf.clone(), - cursor: std::sync::atomic::AtomicUsize::new(0), + cursor: std::cell::RefCell::new(std::sync::atomic::AtomicUsize::new(0)), + conns_count: std::cell::RefCell::new(std::sync::atomic::AtomicUsize::new(0)), }; + assert!(pool.config.min_connection_pool_size <= pool.config.max_connection_pool_size); pool.new_connection(pool.config.min_connection_pool_size) .await; pool @@ -41,6 +45,9 @@ impl ConnectionPool { password: &str, retry_connect: bool, ) -> std::result::Result, common::types::ErrorCode> { + if self.conns.lock().unwrap().borrow_mut().is_empty() { + self.new_connection(1).await; + } let conn = self.conns.lock().unwrap().borrow_mut().pop_back(); if let Some(conn) = conn { let resp = conn.authenticate(username, password).await?; @@ -81,16 +88,29 @@ impl ConnectionPool { // Add new connection to pool // inc is the count of new connection created, which shouldn't be zero // the incremental count maybe can't fit when occurs error in connection creating - async fn new_connection(&mut self, inc: u32) { + async fn new_connection(&self, inc: u32) { assert!(inc != 0); // TODO concurrent these let mut count = 0; let mut loop_count = 0; let loop_limit = inc as usize * self.config.addresses.len(); while count < inc { + if count as usize + + self + .conns_count + .borrow() + .load(std::sync::atomic::Ordering::Acquire) + >= self.config.max_connection_pool_size as usize + { + // Reach the pool size limit + break; + } let cursor = { self.cursor() }; match Connection::new_from_address(&self.config.addresses[cursor]).await { - Ok(conn) => { self.conns.lock().unwrap().borrow_mut().push_back(conn); count += 1; }, + Ok(conn) => { + self.conns.lock().unwrap().borrow_mut().push_back(conn); + count += 1; + } Err(_) => (), }; loop_count += 1; @@ -99,14 +119,27 @@ impl ConnectionPool { break; } } + // Release ordering make sure inc happened after creating new connections + self.conns_count + .borrow_mut() + .fetch_add(count as usize, std::sync::atomic::Ordering::Release); } - fn cursor(&mut self) -> usize { - if self.cursor.load(std::sync::atomic::Ordering::Relaxed) >= self.config.addresses.len() { - self.cursor.store(0, std::sync::atomic::Ordering::Relaxed); + // cursor on the server addresses + fn cursor(&self) -> usize { + if self + .cursor + .borrow() + .load(std::sync::atomic::Ordering::Relaxed) + >= self.config.addresses.len() + { + self.cursor + .borrow_mut() + .store(0, std::sync::atomic::Ordering::Relaxed); 0 } else { self.cursor + .borrow_mut() .fetch_add(1, std::sync::atomic::Ordering::Relaxed) } } diff --git a/nebula_rust/tests/test_connection_pool.rs b/nebula_rust/tests/test_connection_pool.rs index 9a39bce..042e972 100644 --- a/nebula_rust/tests/test_connection_pool.rs +++ b/nebula_rust/tests/test_connection_pool.rs @@ -15,14 +15,15 @@ mod test_connection { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn mt_safe() { let mut conf = graph_client::pool_config::PoolConfig::new(); - conf.min_connection_pool_size(10) + conf.min_connection_pool_size(5) .max_connection_pool_size(10) .address("localhost:9669".to_string()); let pool = graph_client::connection_pool::ConnectionPool::new(&conf).await; { + // Consume all connections let mut futs = vec![]; - for _ in 0..10 { + for _ in 0..conf.max_connection_pool_size { futs.push(pool.get_session("root", "nebula", true)); } let sessions = futures::future::join_all(futs).await; @@ -34,6 +35,12 @@ mod test_connection { dt.push(common::types::Row::new(&[common::types::Value::iVal(1)])); assert!(dt == resp.data.unwrap()); } + + assert!(pool.len() == 0); + + // out of pool size limit + let result = pool.get_session("root", "nebula", true).await; + assert!(!result.is_ok()); } assert!(pool.len() == 10); } From 681af15f43c27c1d3c57eff132d3047f56a57095 Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Fri, 6 Aug 2021 16:47:55 +0800 Subject: [PATCH 06/11] Unify the value trait name. --- nebula_rust/src/value/data_set.rs | 10 ++++------ nebula_rust/src/value/row.rs | 10 ++++------ nebula_rust/tests/test_connection.rs | 4 ++-- nebula_rust/tests/test_connection_pool.rs | 4 ++-- nebula_rust/tests/test_session.rs | 4 ++-- 5 files changed, 14 insertions(+), 18 deletions(-) diff --git a/nebula_rust/src/value/data_set.rs b/nebula_rust/src/value/data_set.rs index 38c8cc1..5f01d34 100644 --- a/nebula_rust/src/value/data_set.rs +++ b/nebula_rust/src/value/data_set.rs @@ -4,9 +4,7 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ -use common::types::DataSet; - -pub trait DataSetValue { +pub trait DataSet { /// Construct data set with name of columns fn new(col_names: &[String]) -> Self; @@ -20,11 +18,11 @@ pub trait DataSetValue { fn len(&self) -> usize; } -impl DataSetValue for DataSet { +impl DataSet for common::types::DataSet { fn new(col_names: &[String]) -> Self { let cols = col_names.to_vec(); let cols_bytes = cols.into_iter().map(|s| s.as_bytes().to_vec()).collect(); - DataSet { + common::types::DataSet { column_names: cols_bytes, rows: vec![], } @@ -35,7 +33,7 @@ impl DataSetValue for DataSet { .into_iter() .map(|s| s.as_bytes().to_vec()) .collect(); - DataSet { + common::types::DataSet { column_names: cols_bytes, rows: vec![], } diff --git a/nebula_rust/src/value/row.rs b/nebula_rust/src/value/row.rs index 4745567..ea2e028 100644 --- a/nebula_rust/src/value/row.rs +++ b/nebula_rust/src/value/row.rs @@ -4,9 +4,7 @@ * attached with Common Clause Condition 1.0, found in the LICENSES directory. */ -use common::types::Row; - -pub trait RowValue { +pub trait Row { /// Construct row by columns name fn new(row: &[common::types::Value]) -> Self; @@ -17,17 +15,17 @@ pub trait RowValue { fn len(&self) -> usize; } -impl RowValue for Row { +impl Row for common::types::Row { #[inline] fn new(row: &[common::types::Value]) -> Self { - Row { + common::types::Row { values: row.to_vec(), } } #[inline] fn from_vec(row: std::vec::Vec) -> Self { - Row { values: row } + common::types::Row { values: row } } #[inline] diff --git a/nebula_rust/tests/test_connection.rs b/nebula_rust/tests/test_connection.rs index f3a8729..d6bec99 100644 --- a/nebula_rust/tests/test_connection.rs +++ b/nebula_rust/tests/test_connection.rs @@ -9,8 +9,8 @@ extern crate nebula_rust; #[cfg(test)] mod test_connection { use nebula_rust::graph_client; - use nebula_rust::value::data_set::DataSetValue; - use nebula_rust::value::row::RowValue; + use nebula_rust::value::data_set::DataSet; + use nebula_rust::value::row::Row; #[tokio::test] async fn basic_op() { diff --git a/nebula_rust/tests/test_connection_pool.rs b/nebula_rust/tests/test_connection_pool.rs index 042e972..1935dd8 100644 --- a/nebula_rust/tests/test_connection_pool.rs +++ b/nebula_rust/tests/test_connection_pool.rs @@ -9,8 +9,8 @@ extern crate nebula_rust; #[cfg(test)] mod test_connection { use nebula_rust::graph_client; - use nebula_rust::value::data_set::DataSetValue; - use nebula_rust::value::row::RowValue; + use nebula_rust::value::data_set::DataSet; + use nebula_rust::value::row::Row; #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn mt_safe() { diff --git a/nebula_rust/tests/test_session.rs b/nebula_rust/tests/test_session.rs index 8c63a46..b81e383 100644 --- a/nebula_rust/tests/test_session.rs +++ b/nebula_rust/tests/test_session.rs @@ -9,8 +9,8 @@ extern crate nebula_rust; #[cfg(test)] mod test_connection { use nebula_rust::graph_client; - use nebula_rust::value::data_set::DataSetValue; - use nebula_rust::value::row::RowValue; + use nebula_rust::value::data_set::DataSet; + use nebula_rust::value::row::Row; #[tokio::test] async fn basic_op() { From eb9a88530377a8f8ce0c7671d9af164e7508ea19 Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Mon, 9 Aug 2021 11:38:19 +0800 Subject: [PATCH 07/11] Refactor the for loop by map collect. --- nebula_rust/tests/test_connection_pool.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nebula_rust/tests/test_connection_pool.rs b/nebula_rust/tests/test_connection_pool.rs index 1935dd8..a13037c 100644 --- a/nebula_rust/tests/test_connection_pool.rs +++ b/nebula_rust/tests/test_connection_pool.rs @@ -22,10 +22,10 @@ mod test_connection { { // Consume all connections - let mut futs = vec![]; - for _ in 0..conf.max_connection_pool_size { - futs.push(pool.get_session("root", "nebula", true)); - } + let futs = (0..conf.max_connection_pool_size) + .into_iter() + .map(|_| pool.get_session("root", "nebula", true)) + .collect::>(); let sessions = futures::future::join_all(futs).await; for session in &sessions { let resp = session.as_ref().unwrap().execute("YIELD 1").await.unwrap(); From abb68855bdd3d59fc4acd7a1fc4fca5d6b4b9ced Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Mon, 9 Aug 2021 11:47:17 +0800 Subject: [PATCH 08/11] Add method to get data set columns size. --- nebula_rust/src/value/data_set.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/nebula_rust/src/value/data_set.rs b/nebula_rust/src/value/data_set.rs index 5f01d34..7c70c3b 100644 --- a/nebula_rust/src/value/data_set.rs +++ b/nebula_rust/src/value/data_set.rs @@ -16,6 +16,9 @@ pub trait DataSet { /// Get rows size fn len(&self) -> usize; + + /// Get count of columns + fn cols_len(&self) -> usize; } impl DataSet for common::types::DataSet { @@ -48,4 +51,9 @@ impl DataSet for common::types::DataSet { fn len(&self) -> usize { self.rows.len() } + + #[inline] + fn cols_len(&self) -> usize { + self.column_names.len() + } } From ed964e48cf4d76ae16a30f30b5df30b52305c992 Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Mon, 9 Aug 2021 15:06:19 +0800 Subject: [PATCH 09/11] Add a simple example. --- .github/workflows/test.yml | 4 ++++ nebula_rust/Cargo.toml | 3 +++ nebula_rust/examples/basic_op.rs | 23 +++++++++++++++++++++++ 3 files changed, 30 insertions(+) create mode 100644 nebula_rust/examples/basic_op.rs diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 3312a83..17d98ac 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -47,3 +47,7 @@ jobs: run: | cargo test timeout-minutes: 4 + - name: example + run: | + cargo run --example basic_op + timeout-minutes: 4 diff --git a/nebula_rust/Cargo.toml b/nebula_rust/Cargo.toml index a7e8e5e..6fab764 100644 --- a/nebula_rust/Cargo.toml +++ b/nebula_rust/Cargo.toml @@ -27,3 +27,6 @@ futures = { version = "0.3.16" } [build-dependencies] [dev-dependencies] + +[[example]] +name = "basic_op" diff --git a/nebula_rust/examples/basic_op.rs b/nebula_rust/examples/basic_op.rs new file mode 100644 index 0000000..3ae4c1b --- /dev/null +++ b/nebula_rust/examples/basic_op.rs @@ -0,0 +1,23 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +use nebula_rust::graph_client; + +#[tokio::main] +async fn main() { + let mut conf = graph_client::pool_config::PoolConfig::new(); + conf.min_connection_pool_size(2) + .max_connection_pool_size(10) + .address("localhost:9669".to_string()); + + let pool = graph_client::connection_pool::ConnectionPool::new(&conf).await; + let session = pool.get_session("root", "nebula", true).await.unwrap(); + + let resp = session.execute("YIELD 1").await.unwrap(); + assert!(resp.error_code == common::types::ErrorCode::SUCCEEDED); + + println!("{:?}", resp.data.unwrap()); +} From b49b9bd8cb425992839565ea298c6494dee0b6e4 Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Tue, 10 Aug 2021 14:45:11 +0800 Subject: [PATCH 10/11] Show the example query result. --- nebula_rust/examples/basic_op.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/nebula_rust/examples/basic_op.rs b/nebula_rust/examples/basic_op.rs index 3ae4c1b..171b1f3 100644 --- a/nebula_rust/examples/basic_op.rs +++ b/nebula_rust/examples/basic_op.rs @@ -19,5 +19,13 @@ async fn main() { let resp = session.execute("YIELD 1").await.unwrap(); assert!(resp.error_code == common::types::ErrorCode::SUCCEEDED); - println!("{:?}", resp.data.unwrap()); + println!("{:?}", resp.data.as_ref().unwrap()); + println!( + "The result of query `YIELD 1' is {}.", + if let common::types::Value::iVal(v) = resp.data.unwrap().rows[0].values[0] { + v + } else { + panic!() + } + ); } From 9f34ea20933687e265ec824aec0a6fde80fa4ebf Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Tue, 10 Aug 2021 14:52:32 +0800 Subject: [PATCH 11/11] Simplify the data set columns name conversion. --- nebula_rust/src/value/data_set.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/nebula_rust/src/value/data_set.rs b/nebula_rust/src/value/data_set.rs index 7c70c3b..11eb837 100644 --- a/nebula_rust/src/value/data_set.rs +++ b/nebula_rust/src/value/data_set.rs @@ -23,8 +23,7 @@ pub trait DataSet { impl DataSet for common::types::DataSet { fn new(col_names: &[String]) -> Self { - let cols = col_names.to_vec(); - let cols_bytes = cols.into_iter().map(|s| s.as_bytes().to_vec()).collect(); + let cols_bytes = col_names.into_iter().map(|s| s.as_bytes().to_vec()).collect(); common::types::DataSet { column_names: cols_bytes, rows: vec![],