Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Maximum topic length #160

Closed
LelandSindt opened this issue Mar 31, 2023 · 9 comments
Closed

Maximum topic length #160

LelandSindt opened this issue Mar 31, 2023 · 9 comments
Labels
bug Something isn't working

Comments

@LelandSindt
Copy link
Contributor

I am working with Adafruit CircuitPython 8.0.4 on 2023-03-15; Raspberry Pi Pico W with rp2040, Board ID:raspberry_pi_pico_w. Library version adafruit-circuitpython-bundle-8.x-mpy-20230315.

When I try to subscribe to multiple "long" or a single "very long" topic I get the following error(s)

 Traceback (most recent call last):
  File "code.py", line 73, in <module>
  File "adafruit_minimqtt/adafruit_minimqtt.py", line 831, in subscribe
  File "adafruit_minimqtt/adafruit_minimqtt.py", line 1041, in _wait_for_msg
MMQTTException: 

Code done running.

Here is the code I am running...

import wifi
import os
import time
import socketpool
import ssl
import adafruit_minimqtt.adafruit_minimqtt as MQTT

print()
print("Connecting to WiFi")
#  connect to your SSID
wifi.radio.connect(os.getenv('CIRCUITPY_WIFI_SSID'), os.getenv('CIRCUITPY_WIFI_PASSWORD'))
print("Connected to WiFi")
#  prints MAC address to REPL
print("My MAC addr:", [hex(i) for i in wifi.radio.mac_address])
#  prints IP address to REPL
print("My IP address is", wifi.radio.ipv4_address)

def connect(mqtt_client, userdata, flags, rc):
  # This function will be called when the mqtt_client is connected
  # successfully to the broker.
  print("Connected to MQTT Broker!")
  print("Flags: {0}\n RC: {1}".format(flags, rc))

def subscribe(mqtt_client, userdata, topic, granted_qos):
  # This method is called when the mqtt_client subscribes to a new feed.
  print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))

def message(client, topic, message):
  # Method called when a client's subscribed feed has a new value.
  print("New message on topic {0}: {1}".format(topic, message))

# Create a socket pool
pool = socketpool.SocketPool(wifi.radio)

# Set up a MiniMQTT Client
mqtt_client = MQTT.MQTT(
    broker="test.mosquitto.org",
    socket_pool=pool,
    ssl_context=ssl.create_default_context(),
)

# Connect callback handlers to mqtt_client
mqtt_client.on_connect = connect
mqtt_client.on_subscribe = subscribe
mqtt_client.on_message = message

print("Attempting to connect to %s" % mqtt_client.broker)
mqtt_client.connect()

# working topics...
#topics = ("$SYS/broker/subscriptions/count",0)
#topics = [("$SYS/broker/subscriptions/count",0),("$SYS/broker/uptime",0)]
#topics = [("short/path1",0),("short/path2",0)]
#topics = [("longer/path/path1",0),("longer/path/path2",0)]
#topics = [("working/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path",0)]
#topics = [("long/path/path/path/path/path/path/path/path/path/path1",0),("long/path/path/path/path/path/path/path/path/path/path2",0)]

# Broken topics
#topics = [("longer/path/path/path/path/path/path/path/path/path/path/path1",0),("longer/path/path/path/path/path/path/path/path/path/path/path2",0)]
topics = [("broken/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/pat/toolong",0)]

mqtt_client.subscribe(topics)

while True:
  mqtt_client.loop()

I have not been able to find any topic length limits in the documentation. Is this behaviour expected?

@LelandSindt
Copy link
Contributor Author

LelandSindt commented Apr 3, 2023

for what its worth.. I have implemented a work around that "works for now"... https://github.com/LelandSindt/analoguEnvoy/blob/8c307e620575397945cd833c15425e1b4d854dbc/circutpythonMQTT/code.py#L71

@brentru
Copy link
Member

brentru commented Apr 10, 2023

@LelandSindt How long are the topics you are attempting to subscribe to? How many bytes? Could you give me an example of one?

There is a topic string length of 65536 bytes imposed by the MQTT specification itself.

@LelandSindt
Copy link
Contributor Author

@brentru, Thank you for reaching out. The code included in the initial issue is a working example that connects to test.mosquitto.org. Either of the two topic configs under the #Broken topics header will cause the error, and any of the topic configs under the #Working topics header should work without issue.

This example..

echo -n "broken/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/pat/toolong" | wc -c
123

with a total of 123 characters it is certainly shorter than 65k bytes in length.

@brentru brentru added the bug Something isn't working label Apr 11, 2023
@brentru
Copy link
Member

brentru commented Apr 11, 2023

@LelandSindt: Thank you for the feedback! I'm unsure if the issue is the topic length or the number of sub-topics. Marking this as a bug until we can formally test this and resolve it.

@LelandSindt
Copy link
Contributor Author

In my experimentation and experience it is all about length regardless of number of subtopics. for example subscribing to homeassistant/sensor/envoy_2022xxxxxxxx_current_power_production/state and homeassistant/sensor/envoy_2022xxxxxxxx_current_power_consumption/state with all off 4 topics per subscription caused the failure.

Please let me known if you need any more specific details.

@vladak
Copy link
Contributor

vladak commented Nov 20, 2023

Looking at line 1041 (as reported initially in #160 (comment)) in the source code of adafruit-circuitpython-bundle-py-20230315/lib/adafruit_minimqtt/adafruit_minimqtt.py (Adafruit_CircuitPython_MiniMQTT version 7.3.2) there is no enlightement:

  1022	    def _wait_for_msg(self, timeout: float = 0.1) -> Optional[int]:
  1023	        # pylint: disable = too-many-return-statements
  1024	
  1025	        """Reads and processes network events.
  1026	        Return the packet type or None if there is nothing to be received.
  1027	        """
  1028	        # CPython socket module contains a timeout attribute
  1029	        if hasattr(self._socket_pool, "timeout"):
  1030	            try:
  1031	                res = self._sock_exact_recv(1)
  1032	            except self._socket_pool.timeout:
  1033	                return None
  1034	        else:  # socketpool, esp32spi
  1035	            try:
  1036	                res = self._sock_exact_recv(1)
  1037	            except OSError as error:
  1038	                if error.errno in (errno.ETIMEDOUT, errno.EAGAIN):
  1039	                    # raised by a socket timeout if 0 bytes were present
  1040	                    return None
  1041	                raise MMQTTException from error

@vladak
Copy link
Contributor

vladak commented Nov 20, 2023

Trying this with CPython and the latest gratest MiniMQTT (926846c), I too get an exception:

INFO:__main__:Creating MQTT instance
INFO:__main__:Connecting to MQTT broker
DEBUG:log:Attempting to connect to MQTT broker (attempt #0)
DEBUG:log:Attempting to establish MQTT connection...
INFO:log:Establishing an INSECURE connection to 172.40.0.3:1883
DEBUG:log:Sending CONNECT to broker...
DEBUG:log:Fixed Header: bytearray(b'\x10\x14\x00')
DEBUG:log:Variable Header: bytearray(b'\x04MQTT\x04\x02\x00<')
DEBUG:log:Receiving CONNACK packet from broker
DEBUG:log:Got message type: 0x20
DEBUG:log:Resetting reconnect backoff
INFO:__main__:subscribing to [('broken/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/pat/toolong', 0)]
DEBUG:log:SUBSCRIBING to topic broken/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/pat/toolong with QoS 0
Traceback (most recent call last):
  File "/Users/vladimirkotal/Pi/Adafruit_CircuitPython_MiniMQTT/./mqtt_basic.py", line 71, in <module>
    main()
  File "/Users/vladimirkotal/Pi/Adafruit_CircuitPython_MiniMQTT/./mqtt_basic.py", line 40, in main
    mqtt_client.subscribe(topics)
  File "/Users/vladimirkotal/Pi/Adafruit_CircuitPython_MiniMQTT/adafruit_minimqtt/adafruit_minimqtt.py", line 830, in subscribe
    op = self._wait_for_msg()
         ^^^^^^^^^^^^^^^^^^^^
  File "/Users/vladimirkotal/Pi/Adafruit_CircuitPython_MiniMQTT/adafruit_minimqtt/adafruit_minimqtt.py", line 1025, in _wait_for_msg
    res = self._sock_exact_recv(1)
          ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vladimirkotal/Pi/Adafruit_CircuitPython_MiniMQTT/adafruit_minimqtt/adafruit_minimqtt.py", line 1114, in _sock_exact_recv
    recv_len = self._sock.recv_into(rc, bufsize)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ConnectionResetError: [Errno 54] Connection reset by peer

Yes, the broker hung up on the client after seeing the subscribe request.

The code to reproduce this is trivial:

#!/usr/bin/env python3

import json
import socket
import ssl
import sys

import adafruit_minimqtt.adafruit_minimqtt as MQTT

import logging


def main():
    """
    Demonstrate how to use MQTT log handler.
    """
    logging.basicConfig()
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)

    logger.info("Creating MQTT instance")

    broker = "172.40.0.3"
    port = 1883
    mqtt_client = MQTT.MQTT(
        broker=broker,
        port=port,
        socket_pool=socket,
        ssl_context=ssl.create_default_context(),
        connect_retries=3,
    )

    mqtt_client.enable_logger(logging, log_level=logging.DEBUG)

    logger.info("Connecting to MQTT broker")
    mqtt_client.connect()

    topics = [("broken/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/path/pat/toolong",0)]
    logger.info(f"subscribing to {topics}")
    mqtt_client.subscribe(topics)

The broker in question is Mosquitto. Upon reception of the MQTT packet, the server reports this in the log:

1700515215: New connection from 10.0.1.61:55671 on port 1883.
1700515215: New client connected from 10.0.1.61:55671 as cpy13850 (p2, c1, k60).
1700515215: Client cpy13850 disconnected due to malformed packet.

and looking at the traffic dump with Wireshark, the MQTT protocol dissector seems to agree that the SUBSCRIBE packet was malformed.

@vladak
Copy link
Contributor

vladak commented Nov 20, 2023

The problem is in the remaining length bytes. In this case the length is 130 in decimal which is 0x80 hexadecimal, however according to the encoding scheme (MQTT 3.1.1 section 2.2.3 Remaining Length) if the top level bit is set (which it is for 0x80), the next byte contains continuation. The subscribe() however uses only single byte:

packet_length_byte = packet_length.to_bytes(1, "big")
self._pid = self._pid + 1 if self._pid < 0xFFFF else 1
packet_id_bytes = self._pid.to_bytes(2, "big")
# Packet with variable and fixed headers
packet = MQTT_SUB + packet_length_byte + packet_id_bytes
so this is clearly a bug.

vladak added a commit to vladak/Adafruit_CircuitPython_MiniMQTT that referenced this issue Nov 20, 2023
LelandSindt added a commit to LelandSindt/analoguEnvoy that referenced this issue Feb 18, 2024
@LelandSindt
Copy link
Contributor Author

Made time to update and test today... working as expected, thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

3 participants