Skip to content

Commit

Permalink
Merge pull request #206 from Nearest dc balancer
Browse files Browse the repository at this point in the history
  • Loading branch information
rekby authored Oct 31, 2024
2 parents 1c32018 + 26b98ec commit 72de41c
Show file tree
Hide file tree
Showing 10 changed files with 1,227 additions and 322 deletions.
17 changes: 11 additions & 6 deletions ydb/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ impl DiscoveryState {
Some(&self.nodes)
}

pub(crate) fn get_all_nodes(&self) -> Option<&Vec<NodeInfo>> {
Some(&self.nodes)
}

pub(crate) fn is_empty(&self) -> bool {
self.nodes.len() == 0
}
Expand Down Expand Up @@ -97,11 +101,12 @@ impl Default for DiscoveryState {
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct NodeInfo {
pub(crate) uri: Uri,
pub(crate) location: String,
}

impl NodeInfo {
pub(crate) fn new(uri: Uri) -> Self {
Self { uri }
pub(crate) fn new(uri: Uri, location: String) -> Self {
Self { uri, location }
}
}

Expand Down Expand Up @@ -141,7 +146,7 @@ pub struct StaticDiscovery {
impl StaticDiscovery {
pub fn new_from_str<'a, T: Into<&'a str>>(endpoint: T) -> YdbResult<Self> {
let endpoint = Uri::from_str(endpoint.into())?;
let nodes = vec![NodeInfo::new(endpoint)];
let nodes = vec![NodeInfo::new(endpoint, String::new())];

let state = DiscoveryState::new(std::time::Instant::now(), nodes);
let state = Arc::new(state);
Expand Down Expand Up @@ -324,14 +329,14 @@ impl DiscoverySharedState {

fn list_endpoints_to_node_infos(list: Vec<EndpointInfo>) -> YdbResult<Vec<NodeInfo>> {
list.into_iter()
.map(|item| match Self::endpoint_info_to_uri(item) {
Ok(uri) => YdbResult::<NodeInfo>::Ok(NodeInfo::new(uri)),
.map(|item| match Self::endpoint_info_to_uri(&item) {
Ok(uri) => YdbResult::<NodeInfo>::Ok(NodeInfo::new(uri, item.location.clone())),
Err(err) => YdbResult::<NodeInfo>::Err(err),
})
.try_collect()
}

fn endpoint_info_to_uri(endpoint_info: EndpointInfo) -> YdbResult<Uri> {
fn endpoint_info_to_uri(endpoint_info: &EndpointInfo) -> YdbResult<Uri> {
let authority: Authority =
Authority::from_str(format!("{}:{}", endpoint_info.fqdn, endpoint_info.port).as_str())?;

Expand Down
3 changes: 3 additions & 0 deletions ydb/src/grpc_wrapper/raw_discovery_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ impl GrpcDiscoveryClient {
};
let resp = self.service.list_endpoints(req).await?;
let result: ListEndpointsResult = grpc_read_operation_result(resp)?;

let res = result
.endpoints
.into_iter()
.map(|item| EndpointInfo {
fqdn: item.address,
port: item.port,
ssl: item.ssl,
location: item.location,
})
.collect_vec();
Ok(res)
Expand All @@ -45,6 +47,7 @@ pub(crate) struct EndpointInfo {
pub(crate) fqdn: String,
pub(crate) port: u32,
pub(crate) ssl: bool,
pub(crate) location: String,
}

impl GrpcServiceForDiscovery for GrpcDiscoveryClient {
Expand Down
315 changes: 0 additions & 315 deletions ydb/src/load_balancer.rs

This file was deleted.

Loading

0 comments on commit 72de41c

Please sign in to comment.