encode/decode remaining length properly for {,UN}SUBSCRIBE/SUBACK #187

Merged
merged 16 commits into from
Dec 17, 2023

vladak
Contributor

@vladak vladak commented Nov 20, 2023

This change fixes the remaining length encoding for SUBSCRIBE packets, so that subscribe succeeds when the remaining length is bigger than 127. Tested on CPython with:

#!/usr/bin/env python3

import json
import socket
import ssl
import sys

import adafruit_minimqtt.adafruit_minimqtt as MQTT

import logging


def main():
    logging.basicConfig()
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)

    logger.info("Creating MQTT instance")

    broker = "172.40.0.3"
    port = 1883
    mqtt_client = MQTT.MQTT(
        broker=broker,
        port=port,
        socket_pool=socket,
        ssl_context=ssl.create_default_context(),
        connect_retries=3,
    )

    mqtt_client.enable_logger(logging, log_level=logging.DEBUG)

    logger.info("Connecting to MQTT broker")
    mqtt_client.connect()

    topics = [("broken/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/pat/toolong",0)]
    logger.info(f"subscribing to {topics}")
    mqtt_client.subscribe(topics)

    i = 0
    while True:
        logger.info(f"iteration {i}")
        ret = mqtt_client.loop(timeout=3)
        if ret is not None:
            logger.info(f"got {ret}")
        i = i + 1


if __name__ == "__main__":
    main()

which produces the following output:

INFO:__main__:Creating MQTT instance
INFO:__main__:Connecting to MQTT broker
DEBUG:log:Attempting to connect to MQTT broker (attempt #0)
DEBUG:log:Attempting to establish MQTT connection...
INFO:log:Establishing an INSECURE connection to 172.40.0.3:1883
DEBUG:log:Sending CONNECT to broker...
DEBUG:log:Fixed Header: bytearray(b'\x10\x14\x00')
DEBUG:log:Variable Header: bytearray(b'\x04MQTT\x04\x02\x00<')
DEBUG:log:Receiving CONNACK packet from broker
DEBUG:log:Got message type: 0x20
DEBUG:log:Resetting reconnect backoff
INFO:__main__:subscribing to [('broken/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/pat/toolong', 0)]
DEBUG:log:Sending SUBSCRIBE to broker...
DEBUG:log:Fixed Header: bytearray(b'\x82\x80\x01')
DEBUG:log:Variable Header: b'\x00\x01'
DEBUG:log:SUBSCRIBING to topic broken/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/pat/toolong with QoS 0
DEBUG:log:packet: b'\x00{broken/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/pat/toolong\x00'
DEBUG:log:Got message type: 0x90
INFO:__main__:iteration 0
DEBUG:log:waiting for messages for 3 seconds
DEBUG:log:Loop timed out after 3 seconds
INFO:__main__:iteration 1
DEBUG:log:waiting for messages for 3 seconds
...
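
For readers unfamiliar with the encoding, here is a minimal sketch of the MQTT 3.1.1 variable-length scheme that explains the `\x82\x80\x01` fixed header in the log above. The function name `encode_remaining_length` is made up for this illustration and is not necessarily how MiniMQTT implements it; the topic length of 123 is read off the `\x00{` prefix in the logged packet.

```python
def encode_remaining_length(length):
    """Encode an MQTT 3.1.1 remaining length as 1 to 4 bytes."""
    if not 0 <= length <= 268435455:
        raise ValueError("remaining length out of range")
    encoded = bytearray()
    while True:
        digit = length % 128   # the low 7 bits go into this byte
        length //= 128
        if length > 0:
            digit |= 0x80      # continuation bit: more length bytes follow
        encoded.append(digit)
        if length == 0:
            return encoded


# SUBSCRIBE from the log: 2 (packet id) + 2 (topic length) + 123 (topic) + 1 (QoS)
remaining = 2 + 2 + 123 + 1
fixed_header = bytearray([0x82]) + encode_remaining_length(remaining)
print(fixed_header)  # bytearray(b'\x82\x80\x01')
```

Any value up to 127 fits in a single byte, which is why short topics never exposed the problem.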

@FoamyGuy
Contributor

I attempted to test this version on a FunHouse 9.0.0-alpha.5 using the native networking simpletest from this repo.

It seems like it is having trouble subscribing to the onoff feed. It keeps printing this over and over: Connected to Adafruit IO! Listening for topic changes on Foamyguy/feeds/onoff and never gets to the point in the main loop where it starts publishing new values. The same test script with current main branch executes successfully and is able to subscribe and receive new data + publish data on the other feed.

I'm attaching two log files from the more verbose logging output. One with the version from this PR and one with current main.

mqtt_main.log
mqtt_pr187.log

The particulars of the MQTT spec and connection are beyond my knowledge so I'm not sure what could cause this.

Let me know if there is any additional testing or info I could provide to try to help figure out what could cause this issue.

test script:

# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
# SPDX-License-Identifier: MIT

import os
import time
import ssl
import socketpool
import wifi
import adafruit_minimqtt.adafruit_minimqtt as MQTT

# Add settings.toml to your filesystem CIRCUITPY_WIFI_SSID and CIRCUITPY_WIFI_PASSWORD keys
# with your WiFi credentials. DO NOT share that file or commit it into Git or other
# source control.

# Set your Adafruit IO Username, Key and Port in settings.toml
# (visit io.adafruit.com if you need to create an account,
# or if you need your Adafruit IO key.)
aio_username = os.getenv("aio_username")
aio_key = os.getenv("aio_key")

print(f"Connecting to {os.getenv('CIRCUITPY_WIFI_SSID')}")
wifi.radio.connect(
    os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv("CIRCUITPY_WIFI_PASSWORD")
)
print(f"Connected to {os.getenv('CIRCUITPY_WIFI_SSID')}!")
### Feeds ###

# Setup a feed named 'photocell' for publishing to a feed
photocell_feed = aio_username + "/feeds/photocell"

# Setup a feed named 'onoff' for subscribing to changes
onoff_feed = aio_username + "/feeds/onoff"

### Code ###


# Define callback methods which are called when events occur
# pylint: disable=unused-argument, redefined-outer-name
def connected(client, userdata, flags, rc):
    # This function will be called when the client is connected
    # successfully to the broker.
    print(f"Connected to Adafruit IO! Listening for topic changes on {onoff_feed}")
    # Subscribe to all changes on the onoff_feed.
    client.subscribe(onoff_feed)


def disconnected(client, userdata, rc):
    # This method is called when the client is disconnected
    print("Disconnected from Adafruit IO!")


def message(client, topic, message):
    # This method is called when a topic the client is subscribed to
    # has a new message.
    print(f"New message on topic {topic}: {message}")

    # if not client.user_data.get(topic):
    #     client.user_data[topic] = []
    # client.user_data[topic].append(message)
    # 
    # print(client.user_data)

# Create a socket pool
pool = socketpool.SocketPool(wifi.radio)
ssl_context = ssl.create_default_context()

# If you need to use certificate/key pair authentication (e.g. X.509), you can load them in the
# ssl context by uncommenting the lines below and adding the following keys to the "secrets"
# dictionary in your secrets.py file:
# "device_cert_path" - Path to the Device Certificate
# "device_key_path" - Path to the RSA Private Key
# ssl_context.load_cert_chain(
#     certfile=secrets["device_cert_path"], keyfile=secrets["device_key_path"]
# )
messages = {}
# Set up a MiniMQTT Client
mqtt_client = MQTT.MQTT(
    broker="io.adafruit.com",
    port=1883,
    username=aio_username,
    password=aio_key,
    socket_pool=pool,
    ssl_context=ssl_context,
    user_data=messages,
)

# Setup the callback methods above
mqtt_client.on_connect = connected
mqtt_client.on_disconnect = disconnected
mqtt_client.on_message = message

# Connect the client to the MQTT broker.
print("Connecting to Adafruit IO...")
mqtt_client.connect()

photocell_val = 0


messages = {}
while True:
    # Poll the message queue
    mqtt_client.loop()

    # Send a new message
    print(f"Sending photocell value: {photocell_val}...")
    mqtt_client.publish(photocell_feed, photocell_val)
    print("Sent!")
    photocell_val += 1
    time.sleep(5)

@vladak vladak force-pushed the subscribe_vs_remaining_len branch from a04fe49 to 973df68 Compare November 28, 2023 21:39
@vladak
Contributor Author

vladak commented Nov 28, 2023

Thanks for the testing. I found the extra zero byte in the CONNECT packet strange and did not investigate. As a result I missed the logic for short remaining length. This should work now.

@vladak
Contributor Author

vladak commented Nov 30, 2023

As I've started testing on QtPy with code that basically calls subscribe to a wildcard topic in a loop, I noticed this:

536.291: INFO - subscribing to devices/#                                        
536.291: DEBUG - Sending SUBSCRIBE to broker...                                 
536.293: DEBUG - Fixed Header: bytearray(b'\x82\x0e')                           
536.296: DEBUG - Variable Header: b'\x00\x07'                                   
536.298: DEBUG - SUBSCRIBING to topic devices/# with QoS 0                      
536.300: DEBUG - payload: b'\x00\tdevices/#\x00'                                
536.303: DEBUG - Got message type: 0x30 pkt: 0x30                               
536.307: DEBUG - Receiving PUBLISH                                              
Topic: devices/fusebox/esp32                                                    
Msg: bytearray(b'{"humidity": "29.7", "temperature": "29.5", "pulses": 10749}') 
                                                                                
Traceback (most recent call last):                                              
  File "code.py", line 53, in <module>                                          
  File "code.py", line 45, in main                                              
  File "/lib/adafruit_minimqtt/adafruit_minimqtt.py", line 887, in subscribe    
MMQTTException: invalid message received as response to SUBSCRIBE: 0x30         
]0;🐍172.40.0.23 | 887@/lib/adafruit_minimqtt/adafruit_ MMQTTException | 8.2.6\ 
Code done running.

and upon realizing this is valid behavior according to the spec, added a fix.
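
The idea behind handling a PUBLISH that arrives before the SUBACK can be sketched roughly as follows. This is only an illustration under assumptions, not the code in this PR: `wait_for_suback`, `next_packet_type`, `handle_publish` and `max_packets` are hypothetical names, and the packets are reduced to their first byte.

```python
def wait_for_suback(next_packet_type, handle_publish, max_packets=100):
    """Wait for SUBACK (0x90), passing interleaved PUBLISH packets along."""
    for _ in range(max_packets):
        packet_type = next_packet_type()
        if packet_type == 0x90:
            return  # SUBACK arrived, the subscription is acknowledged
        if packet_type & 0xF0 == 0x30:
            # PUBLISH for a matching topic; deliver it and keep waiting
            handle_publish(packet_type)
            continue
        raise RuntimeError(
            f"invalid message received as response to SUBSCRIBE: {hex(packet_type)}"
        )
    raise RuntimeError("no SUBACK received")


# Simulated broker traffic: two PUBLISH packets, then the SUBACK.
packets = iter([0x30, 0x30, 0x90])
delivered = []
wait_for_suback(lambda: next(packets), delivered.append)
print(delivered)  # [48, 48]
```

The upper four bits are masked because the lower four bits of a PUBLISH header carry the DUP, QoS and RETAIN flags.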

@vladak vladak changed the title encode remaining length properly for SUBSCRIBE encode remaining length properly for {,UN}SUBSCRIBE Nov 30, 2023
@vladak
Contributor Author

vladak commented Nov 30, 2023

Further testing revealed that UNSUBSCRIBE needs to be fixed (sic!) as well. Here's the test code used on QtPy running CircuitPython 8.2.6 on 2023-09-12:

#!/usr/bin/env python3

import adafruit_logging as logging
import random
import socketpool
import ssl
import sys
import time
import wifi

from secrets import secrets

import adafruit_minimqtt.adafruit_minimqtt as MQTT


def main():
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)

    logger.info("Connecting to wifi")
    wifi.radio.connect(secrets["SSID"], secrets["password"], timeout=10)
    logger.info(f"Connected to {secrets['SSID']}")
    logger.debug(f"IP: {wifi.radio.ipv4_address}")

    pool = socketpool.SocketPool(wifi.radio)

    host = "172.40.0.3"
    port = 1883
    mqtt_client = MQTT.MQTT(
        broker=host,
        port=port,
        socket_pool=pool,
        ssl_context=ssl.create_default_context(),
        connect_retries=1,
        recv_timeout=5,
    )

    mqtt_client.logger = logger

    logger.debug(f"connecting")
    mqtt_client.connect()

    topic = "devices"
    # topic length should not exceed the maximum given by spec (65535 bytes).
    for i in range(128):
        logger.info(f"### iteration {i}")
        topic += "/" + ''.join(random.choice('abcdefghijklmnopqrstuvwxyz')
                               for _ in range(random.randrange(3, 16)))
        mqtt_client.subscribe(topic)
        mqtt_client.unsubscribe(topic)

        logger.debug("loop")
        mqtt_client.loop(1)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        sys.exit(0)

It went through all of the iterations. At times the Mu editor seemed to choke on the long output emitted to the serial console, but it always recovered.

@vladak
Contributor Author

vladak commented Dec 13, 2023

Added protocol level tests for the SUBSCRIBE packet. Firstly, I captured short and long (with remaining length encoded as 1 byte and 2 bytes, respectively) SUBSCRIBE/SUBACK packets using Mosquitto client and server, then added an assert that verifies that the SUBSCRIBE packet sent by MiniMQTT is the same as the one sent by Mosquitto client and that MiniMQTT processes the reply as sent from the Mosquitto server successfully.

There is of course potential for more testing, like generating SUBSCRIBE packets with remaining length encoded as 3 and 4 bytes (maximum) while perhaps at the same time using the topics specified with tuple and list. Similarly for UNSUBSCRIBE.
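
To give an idea of what such a byte-level comparison looks like, here is a rough sketch. It is not the test code added in this PR: `build_subscribe` is a hypothetical helper, and the expected bytes come from the debug logs earlier in this thread rather than from a Mosquitto capture.

```python
import struct


def build_subscribe(packet_id, topics):
    """Build a SUBSCRIBE packet for a list of (topic, qos) tuples."""
    payload = bytearray()
    for topic, qos in topics:
        name = topic.encode("utf-8")
        # each entry: 2-byte topic length, topic bytes, requested QoS
        payload += struct.pack("!H", len(name)) + name + bytes([qos])
    variable_header = struct.pack("!H", packet_id)
    remaining = len(variable_header) + len(payload)
    header = bytearray([0x82])
    while True:
        digit, remaining = remaining % 128, remaining // 128
        header.append(digit | 0x80 if remaining else digit)
        if not remaining:
            break
    return bytes(header) + variable_header + bytes(payload)


# Short packet: remaining length fits in one byte (log from Nov 30).
short = build_subscribe(7, [("devices/#", 0)])
assert short == b"\x82\x0e\x00\x07\x00\tdevices/#\x00"

# Long packet: a 123-byte topic needs two length bytes (log from Nov 20).
long_packet = build_subscribe(1, [("x" * 123, 0)])
assert long_packet[:7] == b"\x82\x80\x01\x00\x01\x00{"
```

Topics long enough to need 3 or 4 length bytes could be checked the same way.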

@vladak
Contributor Author

vladak commented Dec 13, 2023

Also, the change that covers the PUBLISH-before-SUBACK case can be tested this way. Let me know.

@vladak
Contributor Author

vladak commented Dec 14, 2023

Never mind. I implemented some of the tests described above.

@vladak vladak changed the title encode remaining length properly for {,UN}SUBSCRIBE encode/decode remaining length properly for {,UN}SUBSCRIBE/SUBACK Dec 15, 2023
@vladak
Contributor Author

vladak commented Dec 15, 2023

After adding the test case for long lists of topics in SUBSCRIBE, I noticed that the remaining length parsing for SUBACK also assumes 2 bytes, so I added a fix.
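
The decoding side can be sketched like this. It is an illustration only, assuming the whole SUBACK is already in a bytes object; `decode_remaining_length` and `parse_suback` are hypothetical names, not MiniMQTT functions.

```python
def decode_remaining_length(data, offset=1):
    """Return (remaining_length, offset of the first byte after it)."""
    value = 0
    shift = 0
    for _ in range(4):  # the spec allows at most 4 length bytes
        byte = data[offset]
        offset += 1
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:  # no continuation bit: this was the last byte
            return value, offset
        shift += 7
    raise ValueError("malformed remaining length")


def parse_suback(data):
    """Return (packet_id, list of return codes) from a raw SUBACK."""
    if data[0] != 0x90:
        raise ValueError("not a SUBACK")
    remaining, offset = decode_remaining_length(data)
    body = data[offset:offset + remaining]
    packet_id = (body[0] << 8) | body[1]
    return packet_id, list(body[2:])


# One topic: remaining length 3 fits in a single byte.
print(parse_suback(b"\x90\x03\x00\x07\x00"))  # (7, [0])

# 200 topics: remaining length 202 is encoded as two bytes, 0xca 0x01.
packet_id, codes = parse_suback(b"\x90\xca\x01" + b"\x00\x01" + bytes(200))
print(packet_id, len(codes))  # 1 200
```

A SUBACK carries one return code per requested topic, so a long topic list pushes the remaining length past 127 even when each topic is short.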

@vladak
Contributor Author

vladak commented Dec 15, 2023

I believe all these remaining length encoding/decoding implementation problems stem from the insufficiently worded spec. I wrote an e-mail to the IBM editors of the 3.1.1 spec asking them to fix it. However, the message to one of them bounced, so he apparently no longer works for IBM, and the other one did not reply.

@vladak
Contributor Author

vladak commented Dec 16, 2023

The changes are done unless someone raises a review concern. Next, I'd like to retest the changes on my trusty QtPy, using both a local MQTT broker and the public Mosquitto test bed, which experiences pretty wild traffic.

@vladak
Contributor Author

vladak commented Dec 16, 2023

Completed all the 128 iterations of the above subscribe/unsubscribe test with randomized topics on the QtPy just fine.

@vladak
Contributor Author

vladak commented Dec 16, 2023

Used this code on the QtPy to follow the wild traffic on test.mosquitto.org:

#!/usr/bin/env python3

import adafruit_logging as logging
import random
import socketpool
import ssl
import sys
import time
import wifi

from secrets import secrets

import adafruit_minimqtt.adafruit_minimqtt as MQTT


def on_message(client, topic, msg):
    #logger = logging.getLogger(__name__)
    #logger.info(f"Got msg on '{topic}' ({len(msg)} bytes)")
    client.user_data[0] += 1
    client.user_data[1] += len(msg)


def main():
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)

    logger.info("Connecting to wifi")
    wifi.radio.connect(secrets["SSID"], secrets["password"], timeout=10)
    logger.info(f"Connected to {secrets['SSID']}")
    logger.debug(f"IP: {wifi.radio.ipv4_address}")

    pool = socketpool.SocketPool(wifi.radio)

    host = "test.mosquitto.org"
    port = 1883
    stats = [0, 0]
    mqtt_client = MQTT.MQTT(
        broker=host,
        port=port,
        socket_pool=pool,
        ssl_context=ssl.create_default_context(),
        connect_retries=1,
        recv_timeout=5,
        use_binary_mode=True,  # test.mosquitto.org has messages with UnicodeError
        user_data=stats,
    )

    # mqtt_client.logger = logger

    mqtt_client.on_message = on_message

    logger.debug(f"connecting")
    mqtt_client.connect()

    topic = "#"
    logger.debug(f"subscribing")
    mqtt_client.subscribe(topic)

    while True:
        mqtt_client.loop(1)
        logger.info(f"Messages: {stats[0]}, bytes: {stats[1]}")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        sys.exit(0)

It survived several thousand messages before going down with a MemoryError. This is what happened when the logger was set:

2374.516: DEBUG - Got message type: 0x30 pkt: 0x30
Traceback (most recent call last):
  File "code.py", line 55, in <module>
  File "code.py", line 47, in main
  File "/lib/adafruit_minimqtt/adafruit_minimqtt.py", line 862, in subscribe
  File "/lib/adafruit_minimqtt/adafruit_minimqtt.py", line 1118, in _wait_for_msg
  File "/lib/adafruit_logging.py", line 382, in debug
  File "/lib/adafruit_logging.py", line 328, in _log
MemoryError: memory allocation failed, allocating 23056 bytes
]0;🐍172.40.0.23 | 328@/lib/adafruit_logging.py MemoryError | 8.2.6\
Code done running.

and this is with the MQTT logger left at its default, with only the callback doing the logging:

Traceback (most recent call last):
  File "code.py", line 63, in <module>
  File "code.py", line 58, in main
  File "/lib/adafruit_minimqtt/adafruit_minimqtt.py", line 1050, in loop
  File "/lib/adafruit_minimqtt/adafruit_minimqtt.py", line 1116, in _wait_for_msg
  File "/lib/adafruit_minimqtt/adafruit_minimqtt.py", line 1158, in _sock_exact_recv
MemoryError: memory allocation failed, allocating 60000 bytes
]0;🐍172.40.0.23 | 1158@/lib/adafruit_minimqtt/adafruit_ MemoryError | 8.2.6\
Code done running.

Also, I got the timeout once:

Traceback (most recent call last):
  File "code.py", line 62, in <module>
  File "code.py", line 57, in main
  File "/lib/adafruit_minimqtt/adafruit_minimqtt.py", line 1050, in loop
  File "/lib/adafruit_minimqtt/adafruit_minimqtt.py", line 1116, in _wait_for_msg
  File "/lib/adafruit_minimqtt/adafruit_minimqtt.py", line 1167, in _sock_exact_recv
OSError: [Errno 116] ETIMEDOUT

None of these is related to the changes, I'd say.

@vladak vladak force-pushed the subscribe_vs_remaining_len branch from 527fa5f to 279387e Compare December 17, 2023 17:14
@vladak vladak mentioned this pull request Dec 17, 2023
Contributor

@FoamyGuy FoamyGuy left a comment

This looks good to me.

I tested the latest version successfully with the test script posted above using Adafruit.io server on a Fun House 8.2.8.

@FoamyGuy FoamyGuy merged commit 70faa4f into adafruit:main Dec 17, 2023
1 check passed
adafruit-adabot added a commit to adafruit/Adafruit_CircuitPython_Bundle that referenced this pull request Dec 18, 2023
Updating https://github.com/adafruit/Adafruit_CircuitPython_HTTPServer to 4.5.0 from 4.4.5:
  > Merge pull request adafruit/Adafruit_CircuitPython_HTTPServer#74 from michalpokusa/optional-hashlib-for-websockets

Updating https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT to 7.5.6 from 7.5.4:
  > Merge pull request adafruit/Adafruit_CircuitPython_MiniMQTT#187 from vladak/subscribe_vs_remaining_len

Updating https://github.com/adafruit/Adafruit_CircuitPython_Bundle/circuitpython_library_list.md to NA from NA:
  > Updated download stats for the libraries
@bschymanski

Hi,

I get this error message on unsubscribe:
Connected to MQTT Broker!
Flags: 0
RC: 0
Subscribing to werte/wetter/innen_t
Subscribed to werte/wetter/innen_t with QOS level 0
Unsubscribing from werte/wetter/innen_t
New message on topic werte/wetter/innen_t: 23.1
topic: werte/wetter/innen_t, message: 23.1
Traceback (most recent call last):
File "code.py", line 227, in
File "adafruit_minimqtt/adafruit_minimqtt.py", line 888, in unsubscribe
MMQTTException: ('invalid message received as response to UNSUBSCRIBE: 0x30', None)

I use the Mosquitto broker in Homeassistant.

Is there something that i can do about this error?

Bernd

@vladak
Contributor Author

vladak commented Aug 26, 2024

Is there something that i can do about this error?

This is a closed issue, so it's better to open a new one, esp. if running the latest and greatest MiniMQTT.

@vladak vladak deleted the subscribe_vs_remaining_len branch August 26, 2024 19:54