Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions tb_mqtt_client/common/install_package_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2025 ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from sys import executable
from subprocess import check_call, CalledProcessError, DEVNULL
from pkg_resources import get_distribution, DistributionNotFound


def install_package(package, version="upgrade"):
result = False

def try_install(args, suppress_stderr=False):
try:
stderr = DEVNULL if suppress_stderr else None
check_call([executable, "-m", "pip", *args], stderr=stderr)
return True
except CalledProcessError:
return False

if version.lower() == "upgrade":
args = ["install", package, "--upgrade"]
result = try_install(args + ["--user"], suppress_stderr=True)
if not result:
result = try_install(args)
else:
try:
installed_version = get_distribution(package).version
if installed_version == version:
return True
except DistributionNotFound:
pass
install_version = f"{package}=={version}" if ">=" not in version else f"{package}{version}"
args = ["install", install_version]
if not try_install(args + ["--user"], suppress_stderr=True):
result = try_install(args)

return result
36 changes: 36 additions & 0 deletions tb_mqtt_client/constants/firmware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright 2025 ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import Enum


class FirmwareStates(Enum):
IDLE = 'IDLE'
DOWNLOADING = 'DOWNLOADING'
DOWNLOADED = 'DOWNLOADED'
VERIFIED = 'VERIFIED'
FAILED = 'FAILED'
UPDATING = 'UPDATING'
UPDATED = 'UPDATED'


FW_TITLE_ATTR = "fw_title"
FW_VERSION_ATTR = "fw_version"
FW_CHECKSUM_ATTR = "fw_checksum"
FW_CHECKSUM_ALG_ATTR = "fw_checksum_algorithm"
FW_SIZE_ATTR = "fw_size"
FW_STATE_ATTR = "fw_state"

REQUIRED_SHARED_KEYS = [FW_CHECKSUM_ATTR, FW_CHECKSUM_ALG_ATTR,
FW_SIZE_ATTR, FW_TITLE_ATTR, FW_VERSION_ATTR]
10 changes: 10 additions & 0 deletions tb_mqtt_client/constants/mqtt_topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@
DEVICE_RPC_RESPONSE_TOPIC_FOR_SUBSCRIPTION = DEVICE_RPC_TOPIC + RESPONSE_TOPIC_SUFFIX + "/" + WILDCARD
# Device Claim topic
DEVICE_CLAIM_TOPIC = "v1/devices/me/claim"
# Device Provisioning topics
PROVISION_REQUEST_TOPIC = "/provision/request"
PROVISION_RESPONSE_TOPIC = "/provision/response"
# Device Firmware Update topics
DEVICE_FIRMWARE_UPDATE_RESPONSE_TOPIC = "v2/fw/response/+/chunk/+"
DEVICE_FIRMWARE_UPDATE_REQUEST_TOPIC = "v2/fw/request/{request_id}/chunk/{current_chunk}"

# V1 Topics for Gateway API
BASE_GATEWAY_TOPIC = "v1/gateway"
Expand Down Expand Up @@ -69,3 +75,7 @@ def build_gateway_device_attributes_topic() -> str:

def build_gateway_rpc_topic() -> str:
return GATEWAY_RPC_TOPIC


def build_firmware_update_request_topic(request_id: int, current_chunk: int) -> str:
return DEVICE_FIRMWARE_UPDATE_REQUEST_TOPIC.format(request_id=request_id, current_chunk=current_chunk)
29 changes: 29 additions & 0 deletions tb_mqtt_client/constants/provisioning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2025 ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import Enum


class ProvisioningResponseStatus(Enum):
SUCCESS = "SUCCESS"
ERROR = "FAILURE"

def __str__(self):
return self.value


class ProvisioningCredentialsType(Enum):
ACCESS_TOKEN = "ACCESS_TOKEN"
MQTT_BASIC = "MQTT_BASIC"
X509_CERTIFICATE = "X509_CERTIFICATE"
77 changes: 77 additions & 0 deletions tb_mqtt_client/entities/data/provisioning_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Copyright 2025 ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from typing import Optional

from tb_mqtt_client.constants.provisioning import ProvisioningCredentialsType


class ProvisioningRequest:
def __init__(self, host, credentials: 'ProvisioningCredentials', port: str = "1883",
device_name: Optional[str] = None, gateway: Optional[bool] = False):
self.host = host
self.port = port
self.credentials = credentials
self.device_name = device_name
self.gateway = gateway


class ProvisioningCredentials(ABC):
@abstractmethod
def __init__(self, provision_device_key: str, provision_device_secret: str):
self.provision_device_key = provision_device_key
self.provision_device_secret = provision_device_secret
self.credentials_type: ProvisioningCredentialsType


class AccessTokenProvisioningCredentials(ProvisioningCredentials):
def __init__(self, provision_device_key: str, provision_device_secret: str, access_token: Optional[str] = None):
super().__init__(provision_device_key, provision_device_secret)
self.access_token = access_token
self.credentials_type = ProvisioningCredentialsType.ACCESS_TOKEN


class BasicProvisioningCredentials(ProvisioningCredentials):
def __init__(self, provision_device_key, provision_device_secret,
client_id: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None):
super().__init__(provision_device_key, provision_device_secret)
self.client_id = client_id
self.username = username
self.password = password
self.credentials_type = ProvisioningCredentialsType.MQTT_BASIC


class X509ProvisioningCredentials(ProvisioningCredentials):
def __init__(self, provision_device_key, provision_device_secret,
private_key_path: str, public_cert_path: str, ca_cert_path: str):
super().__init__(provision_device_key, provision_device_secret)
self.private_key_path = private_key_path
self.ca_cert_path = ca_cert_path
self.public_cert_path = public_cert_path
self.public_cert = self._load_public_cert_path(public_cert_path)
self.credentials_type = ProvisioningCredentialsType.X509_CERTIFICATE

def _load_public_cert_path(public_cert_path):
content = ''

try:
with open(public_cert_path, 'r') as file:
content = file.read()
except FileNotFoundError:
raise FileNotFoundError(f"Public certificate file not found: {public_cert_path}")
except IOError as e:
raise IOError(f"Error reading public certificate file {public_cert_path}: {e}")

return content.strip() if content else None
73 changes: 73 additions & 0 deletions tb_mqtt_client/entities/data/provisioning_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Copyright 2025 ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass
from typing import Optional

from tb_mqtt_client.common.config_loader import DeviceConfig
from tb_mqtt_client.constants.provisioning import ProvisioningResponseStatus
from tb_mqtt_client.entities.data.provisioning_request import ProvisioningRequest, ProvisioningCredentialsType


@dataclass(frozen=True)
class ProvisioningResponse:
status: ProvisioningResponseStatus
result: Optional[DeviceConfig] = None
error: Optional[str] = None

def __new__(cls, *args, **kwargs):
raise TypeError("Direct instantiation of ProvisioningResponse is not allowed. Use ProvisioningResponse.build(result, error).") # noqa

def __repr__(self) -> str:
return f"ProvisioningResponse(status={self.status}, result={self.result}, error={self.error})"

@classmethod
def build(cls, provision_request: 'ProvisioningRequest', payload: dict) -> 'ProvisioningResponse':
"""
Constructs a ProvisioningResponse explicitly.
"""
self = object.__new__(cls)

if payload.get('status') == ProvisioningResponseStatus.ERROR.value:
object.__setattr__(self, 'error', payload.get('errorMsg'))
object.__setattr__(self, 'status', ProvisioningResponseStatus.ERROR)
object.__setattr__(self, 'result', None)
else:
device_config = ProvisioningResponse._build_device_config(provision_request, payload)

object.__setattr__(self, 'result', device_config)
object.__setattr__(self, 'status', ProvisioningResponseStatus.SUCCESS)
object.__setattr__(self, 'error', None)

return self

@staticmethod
def _build_device_config(provision_request: 'ProvisioningRequest', payload: dict):
device_config = DeviceConfig()
device_config.host = provision_request.host
device_config.port = provision_request.port

if provision_request.credentials.credentials_type is None or \
provision_request.credentials.credentials_type == ProvisioningCredentialsType.ACCESS_TOKEN:
device_config.access_token = payload['credentialsValue']
elif provision_request.credentials.credentials_type == ProvisioningCredentialsType.MQTT_BASIC:
device_config.client_id = payload['credentialsValue']['clientId']
device_config.username = payload['credentialsValue']['userName']
device_config.password = payload['credentialsValue']['password']
elif provision_request.credentials.credentials_type == ProvisioningCredentialsType.X509_CERTIFICATE:
device_config.ca_cert = provision_request.credentials.ca_cert_path
device_config.client_cert = provision_request.credentials.public_cert_path
device_config.private_key = provision_request.credentials.private_key_path

return device_config
73 changes: 73 additions & 0 deletions tb_mqtt_client/entities/provisioning_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Copyright 2025 ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from asyncio import Event

from gmqtt import Client as GMQTTClient
from orjson import loads

from tb_mqtt_client.common.config_loader import DeviceConfig
from tb_mqtt_client.common.logging_utils import get_logger
from tb_mqtt_client.constants.mqtt_topics import PROVISION_RESPONSE_TOPIC
from tb_mqtt_client.entities.data.provisioning_request import ProvisioningRequest
from tb_mqtt_client.entities.data.provisioning_response import ProvisioningResponse
from tb_mqtt_client.service.message_dispatcher import JsonMessageDispatcher

logger = get_logger(__name__)


class ProvisioningClient:
def __init__(self, host, port, provision_request: 'ProvisioningRequest'):
self._log = logger
self._stop_event = Event()
self._host = host
self._port = port
self._provision_request = provision_request
self._client_id = "provision"
self._client = GMQTTClient(self._client_id)
self._client.on_connect = self._on_connect
self._client.on_message = self._on_message
self._provisioned = Event()
self._device_config: 'DeviceConfig' = None
self._json_message_dispatcher = JsonMessageDispatcher()

def _on_connect(self, client, _, rc, __):
if rc == 0:
self._log.debug("[Provisioning client] Connected to ThingsBoard")
client.subscribe(PROVISION_RESPONSE_TOPIC)
topic, payload = self._json_message_dispatcher.build_provision_request(self._provision_request)
self._log.debug("[Provisioning client] Sending provisioning request %s" % payload)
client.publish(topic, payload)
else:
self._device_config = ProvisioningResponse.build(self._provision_request,
{'status': 'FAILURE',
'errorMsg': 'Cannot connect to ThingsBoard!'})
self._provisioned.set()
self._log.error("[Provisioning client] Cannot connect to ThingsBoard!, result: %s" % rc)

async def _on_message(self, _, __, payload, ___, ____):
decoded_payload = payload.decode("UTF-8")
self._log.debug("[Provisioning client] Received data from ThingsBoard: %s" % decoded_payload)
decoded_message = loads(decoded_payload)

self._device_config = ProvisioningResponse.build(self._provision_request, decoded_message)

await self._client.disconnect()
self._provisioned.set()

async def provision(self):
await self._client.connect(self._host, self._port)
await self._provisioned.wait()

return self._device_config
Loading