Skip to content

Commit

Permalink
Merge pull request #182 from vladak/time_monotonic_ns
Browse files Browse the repository at this point in the history
use time.monotonic_ns() when available
  • Loading branch information
FoamyGuy authored Nov 27, 2023
2 parents 6270110 + 20ba1e3 commit d1e2b7c
Showing 1 changed file with 48 additions and 21 deletions.
69 changes: 48 additions & 21 deletions adafruit_minimqtt/adafruit_minimqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,12 @@ class MQTT:
This works with all callbacks but the "on_message" and those added via add_topic_callback();
for those, to get access to the user_data use the 'user_data' member of the MQTT object
passed as 1st argument.
:param bool use_imprecise_time: on boards without time.monotonic_ns() one has to set
this to True in order to operate correctly over more than 24 days or so
"""

# pylint: disable=too-many-arguments,too-many-instance-attributes,too-many-statements, not-callable, invalid-name, no-member
# pylint: disable=too-many-arguments,too-many-instance-attributes,too-many-statements,not-callable,invalid-name,no-member,too-many-locals
def __init__(
self,
*,
Expand All @@ -193,13 +195,28 @@ def __init__(
socket_timeout: int = 1,
connect_retries: int = 5,
user_data=None,
use_imprecise_time: Optional[bool] = None,
) -> None:
self._socket_pool = socket_pool
self._ssl_context = ssl_context
self._sock = None
self._backwards_compatible_sock = False
self._use_binary_mode = use_binary_mode

self.use_monotonic_ns = False
try:
time.monotonic_ns()
self.use_monotonic_ns = True
except AttributeError:
if use_imprecise_time:
self.use_monotonic_ns = False
else:
raise MMQTTException( # pylint: disable=raise-missing-from
"time.monotonic_ns() is not available. "
"Will use imprecise time however only if the"
"use_imprecise_time argument is set to True."
)

if recv_timeout <= socket_timeout:
raise MMQTTException(
"recv_timeout must be strictly greater than socket_timeout"
Expand Down Expand Up @@ -251,9 +268,8 @@ def __init__(
self.client_id = client_id
else:
# assign a unique client_id
self.client_id = (
f"cpy{randint(0, int(time.monotonic() * 100) % 1000)}{randint(0, 99)}"
)
time_int = int(self.get_monotonic_time() * 100) % 1000
self.client_id = f"cpy{randint(0, time_int)}{randint(0, 99)}"
# generated client_id's enforce spec.'s length rules
if len(self.client_id.encode("utf-8")) > 23 or not self.client_id:
raise ValueError("MQTT Client ID must be between 1 and 23 bytes")
Expand All @@ -276,6 +292,17 @@ def __init__(
self.on_subscribe = None
self.on_unsubscribe = None

def get_monotonic_time(self) -> float:
"""
Provide monotonic time in seconds. Based on underlying implementation
this might result in imprecise time, that will result in the library
not being able to operate if running contiguously for more than 24 days or so.
"""
if self.use_monotonic_ns:
return time.monotonic_ns() / 1000000000

return time.monotonic()

# pylint: disable=too-many-branches
def _get_connect_socket(self, host: str, port: int, *, timeout: int = 1):
"""Obtains a new socket and connects to a broker.
Expand Down Expand Up @@ -636,7 +663,7 @@ def _connect(
self._send_str(self._username)
self._send_str(self._password)
self.logger.debug("Receiving CONNACK packet from broker")
stamp = time.monotonic()
stamp = self.get_monotonic_time()
while True:
op = self._wait_for_msg()
if op == 32:
Expand All @@ -652,7 +679,7 @@ def _connect(
return result

if op is None:
if time.monotonic() - stamp > self._recv_timeout:
if self.get_monotonic_time() - stamp > self._recv_timeout:
raise MMQTTException(
f"No data received from broker for {self._recv_timeout} seconds."
)
Expand Down Expand Up @@ -681,13 +708,13 @@ def ping(self) -> list[int]:
self.logger.debug("Sending PINGREQ")
self._sock.send(MQTT_PINGREQ)
ping_timeout = self.keep_alive
stamp = time.monotonic()
stamp = self.get_monotonic_time()
rc, rcs = None, []
while rc != MQTT_PINGRESP:
rc = self._wait_for_msg()
if rc:
rcs.append(rc)
if time.monotonic() - stamp > ping_timeout:
if self.get_monotonic_time() - stamp > ping_timeout:
raise MMQTTException("PINGRESP not returned from broker.")
return rcs

Expand Down Expand Up @@ -768,7 +795,7 @@ def publish(
if qos == 0 and self.on_publish is not None:
self.on_publish(self, self.user_data, topic, self._pid)
if qos == 1:
stamp = time.monotonic()
stamp = self.get_monotonic_time()
while True:
op = self._wait_for_msg()
if op == 0x40:
Expand All @@ -782,7 +809,7 @@ def publish(
return

if op is None:
if time.monotonic() - stamp > self._recv_timeout:
if self.get_monotonic_time() - stamp > self._recv_timeout:
raise MMQTTException(
f"No data received from broker for {self._recv_timeout} seconds."
)
Expand Down Expand Up @@ -834,11 +861,11 @@ def subscribe(self, topic: str, qos: int = 0) -> None:
for t, q in topics:
self.logger.debug("SUBSCRIBING to topic %s with QoS %d", t, q)
self._sock.send(packet)
stamp = time.monotonic()
stamp = self.get_monotonic_time()
while True:
op = self._wait_for_msg()
if op is None:
if time.monotonic() - stamp > self._recv_timeout:
if self.get_monotonic_time() - stamp > self._recv_timeout:
raise MMQTTException(
f"No data received from broker for {self._recv_timeout} seconds."
)
Expand Down Expand Up @@ -901,10 +928,10 @@ def unsubscribe(self, topic: str) -> None:
self._sock.send(packet)
self.logger.debug("Waiting for UNSUBACK...")
while True:
stamp = time.monotonic()
stamp = self.get_monotonic_time()
op = self._wait_for_msg()
if op is None:
if time.monotonic() - stamp > self._recv_timeout:
if self.get_monotonic_time() - stamp > self._recv_timeout:
raise MMQTTException(
f"No data received from broker for {self._recv_timeout} seconds."
)
Expand Down Expand Up @@ -998,8 +1025,8 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
self._connected()
self.logger.debug(f"waiting for messages for {timeout} seconds")
if self._timestamp == 0:
self._timestamp = time.monotonic()
current_time = time.monotonic()
self._timestamp = self.get_monotonic_time()
current_time = self.get_monotonic_time()
if current_time - self._timestamp >= self.keep_alive:
self._timestamp = 0
# Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server
Expand All @@ -1009,14 +1036,14 @@ def loop(self, timeout: float = 0) -> Optional[list[int]]:
rcs = self.ping()
return rcs

stamp = time.monotonic()
stamp = self.get_monotonic_time()
rcs = []

while True:
rc = self._wait_for_msg()
if rc is not None:
rcs.append(rc)
if time.monotonic() - stamp > timeout:
if self.get_monotonic_time() - stamp > timeout:
self.logger.debug(f"Loop timed out after {timeout} seconds")
break

Expand Down Expand Up @@ -1115,7 +1142,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
:param int bufsize: number of bytes to receive
:return: byte array
"""
stamp = time.monotonic()
stamp = self.get_monotonic_time()
if not self._backwards_compatible_sock:
# CPython/Socketpool Impl.
rc = bytearray(bufsize)
Expand All @@ -1130,7 +1157,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
recv_len = self._sock.recv_into(mv, to_read)
to_read -= recv_len
mv = mv[recv_len:]
if time.monotonic() - stamp > read_timeout:
if self.get_monotonic_time() - stamp > read_timeout:
raise MMQTTException(
f"Unable to receive {to_read} bytes within {read_timeout} seconds."
)
Expand All @@ -1150,7 +1177,7 @@ def _sock_exact_recv(self, bufsize: int) -> bytearray:
recv = self._sock.recv(to_read)
to_read -= len(recv)
rc += recv
if time.monotonic() - stamp > read_timeout:
if self.get_monotonic_time() - stamp > read_timeout:
raise MMQTTException(
f"Unable to receive {to_read} bytes within {read_timeout} seconds."
)
Expand Down

0 comments on commit d1e2b7c

Please sign in to comment.