1
1
"""Module to support CBT Power Smart BMS."""
2
2
3
3
import asyncio
4
- from collections .abc import Callable
5
4
import logging
5
+ from collections .abc import Callable
6
6
from typing import Any , Final
7
7
8
8
from bleak .backends .device import BLEDevice
9
9
from bleak .uuids import normalize_uuid_str
10
+ from homeassistant .util .unit_conversion import _HRS_TO_SECS
10
11
11
12
from custom_components .bms_ble .const import (
12
13
ATTR_BATTERY_CHARGING ,
23
24
KEY_CELL_VOLTAGE ,
24
25
KEY_DESIGN_CAP ,
25
26
)
26
- from homeassistant .util .unit_conversion import _HRS_TO_SECS
27
27
28
- from .basebms import BaseBMS , BMSsample
28
+ from .basebms import BaseBMS , BMSsample , crc_sum
29
29
30
30
BAT_TIMEOUT : Final = 1
31
31
LOGGER : Final = logging .getLogger (__name__ )
@@ -42,23 +42,23 @@ class BMS(BaseBMS):
42
42
LEN_POS : Final [int ] = 3
43
43
CMD_POS : Final [int ] = 2
44
44
CELL_VOLTAGE_CMDS : Final [list [int ]] = [0x5 , 0x6 , 0x7 , 0x8 ]
45
+ _FIELDS : Final [
46
+ list [tuple [str , int , int , int , bool , Callable [[int ], int | float ]]]
47
+ ] = [
48
+ (ATTR_VOLTAGE , 0x0B , 4 , 4 , False , lambda x : float (x / 1000 )),
49
+ (ATTR_CURRENT , 0x0B , 8 , 4 , True , lambda x : float (x / 1000 )),
50
+ (ATTR_TEMPERATURE , 0x09 , 4 , 2 , False , lambda x : x ),
51
+ (ATTR_BATTERY_LEVEL , 0x0A , 4 , 1 , False , lambda x : x ),
52
+ (KEY_DESIGN_CAP , 0x15 , 4 , 2 , False , lambda x : x ),
53
+ (ATTR_CYCLES , 0x15 , 6 , 2 , False , lambda x : x ),
54
+ (ATTR_RUNTIME , 0x0C , 14 , 2 , False , lambda x : float (x * _HRS_TO_SECS / 100 )),
55
+ ]
56
+ _CMDS : Final [list [int ]] = list ({field [1 ] for field in _FIELDS })
45
57
46
58
def __init__ (self , ble_device : BLEDevice , reconnect : bool = False ) -> None :
47
59
"""Intialize private BMS members."""
48
60
super ().__init__ (LOGGER , self ._notification_handler , ble_device , reconnect )
49
61
self ._data : bytearray = bytearray ()
50
- self ._FIELDS : Final [
51
- list [tuple [str , int , int , int , bool , Callable [[int ], int | float ]]]
52
- ] = [
53
- (ATTR_VOLTAGE , 0x0B , 4 , 4 , False , lambda x : float (x / 1000 )),
54
- (ATTR_CURRENT , 0x0B , 8 , 4 , True , lambda x : float (x / 1000 )),
55
- (ATTR_TEMPERATURE , 0x09 , 4 , 2 , False , lambda x : x ),
56
- (ATTR_BATTERY_LEVEL , 0x0A , 4 , 1 , False , lambda x : x ),
57
- (KEY_DESIGN_CAP , 0x15 , 4 , 2 , False , lambda x : x ),
58
- (ATTR_CYCLES , 0x15 , 6 , 2 , False , lambda x : x ),
59
- (ATTR_RUNTIME , 0x0C , 14 , 2 , False , lambda x : float (x * _HRS_TO_SECS / 100 )),
60
- ]
61
- self ._CMDS : Final [list [int ]] = list ({field [1 ] for field in self ._FIELDS })
62
62
63
63
@staticmethod
64
64
def matcher_dict_list () -> list [dict [str , Any ]]:
@@ -102,54 +102,48 @@ def _calc_values() -> set[str]:
102
102
103
103
def _notification_handler (self , _sender , data : bytearray ) -> None :
104
104
"""Retrieve BMS data update."""
105
-
106
- LOGGER .debug ("(%s) Rx BLE data: %s" , self ._ble_device .name , data )
105
+ LOGGER .debug ("%s: Received BLE data: %s" , self .name , data )
107
106
108
107
# verify that data long enough
109
- if (
110
- len (data ) < self .MIN_FRAME
111
- or len (data ) != self .MIN_FRAME + data [self .LEN_POS ]
112
- ):
108
+ if len (data ) < BMS .MIN_FRAME or len (data ) != BMS .MIN_FRAME + data [BMS .LEN_POS ]:
113
109
LOGGER .debug (
114
- "(%s) incorrect frame length (%i): %s" , self .name , len (data ), data
110
+ "%s: incorrect frame length (%i): %s" , self .name , len (data ), data
115
111
)
116
112
return
117
113
118
- if not data .startswith (self .HEAD ) or not data .endswith (self .TAIL_RX ):
119
- LOGGER .debug ("(%s) Incorrect frame start/end: %s" , self .name , data )
114
+ if not data .startswith (BMS .HEAD ) or not data .endswith (BMS .TAIL_RX ):
115
+ LOGGER .debug ("%s: incorrect frame start/end: %s" , self .name , data )
120
116
return
121
117
122
- crc = self . _crc (data [len (self .HEAD ) : len (data ) + self .CRC_POS ])
123
- if data [self .CRC_POS ] != crc :
118
+ crc = crc_sum (data [len (BMS .HEAD ) : len (data ) + BMS .CRC_POS ])
119
+ if data [BMS .CRC_POS ] != crc :
124
120
LOGGER .debug (
125
- "(%s) Rx data CRC is invalid: 0x%x != 0x%x " ,
121
+ "%s: RX data CRC is invalid: 0x%X != 0x%X " ,
126
122
self .name ,
127
- data [len (data ) + self .CRC_POS ],
123
+ data [len (data ) + BMS .CRC_POS ],
128
124
crc ,
129
125
)
130
126
return
131
127
132
128
self ._data = data
133
129
self ._data_event .set ()
134
130
135
- def _crc (self , frame : bytes ) -> int :
136
- """Calculate CBT Power frame CRC."""
137
- return sum (frame ) & 0xFF
138
-
139
- def _gen_frame (self , cmd : bytes , value : list [int ] | None = None ) -> bytes :
131
+ @staticmethod
132
+ def _gen_frame (cmd : bytes , value : list [int ] | None = None ) -> bytes :
140
133
"""Assemble a CBT Power BMS command."""
141
134
value = [] if value is None else value
142
135
assert len (value ) <= 255
143
- frame = bytes ([* self .HEAD , cmd [0 ]])
136
+ frame = bytes ([* BMS .HEAD , cmd [0 ]])
144
137
frame += bytes ([len (value ), * value ])
145
- frame += bytes ([self . _crc (frame [len (self .HEAD ) :])])
146
- frame += bytes ([* self .TAIL_TX ])
138
+ frame += bytes ([crc_sum (frame [len (BMS .HEAD ) :])])
139
+ frame += bytes ([* BMS .TAIL_TX ])
147
140
return frame
148
141
149
- def _cell_voltages (self , data : bytearray ) -> dict [str , float ]:
142
+ @staticmethod
143
+ def _cell_voltages (data : bytearray ) -> dict [str , float ]:
150
144
"""Return cell voltages from status message."""
151
145
return {
152
- f"{ KEY_CELL_VOLTAGE } { idx + (data [self .CMD_POS ]- 5 )* 5 } " : int .from_bytes (
146
+ f"{ KEY_CELL_VOLTAGE } { idx + (data [BMS .CMD_POS ]- 5 )* 5 } " : int .from_bytes (
153
147
data [4 + 2 * idx : 6 + 2 * idx ],
154
148
byteorder = "little" ,
155
149
signed = True ,
@@ -162,25 +156,25 @@ async def _async_update(self) -> BMSsample:
162
156
"""Update battery status information."""
163
157
data = {}
164
158
resp_cache = {} # variable to avoid multiple queries with same command
165
- for cmd in self ._CMDS :
166
- LOGGER .debug ("(%s) request command 0x%X." , self .name , cmd )
159
+ for cmd in BMS ._CMDS :
160
+ LOGGER .debug ("%s: request command 0x%X." , self .name , cmd )
167
161
await self ._client .write_gatt_char (
168
- BMS .uuid_tx (), data = self ._gen_frame (cmd .to_bytes (1 ))
162
+ BMS .uuid_tx (), data = BMS ._gen_frame (cmd .to_bytes (1 ))
169
163
)
170
164
try :
171
165
await asyncio .wait_for (self ._wait_event (), timeout = BAT_TIMEOUT )
172
166
except TimeoutError :
173
167
continue
174
- if cmd != self ._data [self .CMD_POS ]:
168
+ if cmd != self ._data [BMS .CMD_POS ]:
175
169
LOGGER .debug (
176
- "(%s): incorrect response 0x%x to command 0x%x " ,
170
+ "%s:: incorrect response 0x%X to command 0x%X " ,
177
171
self .name ,
178
- self ._data [self .CMD_POS ],
172
+ self ._data [BMS .CMD_POS ],
179
173
cmd ,
180
174
)
181
- resp_cache [self ._data [self .CMD_POS ]] = self ._data .copy ()
175
+ resp_cache [self ._data [BMS .CMD_POS ]] = self ._data .copy ()
182
176
183
- for field , cmd , pos , size , sign , fct in self ._FIELDS :
177
+ for field , cmd , pos , size , sign , fct in BMS ._FIELDS :
184
178
if resp_cache .get (cmd ):
185
179
data |= {
186
180
field : fct (
@@ -191,15 +185,15 @@ async def _async_update(self) -> BMSsample:
191
185
}
192
186
193
187
voltages = {}
194
- for cmd in self .CELL_VOLTAGE_CMDS :
188
+ for cmd in BMS .CELL_VOLTAGE_CMDS :
195
189
await self ._client .write_gatt_char (
196
- BMS .uuid_tx (), data = self ._gen_frame (cmd .to_bytes (1 ))
190
+ BMS .uuid_tx (), data = BMS ._gen_frame (cmd .to_bytes (1 ))
197
191
)
198
192
try :
199
193
await asyncio .wait_for (self ._wait_event (), timeout = BAT_TIMEOUT )
200
194
except TimeoutError :
201
195
break
202
- voltages |= self ._cell_voltages (self ._data )
196
+ voltages |= BMS ._cell_voltages (self ._data )
203
197
if invalid := [k for k , v in voltages .items () if v == 0 ]:
204
198
for k in invalid :
205
199
voltages .pop (k )
0 commit comments