From 8cefa4b2a9fa3fd22489bf80f3d74a8f13842dc7 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Tue, 14 Jan 2025 18:24:56 +0100 Subject: [PATCH] Improved resource transfer timing --- RNS/Resource.py | 44 ++++++++++++++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/RNS/Resource.py b/RNS/Resource.py index b0c726af..683b6768 100644 --- a/RNS/Resource.py +++ b/RNS/Resource.py @@ -188,8 +188,11 @@ def accept(advertisement_packet, callback=None, progress_callback = None, reques resource.consecutive_completed_height = -1 previous_window = resource.link.get_last_resource_window() + previous_eifr = resource.link.get_last_resource_eifr() if previous_window: resource.window = previous_window + if previous_eifr: + resource.previous_eifr = previous_eifr if not resource.link.has_incoming_resource(resource): resource.link.register_incoming_resource(resource) @@ -213,7 +216,6 @@ def accept(advertisement_packet, callback=None, progress_callback = None, reques except Exception as e: RNS.log("Could not decode resource advertisement, dropping resource", RNS.LOG_DEBUG) - RNS.trace_exception(e) # TODO: Remove debug return None # Create a resource for transmission to a remote destination @@ -295,6 +297,9 @@ def __init__(self, data, link, advertise=True, auto_compress=True, callback=None self.req_sent = 0 self.req_resp_rtt_rate = 0 self.rtt_rxd_bytes_at_part_req = 0 + self.req_data_rtt_rate = 0 + self.eifr = None + self.previous_eifr = None self.fast_rate_rounds = 0 self.very_slow_rate_rounds = 0 self.request_id = request_id @@ -465,6 +470,22 @@ def __advertise_job(self): self.watchdog_job() + def update_eifr(self): + if self.rtt == None: + rtt = self.link.rtt + else: + rtt = self.rtt + + if self.req_data_rtt_rate != 0: + expected_inflight_rate = self.req_data_rtt_rate*8 + else: + if self.previous_eifr != None: + expected_inflight_rate = self.previous_eifr + else: + expected_inflight_rate = self.link.establishment_cost*8 / rtt + + self.eifr = expected_inflight_rate + def watchdog_job(self): thread = threading.Thread(target=self.__watchdog_job) thread.daemon = True @@ -479,7 +500,6 @@ def __watchdog_job(self): sleep(0.025) sleep_time = None - if self.status == Resource.ADVERTISED: sleep_time = (self.adv_sent+self.timeout+Resource.PROCESSING_GRACE)-time.time() if sleep_time < 0: @@ -503,18 +523,17 @@ def __watchdog_job(self): elif self.status == Resource.TRANSFERRING: if not self.initiator: + retries_used = self.max_retries - self.retries_left + extra_wait = retries_used * Resource.PER_RETRY_DELAY - if self.rtt == None: - rtt = self.link.rtt - else: - rtt = self.rtt + self.update_eifr() + expected_tof_remaining = (self.outstanding_parts*self.sdu*8)/self.eifr - window_remaining = self.outstanding_parts + if self.req_resp_rtt_rate != 0: + sleep_time = self.last_activity + self.part_timeout_factor*expected_tof_remaining + Resource.RETRY_GRACE_TIME + extra_wait - time.time() + else: + sleep_time = self.last_activity + self.part_timeout_factor*((3*self.sdu)/self.eifr) + Resource.RETRY_GRACE_TIME + extra_wait - time.time() - retries_used = self.max_retries - self.retries_left - extra_wait = retries_used * Resource.PER_RETRY_DELAY - sleep_time = self.last_activity + (rtt*(self.part_timeout_factor+window_remaining)) + Resource.RETRY_GRACE_TIME + extra_wait - time.time() - if sleep_time < 0: if self.retries_left > 0: ms = "" if self.outstanding_parts == 1 else "s" @@ -564,7 +583,7 @@ def __watchdog_job(self): sleep_time = 0.001 if sleep_time == 0: - RNS.log("Warning! Link watchdog sleep time of 0!", RNS.LOG_WARNING) + RNS.log("Warning! Link watchdog sleep time of 0!", RNS.LOG_DEBUG) if sleep_time == None or sleep_time < 0: RNS.log("Timing error, cancelling resource transfer.", RNS.LOG_ERROR) self.cancel() @@ -781,6 +800,7 @@ def receive_part(self, packet): if rtt != 0: self.req_data_rtt_rate = req_transferred/rtt + self.update_eifr() self.rtt_rxd_bytes_at_part_req = self.rtt_rxd_bytes if self.req_data_rtt_rate > Resource.RATE_FAST and self.fast_rate_rounds < Resource.FAST_RATE_THRESHOLD: