Skip to content

Commit

Permalink
Address feedback, fix unacked send
Browse files Browse the repository at this point in the history
  • Loading branch information
madninja committed Dec 7, 2023
1 parent 2a108ad commit 697c526
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 33 deletions.
19 changes: 12 additions & 7 deletions src/message_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ pub struct MessageCache<T: PartialEq + MessageHash> {

#[derive(Debug, Clone)]
pub struct CacheMessage<T: PartialEq + MessageHash> {
received: Instant,
message: T,
pub received: Instant,
pub message: T,
}

impl<T: PartialEq + MessageHash> CacheMessage<T> {
Expand Down Expand Up @@ -53,13 +53,14 @@ impl<T: PartialEq + MessageHash> MessageCache<T> {
///
/// Pushing a packet onto the back of a full cache will cause the oldest
/// (first) message in the cache to be dropped.
pub fn push_back(&mut self, message: T, received: Instant) -> Option<CacheMessage<T>> {
self.cache.push_back(CacheMessage::new(message, received));
pub fn push_back(&mut self, message: T, received: Instant) -> &CacheMessage<T> {
let message = CacheMessage::new(message, received);
self.cache.push_back(message);
if self.len() > self.max_messages as usize {
self.cache.pop_front()
} else {
None
self.cache.pop_front();
}
// safe to unwrap given that the message we just pushed to the back
self.cache.back().unwrap()
}

/// Returns the index of the first matching message in the cache or None if
Expand Down Expand Up @@ -128,6 +129,10 @@ impl<T: PartialEq + MessageHash> MessageCache<T> {
self.cache.front()
}

pub fn peek_back(&self) -> Option<&CacheMessage<T>> {
self.cache.back()
}

pub fn len(&self) -> usize {
self.cache.len()
}
Expand Down
6 changes: 0 additions & 6 deletions src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,6 @@ impl From<PacketUp> for PacketRouterPacketUpV1 {
}
}

impl From<&PacketUp> for PacketRouterPacketUpV1 {
fn from(value: &PacketUp) -> Self {
value.packet.clone()
}
}

impl From<PacketRouterPacketDownV1> for PacketDown {
fn from(value: PacketRouterPacketDownV1) -> Self {
Self(value)
Expand Down
30 changes: 16 additions & 14 deletions src/packet_router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use helium_proto::services::router::{
};
use serde::Serialize;
use sha2::{Digest, Sha256};
use std::{ops::Deref, time::Instant as StdInstant};
use std::time::Instant as StdInstant;
use tokio::time::Duration;
use tracing::{debug, info, warn};

Expand Down Expand Up @@ -80,7 +80,7 @@ impl PacketRouter {
);
let store = MessageCache::new(router_settings.queue);
let reconnect = Reconnect::default();
let ack_timer = AckTimer::new(router_settings.ack_timeout());
let ack_timer = AckTimer::new(router_settings.ack_timeout(), false);
Self {
service,
transmit,
Expand Down Expand Up @@ -126,9 +126,9 @@ impl PacketRouter {
},
_ = self.ack_timer.wait() => {
warn!("no packet acks received");
let reconnect_result = self.handle_reconnect().await;
self.reconnect.update_next_time(reconnect_result.is_err());
self.ack_timer.update_next_time(reconnect_result.is_ok());
self.service.disconnect();
self.reconnect.update_next_time(true);
self.ack_timer.update_next_time(false);
},
router_message = self.service.recv() => match router_message {
Ok(envelope_down_v1::Data::Packet(message)) => self.handle_downlink(message).await,
Expand All @@ -151,6 +151,7 @@ impl PacketRouter {
},
Err(err) => {
warn!(?err, "router error");
self.service.disconnect();
self.reconnect.update_next_time(true);
self.ack_timer.update_next_time(false);
},
Expand All @@ -170,9 +171,9 @@ impl PacketRouter {
}

async fn handle_uplink(&mut self, uplink: PacketUp, received: StdInstant) -> Result {
self.store.push_back(uplink, received);
let message = self.store.push_back(uplink, received).clone();
if self.service.is_connected() {
self.send_waiting_packets().await?;
self.send_packet(message).await?;
}
Ok(())
}
Expand All @@ -188,23 +189,23 @@ impl PacketRouter {
}
if let Some(index) = self.store.index_of(|msg| msg.hash == message.payload_hash) {
self.store.remove_to(index);
debug!(removed = index, "removed acked packets");
info!(removed = index + 1, "cleared acked packets");
}
}

async fn handle_session_offer(&mut self, message: PacketRouterSessionOfferV1) -> Result {
self.service.session_init(&message.nonce).await?;
self.send_waiting_packets()
self.send_unacked_packets()
.inspect_err(|err| warn!(%err, "failed to send queued packets"))
.await
}

async fn send_waiting_packets(&mut self) -> Result {
async fn send_unacked_packets(&mut self) -> Result {
while let (removed, Some(packet)) = self.store.pop_front(STORE_GC_INTERVAL) {
if removed > 0 {
info!(removed, "discarded queued packets");
}
if let Err(err) = self.send_packet(&packet).await {
if let Err(err) = self.send_packet(packet.clone()).await {
warn!(%err, "failed to send uplink");
self.store.push_front(packet);
return Err(err);
Expand All @@ -213,11 +214,12 @@ impl PacketRouter {
Ok(())
}

async fn send_packet(&mut self, packet: &CacheMessage<PacketUp>) -> Result {
async fn send_packet(&mut self, packet: CacheMessage<PacketUp>) -> Result {
debug!(packet_hash = packet.hash().to_b64(), "sending packet");

let mut uplink: PacketRouterPacketUpV1 = packet.deref().into();
uplink.hold_time = packet.hold_time().as_millis() as u64;
let hold_time = packet.hold_time().as_millis() as u64;
let mut uplink: PacketRouterPacketUpV1 = packet.message.into();
uplink.hold_time = hold_time;
self.service.send_uplink(uplink).await
}
}
16 changes: 10 additions & 6 deletions src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ pub struct AckTimer {
}

impl AckTimer {
pub fn new(timeout: Duration) -> Self {
pub fn new(timeout: Duration, active: bool) -> Self {
Self {
next_time: Instant::now() + timeout,
next_time: Self::calc_next_time(timeout, active),
timeout,
}
}
Expand All @@ -82,12 +82,16 @@ impl AckTimer {
}

pub fn update_next_time(&mut self, active: bool) {
self.next_time = Self::calc_next_time(self.timeout, active)
}

fn calc_next_time(timeout: Duration, active: bool) -> Instant {
// timeout is 0 if the ack timer is not requested. Active means the
// connection is open and acks are to be expected
self.next_time = if self.timeout.as_secs() > 0 && active {
Instant::now() + self.timeout
if timeout.as_secs() > 0 && active {
Instant::now() + timeout + Duration::from_secs(1)
} else {
Instant::now() - self.timeout
};
Instant::now() - timeout
}
}
}

0 comments on commit 697c526

Please sign in to comment.