Skip to content

Commit

Permalink
Update examples to work with Java MQTT
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewd0123 committed Jul 16, 2024
1 parent b0af8f0 commit 7497a4d
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 45 deletions.
4 changes: 2 additions & 2 deletions examples/mqtt5_pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def build_source():


def build_sink():
return UUri(authority_name="vcu.matthew.com", ue_id=1234, ue_version_major=1, resource_id=0)
return UUri(authority_name="vcu.matthew.com", ue_id=0xFFFF, ue_version_major=0xFF, resource_id=0xFFFF)


def build_timestamp_upayload():
Expand All @@ -53,7 +53,7 @@ def build_umessage(payload, source=build_source()):


if __name__ == "__main__":
mqtt5_publisher = MQTT5UTransport(build_sink(), "client_pub", "127.0.0.1", 1883, False)
mqtt5_publisher = MQTT5UTransport(build_sink(), "client_pub", "127.0.0.1", 8883, False)
mqtt5_publisher.connect()
umsg: UMessage = build_umessage(build_timestamp_upayload())
while True:
Expand Down
8 changes: 6 additions & 2 deletions examples/mqtt5_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,15 @@ def build_source():
return UUri(authority_name="vcu.matthew.com", ue_id=1234, ue_version_major=1, resource_id=0x8000)


def build_sink():
return UUri(authority_name="vcu.matthew.com", ue_id=0xFFFF, ue_version_major=0xFF, resource_id=0xFFFF)


if __name__ == "__main__":
mqtt5_subscriber = MQTT5UTransport(build_source(), "client_sub", "127.0.0.1", 1883, False)
mqtt5_subscriber = MQTT5UTransport(build_source(), "client_sub", "127.0.0.1", 8883, False)
mqtt5_subscriber.connect()
source: UUri = build_source()
listener: MQTT5UListener = MQTT5UListener()
mqtt5_subscriber.register_listener(source, listener)
mqtt5_subscriber.register_listener(source, listener, build_sink())
while True:
time.sleep(10)
56 changes: 24 additions & 32 deletions up_client_mqtt5_python/mqtt5_utransport.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
build_attributes_from_mqtt_properties,
build_message_from_mqtt_message_and_attributes,
build_mqtt_properties_from_attributes,
uuri_field_resolver,
)

logging.basicConfig(format="%(levelname)s| %(filename)s:%(lineno)s %(message)s")
Expand Down Expand Up @@ -70,7 +71,7 @@ def __init__(self, source: UUri, client_id: str, host_name: str, port: int, clou

self._connected_signal = threading.Event()

self.topic_to_listener: Dict[bytes, List[UListener]] = {}
self.topic_to_listener: Dict[str, List[UListener]] = {}
self.reqid_to_future: Dict[bytes, Future] = {}

self._mqtt_client = mqtt.Client(
Expand Down Expand Up @@ -133,11 +134,13 @@ def _listen(self, client, userdata, msg):
@param msg:
@return: None
"""
logger.info(f"Received Message on MQTT: {msg}")

attributes: UAttributes = build_attributes_from_mqtt_properties(msg.properties)
umsg: UMessage = build_message_from_mqtt_message_and_attributes(msg, attributes)

message_type_handlers = {
UMessageType.UMESSAGE_TYPE_UNSPECIFIED: self._handle_unspecified_message,
UMessageType.UMESSAGE_TYPE_PUBLISH: self._handle_publish_message,
UMessageType.UMESSAGE_TYPE_REQUEST: self._handle_publish_message,
UMessageType.UMESSAGE_TYPE_RESPONSE: self._handle_response_message,
Expand All @@ -147,9 +150,14 @@ def _listen(self, client, userdata, msg):
if handler:
handler(msg.topic, umsg)
else:
raise ValueError("Unsupported message type: " + attributes.type)
raise ValueError("Unsupported message type: " + UMessageType.Name(attributes.type))

def _handle_response_message(self, umsg: UMessage):
def _handle_unspecified_message(self, topic: str, umsg: UMessage):
logger.info("%s Unspecified Message Received", self.__class__.__name__)
logger.info(f"Message Details: {umsg}")
logger.info(f"Unspecified Message received on topic {topic}")

def _handle_response_message(self, topic: str, umsg: UMessage):
request_id: UUID = umsg.attributes.reqid
request_id_b: bytes = request_id.SerializeToString()

Expand All @@ -161,7 +169,7 @@ def _handle_response_message(self, umsg: UMessage):

def _handle_publish_message(self, topic: str, umsg: UMessage):
if topic in self.topic_to_listener:
logger.info("%s Handle Topic", self.__class__.__name__)
logger.info("%s Handle Publish Message on Topic", self.__class__.__name__)

for listener in self.topic_to_listener[topic]:
listener.on_receive(umsg)
Expand All @@ -179,36 +187,18 @@ def mqtt_topic_builder(self, source: UUri, sink: UUri = None) -> str:
"""

device = "c" if self.cloud_device else "d"
src_auth_name = source.authority_name if source != UUri() else "+"
src_ue_id = source.ue_id if source != UUri() and source.ue_id != 0xFFFF else "+"
src_ue_version_major = source.ue_version_major if source != UUri() and source.ue_version_major != 0xFF else "+"
src_resource_id = source.resource_id if source != UUri() and source.resource_id != 0xFFFF else "+"
topic = (
device
+ "/"
+ src_auth_name
+ "/"
+ str(src_ue_id)
+ "/"
+ str(src_ue_version_major)
+ "/"
+ str(src_resource_id)
)
if source != UUri():
src_auth_name = source.authority_name if source != UUri() else "+"
src_ue_id = uuri_field_resolver(source.ue_id, 0xFFFF, "ffff")
src_ue_version_major = uuri_field_resolver(source.ue_version_major, 0xFF, "ff")
src_resource_id = uuri_field_resolver(source.resource_id, 0xFFFF, "ffff")
topic = device + "/" + src_auth_name + "/" + src_ue_id + "/" + src_ue_version_major + "/" + src_resource_id
if sink is not None and sink != UUri():
sink_auth_name = sink.authority_name
sink_ue_id = sink.ue_id if sink.ue_id != 0xFFFF else "+"
sink_ue_version_major = sink.ue_version_major if sink.ue_version_major != 0xFF else "+"
sink_resource_id = sink.resource_id if sink.resource_id != 0xFFFF else "+"
topic += (
"/"
+ sink_auth_name
+ "/"
+ str(sink_ue_id)
+ "/"
+ str(sink_ue_version_major)
+ "/"
+ str(sink_resource_id)
)
sink_ue_id = uuri_field_resolver(sink.ue_id, 0xFFFF, "ffff")
sink_ue_version_major = uuri_field_resolver(sink.ue_version_major, 0xFF, "ff")
sink_resource_id = uuri_field_resolver(sink.resource_id, 0xFFFF, "ffff")
topic += "/" + sink_auth_name + "/" + sink_ue_id + "/" + sink_ue_version_major + "/" + sink_resource_id
return topic

def send(self, message: UMessage) -> UStatus:
Expand Down Expand Up @@ -248,6 +238,8 @@ def register_listener(self, source_filter: UUri, listener: UListener, sink_filte
"""

mqtt_topic = self.mqtt_topic_builder(source=source_filter, sink=sink_filter)
logger.info("%s Registering Listener for Topic: %s", self.__class__.__name__, mqtt_topic)

self.topic_to_listener.setdefault(mqtt_topic, []).append(listener)

self._mqtt_client.subscribe(topic=mqtt_topic, qos=1)
Expand Down
32 changes: 23 additions & 9 deletions up_client_mqtt5_python/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
from uprotocol.transport.builder.umessagebuilder import UMessageBuilder
from uprotocol.uri.serializer.uriserializer import UriSerializer
from uprotocol.uuid.serializer.uuidserializer import UuidSerializer
from uprotocol.v1.uattributes_pb2 import UAttributes, UMessageType, UPriority
from uprotocol.v1.ucode_pb2 import UCode
from uprotocol.v1.uattributes_pb2 import UAttributes, UMessageType
from uprotocol.v1.umessage_pb2 import UMessage


Expand Down Expand Up @@ -63,27 +62,27 @@ def build_attributes_from_mqtt_properties(publish_properties) -> UAttributes:
if user_property[0] == "1":
attributes.id.CopyFrom(UuidSerializer.deserialize(user_property[1]))
elif user_property[0] == "2":
attributes.type = UMessageType.Value(user_property[1])
attributes.type = int(user_property[1])
elif user_property[0] == "3":
attributes.source.CopyFrom(UriSerializer.deserialize(user_property[1]))
elif user_property[0] == "4":
attributes.sink.CopyFrom(UriSerializer.deserialize(user_property[1]))
elif user_property[0] == "5":
attributes.priority = UPriority.Value(user_property[1])
attributes.priority = int(user_property[1])
elif user_property[0] == "6":
attributes.ttl = int(user_property[1])
elif user_property[0] == "7":
attributes.permission_level = int(user_property[1])
elif user_property[0] == "8":
attributes.commstatus = UCode.Value(user_property[1])
attributes.commstatus = int(user_property[1])
elif user_property[0] == "9":
attributes.reqid.CopyFrom(UuidSerializer.deserialize(user_property[1]))
elif user_property[0] == "10":
attributes.token = user_property[1]
elif user_property[0] == "11":
attributes.traceparent = user_property[1]
elif user_property[0] == "12":
attributes.payload_format = user_property[1]
attributes.payload_format = int(user_property[1])
return attributes


Expand All @@ -98,7 +97,7 @@ def build_mqtt_properties_from_attributes(attributes: UAttributes):
try:
if attributes.HasField("id"):
publish_properties.UserProperty.append(("1", UuidSerializer.serialize(attributes.id)))
publish_properties.UserProperty.append(("2", UMessageType.Name(attributes.type)))
publish_properties.UserProperty.append(("2", str(attributes.type)))
if attributes.HasField("source"):
publish_properties.UserProperty.append(("3", UriSerializer.serialize(attributes.source)))
if attributes.HasField("sink"):
Expand All @@ -108,13 +107,13 @@ def build_mqtt_properties_from_attributes(attributes: UAttributes):
UriSerializer.serialize(attributes.sink),
)
)
publish_properties.UserProperty.append(("5", UPriority.Name(attributes.priority)))
publish_properties.UserProperty.append(("5", str(attributes.priority)))
if attributes.HasField("ttl"):
publish_properties.UserProperty.append(("6", str(attributes.ttl)))
if attributes.HasField("permission_level"):
publish_properties.UserProperty.append(("7", str(attributes.permission_level)))
if attributes.HasField("commstatus"):
publish_properties.UserProperty.append(("8", UCode.Name(attributes.commstatus)))
publish_properties.UserProperty.append(("8", str(attributes.commstatus)))
if attributes.type == UMessageType.UMESSAGE_TYPE_RESPONSE:
publish_properties.UserProperty.append(
(
Expand All @@ -130,3 +129,18 @@ def build_mqtt_properties_from_attributes(attributes: UAttributes):
raise ValueError(e) from e

return publish_properties


def length_resolver(field):
return "0" + field if len(field) % 2 == 1 else field


def uuri_field_resolver(field, wildcard_value, wild_return="+"):
"""
Returns self if value isn't wild or empty, else returns wildcard_value
:param field: field to resolve
:wildcard_value: wildcard value of the field
:return: resolved field
"""
hex_val = length_resolver(f'{field:x}')
return hex_val if field != wildcard_value else wild_return

0 comments on commit 7497a4d

Please sign in to comment.