Skip to content

Commit

Permalink
fix: key value service resub faild if run before unsub ack and subscr…
Browse files Browse the repository at this point in the history
…ibe slot is removed
  • Loading branch information
giangndm committed Jan 31, 2024
1 parent ba15687 commit d130996
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 90 deletions.
114 changes: 83 additions & 31 deletions packages/services/key_value/src/behavior/hashmap_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ impl HashmapLocalStorage {
let mut removed_keys = Vec::new();
for ((key, sub_key), slot) in self.data.iter() {
if slot.acked && now - slot.last_sync >= self.sync_each_ms {
let req_id = self.gen_req_id();
if let Some(value) = &slot.value {
let req_id = self.gen_req_id();
log::debug!("[HashmapLocal] sync set key {} with version {}", key, slot.version);
self.output_events.push_back(LocalStorageAction::SendNet(
HashmapRemoteEvent::Set(req_id, *key, *sub_key, value.clone(), slot.version, slot.ex.clone()),
Expand All @@ -161,8 +161,8 @@ impl HashmapLocalStorage {
// we sync subscribe each sync_each_ms with each subscribe which acked
for (key, slot) in self.subscribe.iter() {
if slot.acked && now - slot.last_sync >= self.sync_each_ms {
let req_id = self.gen_req_id();
if slot.sub {
let req_id = self.gen_req_id();
log::debug!("[HashmapLocal] sync sub key {}", key);
self.output_events
.push_back(LocalStorageAction::SendNet(HashmapRemoteEvent::Sub(req_id, *key, slot.ex.clone()), RouteRule::ToKey(*key as u32)));
Expand Down Expand Up @@ -266,6 +266,7 @@ impl HashmapLocalStorage {
}
}
HashmapLocalEvent::OnKeySet(req_id, key, sub_key, value, version, source) => {
log::info!("[HashmapLocal] on_key_set key {}, sub_key {} with version {} from {}", key, sub_key, version, source);
self.output_events
.push_back(LocalStorageAction::SendNet(HashmapRemoteEvent::OnKeySetAck(req_id), RouteRule::ToNode(from)));
if let Some(slot) = self.subscribe.get_mut(&key) {
Expand All @@ -279,6 +280,7 @@ impl HashmapLocalStorage {
}
}
HashmapLocalEvent::OnKeyDel(req_id, key, sub_key, version, source) => {
log::info!("[HashmapLocal] on_key_del key {}, sub_key {} with version {} from {}", key, sub_key, version, source);
self.output_events
.push_back(LocalStorageAction::SendNet(HashmapRemoteEvent::OnKeyDelAck(req_id), RouteRule::ToNode(from)));
if let Some(slot) = self.subscribe.get_mut(&key) {
Expand All @@ -301,7 +303,7 @@ impl HashmapLocalStorage {
pub fn set(&mut self, now_ms: u64, key: KeyId, sub_key: SubKeyId, value: ValueType, ex: Option<u64>) {
let req_id = self.gen_req_id();
let version = self.gen_version(now_ms);
log::debug!("[HashmapLocal] set key {} with version {}", key, version);
log::info!("[HashmapLocal] set key {}, sub_key {} with version {}", key, sub_key, version);
self.data.insert(
(key, sub_key),
KeySlotData {
Expand All @@ -321,7 +323,7 @@ impl HashmapLocalStorage {

pub fn get(&mut self, now_ms: u64, key: KeyId, uuid: u64, service_id: u8, timeout_ms: u64) {
let req_id = self.gen_req_id();
log::debug!("[HashmapLocal] get key {} with req_id {}", key, req_id);
log::info!("[HashmapLocal] get key {} with req_id {}", key, req_id);
self.get_queue.insert(
req_id,
KeySlotGetCallback {
Expand All @@ -336,9 +338,10 @@ impl HashmapLocalStorage {
}

pub fn del(&mut self, key: KeyId, sub_key: SubKeyId) {
let req_id = self.gen_req_id();
log::debug!("[HashmapLocal] del key {} with req_id {}", key, req_id);
if let Some(slot) = self.data.get_mut(&(key, sub_key)) {
if self.data.contains_key(&(key, sub_key)) {
let req_id = self.gen_req_id();
let slot = self.data.get_mut(&(key, sub_key)).expect("Should work with above contains_key check");
log::info!("[HashmapLocal] del key {} sub_key {} with req_id {}", key, sub_key, req_id);
slot.value = None;
slot.last_sync = 0;
slot.acked = false;
Expand All @@ -349,46 +352,56 @@ impl HashmapLocalStorage {
}

pub fn subscribe(&mut self, key: KeyId, ex: Option<u64>, uuid: u64, service_id: u8) {
if let Some(slot) = self.subscribe.get_mut(&key) {
log::debug!("[HashmapLocal] subscribe key {} but already subscribed -> only add to handlers list", key);
let need_sub = if let Some(slot) = self.subscribe.get_mut(&key) {
log::info!(
"[HashmapLocal] subscribe key {} but already has slot -> add to handlers list and fire events for {} sub keys",
key,
slot.values.len()
);
slot.handlers.insert((uuid, service_id), ());
for (sub_key, value) in slot.values.iter() {
self.output_events
.push_back(LocalStorageAction::LocalOnChanged(service_id, uuid, key, *sub_key, Some(value.0.clone()), value.1, value.2));
}
return;
}
let need_sub = !slot.sub;
slot.sub = true;
need_sub
} else {
let mut handlers = SmallMap::new();
handlers.insert((uuid, service_id), ());
self.subscribe.insert(
key,
KeySlotSubscribe {
ex,
last_sync: 0,
sub: true,
acked: false,
handlers,
values: HashMap::new(),
},
);
true
};

if need_sub {
let req_id = self.gen_req_id();
log::info!("[HashmapLocal] subscribe key {} with req_id {}", key, req_id);

let req_id = self.gen_req_id();
log::debug!("[HashmapLocal] subscribe key {} with req_id {}", key, req_id);
let mut handlers = SmallMap::new();
handlers.insert((uuid, service_id), ());
self.subscribe.insert(
key,
KeySlotSubscribe {
ex,
last_sync: 0,
sub: true,
acked: false,
handlers,
values: HashMap::new(),
},
);
self.output_events
.push_back(LocalStorageAction::SendNet(HashmapRemoteEvent::Sub(req_id, key, ex), RouteRule::ToKey(key as u32)));
self.output_events
.push_back(LocalStorageAction::SendNet(HashmapRemoteEvent::Sub(req_id, key, ex), RouteRule::ToKey(key as u32)));
}
}

pub fn unsubscribe(&mut self, key: KeyId, uuid: u64, service_id: u8) {
let req_id = self.gen_req_id();
if let Some(slot) = self.subscribe.get_mut(&key) {
slot.handlers.remove(&(uuid, service_id));

if slot.handlers.is_empty() {
slot.sub = false;
slot.last_sync = 0;
slot.acked = false;

log::debug!("[HashmapLocal] unsubscribe key {} with req_id {}", key, req_id);
let req_id = self.gen_req_id();
log::info!("[HashmapLocal] unsubscribe key {} with req_id {}", key, req_id);

self.output_events
.push_back(LocalStorageAction::SendNet(HashmapRemoteEvent::Unsub(req_id, key), RouteRule::ToKey(key as u32)));
Expand Down Expand Up @@ -610,6 +623,45 @@ mod tests {
assert_eq!(storage.pop_action(), None);
}

#[test]
fn resub_after_unsub_ack_should_work() {
let mut storage = HashmapLocalStorage::new(10000);

storage.subscribe(1, None, 11111, 10);
assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Sub(0, 1, None), RouteRule::ToKey(1))));
assert_eq!(storage.pop_action(), None);

storage.unsubscribe(1, 11111, 10);
assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Unsub(1, 1), RouteRule::ToKey(1))));
assert_eq!(storage.pop_action(), None);

storage.on_event(2, HashmapLocalEvent::UnsubAck(1, 1, true));
storage.tick(10000);

// resub before ack should work
storage.subscribe(1, None, 11111, 10);
assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Sub(2, 1, None), RouteRule::ToKey(1))));
assert_eq!(storage.pop_action(), None);
}

#[test]
fn resub_before_unsub_ack_should_work() {
let mut storage = HashmapLocalStorage::new(10000);

storage.subscribe(1, None, 11111, 10);
assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Sub(0, 1, None), RouteRule::ToKey(1))));
assert_eq!(storage.pop_action(), None);

storage.unsubscribe(1, 11111, 10);
assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Unsub(1, 1), RouteRule::ToKey(1))));
assert_eq!(storage.pop_action(), None);

// resub before ack should work
storage.subscribe(1, None, 11111, 10);
assert_eq!(storage.pop_action(), Some(LocalStorageAction::SendNet(HashmapRemoteEvent::Sub(2, 1, None), RouteRule::ToKey(1))));
assert_eq!(storage.pop_action(), None);
}

#[test]
fn sub_multi_times_test() {
let mut storage = HashmapLocalStorage::new(10000);
Expand Down
48 changes: 27 additions & 21 deletions packages/services/key_value/src/behavior/hashmap_remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,18 @@ impl HashmapRemoteStorage {
pub fn on_event(&mut self, now_ms: u64, from: NodeId, event: HashmapRemoteEvent) {
match event {
HashmapRemoteEvent::Set(req_id, key, sub_key, value, version, ex) => {
log::debug!(
"[HashmapRemote {}] receive set event from {} key {} sub_key {} value {:?} version {} ex {:?}",
self.node_id,
from,
key,
sub_key,
value,
version,
ex
);
let setted = self.storage.set(now_ms, key, sub_key, value, version, from, ex);
if setted {
log::info!(
"[HashmapRemote {}] receive set event from {} key {} sub_key {} version {} ex {:?}",
self.node_id,
from,
key,
sub_key,
version,
ex
);
}
self.output_events
.push_back(RemoteStorageAction(HashmapLocalEvent::SetAck(req_id, key, sub_key, version, setted), RouteRule::ToNode(from)));
}
Expand All @@ -73,26 +74,31 @@ impl HashmapRemoteStorage {
}
}
HashmapRemoteEvent::Del(req_id, key, sub_key, req_version) => {
log::debug!(
"[HashmapRemote {}] receive del event from {} key {} sub_key {} version {:?}",
self.node_id,
from,
key,
sub_key,
req_version
);
let version = self.storage.del(&key, &sub_key, req_version).map(|(_, version, _)| version);
if version.is_some() {
log::info!(
"[HashmapRemote {}] receive del event from {} key {} sub_key {} version {:?}",
self.node_id,
from,
key,
sub_key,
req_version
);
}
self.output_events
.push_back(RemoteStorageAction(HashmapLocalEvent::DelAck(req_id, key, sub_key, version), RouteRule::ToNode(from)));
}
HashmapRemoteEvent::Sub(req_id, key, ex) => {
log::debug!("[HashmapRemote {}] receive sub event from {} key {} ex {:?}", self.node_id, from, key, ex);
self.storage.subscribe(now_ms, &key, from, ex);
if self.storage.subscribe(now_ms, &key, from, ex) {
log::info!("[HashmapRemote {}] receive sub event from {} key {} ex {:?}", self.node_id, from, key, ex);
}
self.output_events.push_back(RemoteStorageAction(HashmapLocalEvent::SubAck(req_id, key), RouteRule::ToNode(from)));
}
HashmapRemoteEvent::Unsub(req_id, key) => {
log::debug!("[HashmapRemote {}] receive unsub event from {} key {}", self.node_id, from, key);
let success = self.storage.unsubscribe(&key, &from);
if success {
log::info!("[HashmapRemote {}] receive unsub event from {} key {}", self.node_id, from, key);
}
self.output_events
.push_back(RemoteStorageAction(HashmapLocalEvent::UnsubAck(req_id, key, success), RouteRule::ToNode(from)));
}
Expand Down
Loading

0 comments on commit d130996

Please sign in to comment.