Skip to content

Commit

Permalink
improve accessibility of connection state
Browse files Browse the repository at this point in the history
  • Loading branch information
tillsteinbach committed Mar 2, 2025
1 parent 6d2aad2 commit 94da0d7
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 65 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ authors = [
{ name = "Till Steinbach" }
]
dependencies = [
"carconnectivity>=0.4a8",
"carconnectivity>=0.4a11",
"paho-mqtt~=2.1.0",
"python-dateutil~=2.9.0"
]
Expand Down
142 changes: 85 additions & 57 deletions src/carconnectivity_plugins/mqtt/mqtt_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from carconnectivity import attributes, commands
from carconnectivity.observable import Observable
from carconnectivity.json_util import ExtendedWithNullEncoder
from carconnectivity.enums import ConnectionState

SUPPORT_IMAGES = False
try:
Expand All @@ -42,6 +43,8 @@

from carconnectivity.carconnectivity import CarConnectivity

from carconnectivity_plugins.mqtt.plugin import Plugin


LOG = logging.getLogger("carconnectivity.plugins.mqtt")

Expand Down Expand Up @@ -80,15 +83,17 @@ class CarConnectivityMQTTClient(Client): # pylint: disable=too-many-instance-at
MQTT client for car connectivity.
"""
# pylint: disable-next=too-many-arguments, too-many-positional-arguments, too-many-locals
def __init__(self, car_connectivity: CarConnectivity, plugin_id: str, client_id: Optional[str] = None,
def __init__(self, plugin: Plugin, car_connectivity: CarConnectivity, client_id: Optional[str] = None,
protocol: MQTTProtocolVersion = MQTTProtocolVersion.MQTTv311,
transport: Literal["tcp", "websockets", "unix"] = 'tcp',
prefix: Optional[str] = 'carconnectivity/0', ignore_for: int = 0, republish_on_update=False, retain_on_disconnect=False,
topic_filter_regex=None, convert_timezone: Optional[tzinfo] = None, time_format=None, with_raw_json_topic=False,
topic_format: TopicFormat = TopicFormat.SIMPLE, locale: Optional[str] = None, image_format: ImageFormat = ImageFormat.PNG) -> None:
topic_format: TopicFormat = TopicFormat.SIMPLE, locale: Optional[str] = None, image_format: ImageFormat = ImageFormat.PNG,
with_full_json: bool = False) -> None:
super().__init__(callback_api_version=CallbackAPIVersion.VERSION2, client_id=client_id, transport=transport, protocol=protocol)
self.car_connectivity: CarConnectivity = car_connectivity
self.plugin_id: str = plugin_id
self.plugin = plugin
self.plugin_id: str = plugin.id
self.prefix: str = prefix or 'carconnectivity/0'
self.has_error: Optional[bool] = None
self.ignore_for: int = ignore_for
Expand All @@ -107,6 +112,7 @@ def __init__(self, car_connectivity: CarConnectivity, plugin_id: str, client_id:
self.topic_format = topic_format
self.locale = locale
self.image_format: ImageFormat = image_format
self.with_full_json: bool = with_full_json

self.on_connect = self._on_connect_callback
self._on_connect_callbacks: Set[CallbackOnConnect] = set()
Expand All @@ -117,17 +123,8 @@ def __init__(self, car_connectivity: CarConnectivity, plugin_id: str, client_id:
self.on_subscribe = self._on_subscribe_callback
self._on_subscribe_callbacks: Set[CallbackOnSubscribe] = set()

if self.republish_on_update:
flags: Observable.ObserverEvent = (Observable.ObserverEvent.UPDATED
| Observable.ObserverEvent.ENABLED
| Observable.ObserverEvent.DISABLED)
else:
flags = (Observable.ObserverEvent.VALUE_CHANGED
| Observable.ObserverEvent.ENABLED
| Observable.ObserverEvent.DISABLED)
self.car_connectivity.add_observer(self._on_carconnectivity_event, flags, priority=Observable.ObserverPriority.USER_MID)

self.will_set(topic=f'{self.prefix}/plugins/{self.plugin_id}/connected', qos=1, retain=True, payload=False)
self.will_set(topic=f'{self.prefix}{self.plugin.connection_state.get_absolute_path}', qos=1, retain=True,
payload=ConnectionState.DISCONNECTED.value)

def add_on_connect_callback(self, callback: CallbackOnConnect) -> None:
"""
Expand Down Expand Up @@ -305,6 +302,16 @@ def publish_topics(self) -> None:
self.publish(topic=writeabletopicstopic, qos=1, retain=True, payload=content)
self.writeable_topics_changed = False

def connect(self, *args, **kwargs) -> MQTTErrorCode:
"""
Connects the MQTT client
Returns:
MQTTErrorCode: The result of the connection attempt.
"""
self.plugin.connection_state._set_value(value=ConnectionState.CONNECTING) # pylint: disable=protected-access
return super().connect(*args, **kwargs)

def disconnect(self, reasoncode: Optional[ReasonCode] = None, properties: Optional[Properties] = None) -> MQTTErrorCode:
"""
Disconnect from the MQTT broker while setting connected topic to false.
Expand All @@ -317,7 +324,9 @@ def disconnect(self, reasoncode: Optional[ReasonCode] = None, properties: Option
paho.mqtt.client.MQTTErrorCode: The result of the disconnect
"""
try:
disconect_publish = self.publish(topic=f'{self.prefix}/plugins/{self.plugin_id}/connected', qos=1, retain=True, payload=False)
self.plugin.connection_state._set_value(value=ConnectionState.DISCONNECTED) # pylint: disable=protected-access
disconect_publish = self.publish(topic=f'{self.prefix}{self.plugin.connection_state.get_absolute_path}', qos=1, retain=True,
payload=ConnectionState.DISCONNECTED.value)
disconect_publish.wait_for_publish()
except RuntimeError:
pass
Expand Down Expand Up @@ -484,7 +493,18 @@ def _on_connect_callback(self, mqttc, obj, flags, reason_code, properties) -> No
# reason_code 0 means success
if reason_code == 0:
LOG.info('Connected to MQTT broker')
self.publish(topic=f'{self.prefix}/plugins/{self.plugin_id}/connected', qos=1, retain=False, payload=True)
# register callback for carconnectivity events
if self.republish_on_update:
flags: Observable.ObserverEvent = (Observable.ObserverEvent.UPDATED
| Observable.ObserverEvent.ENABLED
| Observable.ObserverEvent.DISABLED)
else:
flags = (Observable.ObserverEvent.VALUE_CHANGED
| Observable.ObserverEvent.ENABLED
| Observable.ObserverEvent.DISABLED)
self.car_connectivity.add_observer(self._on_carconnectivity_event, flags, priority=Observable.ObserverPriority.USER_MID)

self.plugin.connection_state._set_value(value=ConnectionState.CONNECTED) # pylint: disable=protected-access
# subsribe to the force update topic
force_update_topic: str = f'{self.prefix}/plugins/{self.plugin_id}/carconnectivityForceUpdate_writetopic'
self.subscribe(force_update_topic, qos=2)
Expand Down Expand Up @@ -514,49 +534,56 @@ def _on_connect_callback(self, mqttc, obj, flags, reason_code, properties) -> No
# if attribute has a value, publish it
if attribute.value is not None:
self._publish_element(attribute)
# publish raw json topic if needed
if self.with_full_json:
full_json_topic: str = f'{self.prefix}/full_json'
self._add_topic(full_json_topic, with_filter=True, subscribe=False, writeable=False)
self.publish(topic=full_json_topic, qos=1, retain=True, payload=self.car_connectivity.as_json(pretty=True))
# Handle different reason codes
elif reason_code == 128:
LOG.error('Could not connect (%s): Unspecified error', reason_code)
elif reason_code == 129:
LOG.error('Could not connect (%s): Malformed packet', reason_code)
elif reason_code == 130:
LOG.error('Could not connect (%s): Protocol error', reason_code)
elif reason_code == 131:
LOG.error('Could not connect (%s): Implementation specific error', reason_code)
elif reason_code == 132:
LOG.error('Could not connect (%s): Unsupported protocol version', reason_code)
elif reason_code == 133:
LOG.error('Could not connect (%s): Client identifier not valid', reason_code)
elif reason_code == 134:
LOG.error('Could not connect (%s): Bad user name or password', reason_code)
elif reason_code == 135:
LOG.error('Could not connect (%s): Not authorized', reason_code)
elif reason_code == 136:
LOG.error('Could not connect (%s): Server unavailable', reason_code)
elif reason_code == 137:
LOG.error('Could not connect (%s): Server busy. Retrying', reason_code)
elif reason_code == 138:
LOG.error('Could not connect (%s): Banned', reason_code)
elif reason_code == 140:
LOG.error('Could not connect (%s): Bad authentication method', reason_code)
elif reason_code == 144:
LOG.error('Could not connect (%s): Topic name invalid', reason_code)
elif reason_code == 149:
LOG.error('Could not connect (%s): Packet too large', reason_code)
elif reason_code == 151:
LOG.error('Could not connect (%s): Quota exceeded', reason_code)
elif reason_code == 154:
LOG.error('Could not connect (%s): Retain not supported', reason_code)
elif reason_code == 155:
LOG.error('Could not connect (%s): QoS not supported', reason_code)
elif reason_code == 156:
LOG.error('Could not connect (%s): Use another server', reason_code)
elif reason_code == 157:
LOG.error('Could not connect (%s): Server move', reason_code)
elif reason_code == 159:
LOG.error('Could not connect (%s): Connection rate exceeded', reason_code)
else:
LOG.error('Could not connect (%s)', reason_code)
self.plugin.connection_state._set_value(value=ConnectionState.ERROR) # pylint: disable=protected-access
if reason_code == 128:
LOG.error('Could not connect (%s): Unspecified error', reason_code)
elif reason_code == 129:
LOG.error('Could not connect (%s): Malformed packet', reason_code)
elif reason_code == 130:
LOG.error('Could not connect (%s): Protocol error', reason_code)
elif reason_code == 131:
LOG.error('Could not connect (%s): Implementation specific error', reason_code)
elif reason_code == 132:
LOG.error('Could not connect (%s): Unsupported protocol version', reason_code)
elif reason_code == 133:
LOG.error('Could not connect (%s): Client identifier not valid', reason_code)
elif reason_code == 134:
LOG.error('Could not connect (%s): Bad user name or password', reason_code)
elif reason_code == 135:
LOG.error('Could not connect (%s): Not authorized', reason_code)
elif reason_code == 136:
LOG.error('Could not connect (%s): Server unavailable', reason_code)
elif reason_code == 137:
LOG.error('Could not connect (%s): Server busy. Retrying', reason_code)
elif reason_code == 138:
LOG.error('Could not connect (%s): Banned', reason_code)
elif reason_code == 140:
LOG.error('Could not connect (%s): Bad authentication method', reason_code)
elif reason_code == 144:
LOG.error('Could not connect (%s): Topic name invalid', reason_code)
elif reason_code == 149:
LOG.error('Could not connect (%s): Packet too large', reason_code)
elif reason_code == 151:
LOG.error('Could not connect (%s): Quota exceeded', reason_code)
elif reason_code == 154:
LOG.error('Could not connect (%s): Retain not supported', reason_code)
elif reason_code == 155:
LOG.error('Could not connect (%s): QoS not supported', reason_code)
elif reason_code == 156:
LOG.error('Could not connect (%s): Use another server', reason_code)
elif reason_code == 157:
LOG.error('Could not connect (%s): Server move', reason_code)
elif reason_code == 159:
LOG.error('Could not connect (%s): Connection rate exceeded', reason_code)
else:
LOG.error('Could not connect (%s)', reason_code)
for callback in self._on_connect_callbacks:
callback(mqttc, obj, flags, reason_code, properties)

Expand All @@ -575,6 +602,7 @@ def _on_disconnect_callback(self, client, userdata, flags, reason_code, properti
Returns:
None
"""
self.car_connectivity.remove_observer(self._on_carconnectivity_event)
if reason_code == 0:
LOG.info('Client successfully disconnected')
elif reason_code == 4:
Expand Down
25 changes: 18 additions & 7 deletions src/carconnectivity_plugins/mqtt/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

from carconnectivity.errors import ConfigurationError
from carconnectivity.util import config_remove_credentials
from carconnectivity.attributes import EnumAttribute
from carconnectivity.enums import ConnectionState

from carconnectivity_plugins.base.plugin import BasePlugin
from carconnectivity_plugins.mqtt.mqtt_client import CarConnectivityMQTTClient, TopicFormat, ImageFormat
from carconnectivity_plugins.mqtt._version import __version__
Expand All @@ -35,7 +38,9 @@ class Plugin(BasePlugin): # pylint: disable=too-many-instance-attributes
"""
def __init__(self, plugin_id: str, car_connectivity: CarConnectivity, config: Dict) -> None: # pylint: disable=too-many-branches, too-many-statements
BasePlugin.__init__(self, plugin_id=plugin_id, car_connectivity=car_connectivity, config=config, log=LOG)
self._healthy = False

self.connection_state: EnumAttribute = EnumAttribute(name="connection_state", parent=self, value_type=ConnectionState,
value=ConnectionState.DISCONNECTED, tags={'plugin_custom'})

self._background_connect_thread: Optional[threading.Thread] = None
self._background_publish_topics_thread: Optional[threading.Thread] = None
Expand Down Expand Up @@ -225,8 +230,13 @@ def __init__(self, plugin_id: str, car_connectivity: CarConnectivity, config: Di
self.image_format: ImageFormat = ImageFormat.PNG
self.active_config['image_format'] = self.image_format.value

self.mqtt_client = CarConnectivityMQTTClient(car_connectivity=self.car_connectivity,
plugin_id=plugin_id,
if 'with_full_json' in config and config['with_full_json'] is not None:
self.active_config['with_full_json'] = config['with_full_json']
else:
self.active_config['with_full_json'] = False

self.mqtt_client = CarConnectivityMQTTClient(plugin=self,
car_connectivity=self.car_connectivity,
client_id=self.active_config['clientid'],
protocol=self.mqttversion,
transport=self.active_config['transport'],
Expand All @@ -240,7 +250,8 @@ def __init__(self, plugin_id: str, car_connectivity: CarConnectivity, config: Di
with_raw_json_topic=False,
topic_format=self.active_config['topic_format'],
locale=self.active_config['locale'],
image_format=self.image_format)
image_format=self.image_format,
with_full_json=self.active_config['with_full_json'])
if self.active_config['tls']:
if self.active_config['tls_insecure']:
cert_required: ssl.VerifyMode = ssl.CERT_NONE
Expand All @@ -265,7 +276,7 @@ def startup(self) -> None:
self._background_publish_topics_thread = threading.Thread(target=self._background_publish_topics_loop, daemon=False)
self._background_publish_topics_thread.name = 'carconnectivity.plugins.mqtt-background_publish_topics'
self._background_publish_topics_thread.start()
self._healthy = True
self.healthy._set_value(value=True) # pylint: disable=protected-access
LOG.debug("Starting MQTT plugin done")

def _background_connect_loop(self) -> None:
Expand Down Expand Up @@ -307,5 +318,5 @@ def get_version(self) -> str:
def get_type(self) -> str:
return "carconnectivity-plugin-mqtt"

def is_healthy(self) -> bool:
return self._healthy and super().is_healthy()
def get_name(self) -> str:
return "MQTT Plugin"

0 comments on commit 94da0d7

Please sign in to comment.