Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Build single token -> address map #978

Merged
merged 2 commits into the base branch from the pull-request branch
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 1 addition & 11 deletions src/filters/token_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,19 +122,9 @@ impl Router for HashedTokenRouter {
fn sync_read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
match ctx.metadata.get(&self.config.metadata_key) {
Some(metadata::Value::Bytes(token)) => {
let mut destinations = Vec::new();

let tok = crate::net::cluster::Token::new(token);

for ep in ctx.endpoints.iter() {
ep.value().addresses_for_token(tok, &mut destinations);

if !destinations.is_empty() {
break;
}
}

ctx.destinations = destinations;
ctx.destinations = ctx.endpoints.addresses_for_token(tok);

if ctx.destinations.is_empty() {
Err(FilterError::new(Error::NoEndpointMatch(
Expand Down
119 changes: 84 additions & 35 deletions src/net/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub(crate) fn active_endpoints() -> &'static prometheus::IntGauge {
&ACTIVE_ENDPOINTS
}

pub type TokenAddressMap = std::collections::BTreeMap<u64, Vec<EndpointAddress>>;
pub type TokenAddressMap = std::collections::BTreeMap<u64, BTreeSet<EndpointAddress>>;

#[derive(Copy, Clone)]
pub struct Token(u64);
Expand Down Expand Up @@ -165,13 +165,6 @@ impl EndpointSet {
self.endpoints.contains(ep)
}

/// Appends to `addresses` every address registered under `token`.
///
/// Looks up the token's hash (`token.0`) in this set's `token_map` and
/// copies the stored address list into the caller-supplied buffer;
/// leaves `addresses` untouched when the token is unknown.
#[inline]
pub fn addresses_for_token(&self, token: Token, addresses: &mut Vec<EndpointAddress>) {
if let Some(addrs) = self.token_map.get(&token.0) {
addresses.extend_from_slice(addrs);
}
}

/// Unique version for this endpoint set
#[inline]
pub fn version(&self) -> EndpointSetVersion {
Expand All @@ -182,7 +175,7 @@ impl EndpointSet {
///
/// This is extremely expensive
#[inline]
pub fn update(&mut self) {
pub fn update(&mut self) -> TokenAddressMap {
use std::hash::{Hash, Hasher};
let mut hasher = seahash::SeaHasher::with_seeds(0, 1, 2, 3);
let mut token_map = TokenAddressMap::new();
Expand All @@ -192,50 +185,81 @@ impl EndpointSet {

for tok in &ep.metadata.known.tokens {
let hash = seahash::hash(tok);
token_map.entry(hash).or_default().push(ep.address.clone());
token_map
.entry(hash)
.or_default()
.insert(ep.address.clone());
}
}

self.hash = hasher.finish();
self.version += 1;
self.token_map = token_map;
std::mem::replace(&mut self.token_map, token_map)
}

/// Creates a map of tokens -> address for the current set
#[inline]
pub fn build_token_map(&mut self) {
pub fn build_token_map(&mut self) -> TokenAddressMap {
let mut token_map = TokenAddressMap::new();

// This is only called on proxies, so calculate a token map
for ep in &self.endpoints {
for tok in &ep.metadata.known.tokens {
let hash = seahash::hash(tok);
token_map.entry(hash).or_default().push(ep.address.clone());
token_map
.entry(hash)
.or_default()
.insert(ep.address.clone());
}
}

self.token_map = token_map;
std::mem::replace(&mut self.token_map, token_map)
}

#[inline]
pub fn replace(&mut self, replacement: Self) -> BTreeSet<Endpoint> {
let old = std::mem::replace(&mut self.endpoints, replacement.endpoints);

if replacement.hash == 0 {
self.update();
pub fn replace(
&mut self,
replacement: Self,
) -> (
usize,
std::collections::HashMap<u64, Option<Vec<EndpointAddress>>>,
) {
let old_len = std::mem::replace(&mut self.endpoints, replacement.endpoints).len();

let old_tm = if replacement.hash == 0 {
self.update()
} else {
self.hash = replacement.hash;
self.version += 1;
self.build_token_map();
self.build_token_map()
};

let mut hm = std::collections::HashMap::new();

for (token, addrs) in &old_tm {
if let Some(naddrs) = self.token_map.get(token) {
if addrs.symmetric_difference(naddrs).count() > 0 {
hm.insert(*token, Some(naddrs.iter().cloned().collect()));
}
} else {
hm.insert(*token, None);
}
}

for (token, addrs) in &self.token_map {
if !hm.contains_key(token) {
hm.insert(*token, Some(addrs.iter().cloned().collect()));
}
}

old
(old_len, hm)
}
}

/// Represents a full snapshot of all clusters.
pub struct ClusterMap<S = RandomState> {
map: DashMap<Option<Locality>, EndpointSet, S>,
token_map: DashMap<u64, Vec<EndpointAddress>>,
num_endpoints: AtomicUsize,
version: AtomicU64,
}
Expand Down Expand Up @@ -275,25 +299,16 @@ where
}

#[inline]
pub fn insert(
&self,
locality: Option<Locality>,
cluster: BTreeSet<Endpoint>,
) -> Option<BTreeSet<Endpoint>> {
pub fn insert(&self, locality: Option<Locality>, cluster: BTreeSet<Endpoint>) {
self.apply(locality, EndpointSet::new(cluster))
}

pub fn apply(
&self,
locality: Option<Locality>,
cluster: EndpointSet,
) -> Option<BTreeSet<Endpoint>> {
pub fn apply(&self, locality: Option<Locality>, cluster: EndpointSet) {
let new_len = cluster.len();
if let Some(mut current) = self.map.get_mut(&locality) {
let current = current.value_mut();

let old = current.replace(cluster);
let old_len = old.len();
let (old_len, token_map_diff) = current.replace(cluster);

if new_len >= old_len {
self.num_endpoints.fetch_add(new_len - old_len, Relaxed);
Expand All @@ -302,12 +317,23 @@ where
}

self.version.fetch_add(1, Relaxed);
Some(old)

for (token_hash, addrs) in token_map_diff {
if let Some(addrs) = addrs {
self.token_map.insert(token_hash, addrs);
} else {
self.token_map.remove(&token_hash);
}
}
} else {
for (token_hash, addrs) in &cluster.token_map {
self.token_map
.insert(*token_hash, addrs.iter().cloned().collect());
}

self.map.insert(locality, cluster);
self.num_endpoints.fetch_add(new_len, Relaxed);
self.version.fetch_add(1, Relaxed);
None
}
}

Expand Down Expand Up @@ -482,10 +508,23 @@ where
/// Builds token maps for every locality. Only used by testing/benching
#[doc(hidden)]
pub fn build_token_maps(&self) {
self.token_map.clear();

for mut eps in self.map.iter_mut() {
eps.build_token_map();

for (token_hash, addrs) in &eps.token_map {
self.token_map
.insert(*token_hash, addrs.iter().cloned().collect());
}
}
}

/// Returns the endpoint addresses currently mapped to `token`.
///
/// The result is a copy of the address list stored under the token's
/// hash (`token.0`); an empty `Vec` is returned when no endpoint is
/// registered for that token.
pub fn addresses_for_token(&self, token: Token) -> Vec<EndpointAddress> {
match self.token_map.get(&token.0) {
Some(entry) => entry.value().to_vec(),
None => Vec::new(),
}
}
}

impl<S> crate::config::watch::Watchable for ClusterMap<S> {
Expand Down Expand Up @@ -523,6 +562,7 @@ where
fn default() -> Self {
Self {
map: <DashMap<Option<Locality>, EndpointSet, S>>::default(),
token_map: Default::default(),
version: <_>::default(),
num_endpoints: <_>::default(),
}
Expand Down Expand Up @@ -656,8 +696,17 @@ where
{
fn from(map: DashMap<Option<Locality>, EndpointSet, S>) -> Self {
let num_endpoints = AtomicUsize::new(map.iter().map(|kv| kv.value().len()).sum());

let token_map = DashMap::<u64, Vec<EndpointAddress>>::default();
for es in &map {
for (token_hash, addrs) in &es.value().token_map {
token_map.insert(*token_hash, addrs.iter().cloned().collect());
}
}

Self {
map,
token_map,
num_endpoints,
version: AtomicU64::new(1),
}
Expand Down
Loading