34
34
class BMS (BaseBMS ):
35
35
"""CBT Power Smart BMS class implementation."""
36
36
37
- HEAD : Final = bytes ([0xAA , 0x55 ])
38
- TAIL_RX : Final = bytes ([0x0D , 0x0A ])
39
- TAIL_TX : Final = bytes ([0x0A , 0x0D ])
40
- MIN_FRAME : Final = len (HEAD ) + len (TAIL_RX ) + 3 # CMD, LEN, CRC each 1 Byte
41
- CRC_POS : Final = - len (TAIL_RX ) - 1
42
- LEN_POS : Final = 3
43
- CMD_POS : Final = 2
37
+ HEAD : Final [ bytes ] = bytes ([0xAA , 0x55 ])
38
+ TAIL_RX : Final [ bytes ] = bytes ([0x0D , 0x0A ])
39
+ TAIL_TX : Final [ bytes ] = bytes ([0x0A , 0x0D ])
40
+ MIN_FRAME : Final [ int ] = len (HEAD ) + len (TAIL_RX ) + 3 # CMD, LEN, CRC each 1 Byte
41
+ CRC_POS : Final [ int ] = - len (TAIL_RX ) - 1
42
+ LEN_POS : Final [ int ] = 3
43
+ CMD_POS : Final [ int ] = 2
44
44
CELL_VOLTAGE_CMDS : Final [list [int ]] = [0x5 , 0x6 , 0x7 , 0x8 ]
45
45
46
46
def __init__ (self , ble_device : BLEDevice , reconnect : bool = False ) -> None :
47
47
"""Intialize private BMS members."""
48
48
super ().__init__ (LOGGER , self ._notification_handler , ble_device , reconnect )
49
49
self ._data : bytearray = bytearray ()
50
- self ._data_event = asyncio .Event ()
51
50
self ._FIELDS : Final [
52
51
list [tuple [str , int , int , int , bool , Callable [[int ], int | float ]]]
53
52
] = [
@@ -59,6 +58,7 @@ def __init__(self, ble_device: BLEDevice, reconnect: bool = False) -> None:
59
58
(ATTR_CYCLES , 0x15 , 6 , 2 , False , lambda x : x ),
60
59
(ATTR_RUNTIME , 0x0C , 14 , 2 , False , lambda x : float (x * _HRS_TO_SECS / 100 )),
61
60
]
61
+ self ._CMDS : Final [list [int ]] = list (set (field [1 ] for field in self ._FIELDS ))
62
62
63
63
@staticmethod
64
64
def matcher_dict_list () -> list [dict [str , Any ]]:
@@ -162,33 +162,33 @@ async def _async_update(self) -> BMSsample:
162
162
"""Update battery status information."""
163
163
data = {}
164
164
resp_cache = {} # variable to avoid multiple queries with same command
165
- for field , cmd , pos , size , sign , fct in self ._FIELDS :
166
- LOGGER .debug ("(%s) request %s info" , self .name , field )
167
- if resp_cache .get (cmd ) is None :
168
- await self ._client .write_gatt_char (
169
- BMS .uuid_tx (), data = self ._gen_frame (cmd .to_bytes (1 ))
165
+ for cmd in self ._CMDS :
166
+ LOGGER .debug ("(%s) request command 0x%X." , self .name , cmd )
167
+ await self ._client .write_gatt_char (
168
+ BMS .uuid_tx (), data = self ._gen_frame (cmd .to_bytes (1 ))
169
+ )
170
+ try :
171
+ await asyncio .wait_for (self ._wait_event (), timeout = BAT_TIMEOUT )
172
+ except TimeoutError :
173
+ continue
174
+ if cmd != self ._data [self .CMD_POS ]:
175
+ LOGGER .debug (
176
+ "(%s): incorrect response 0x%x to command 0x%x" ,
177
+ self .name ,
178
+ self ._data [self .CMD_POS ],
179
+ cmd ,
170
180
)
171
- try :
172
- await asyncio .wait_for (self ._wait_event (), timeout = BAT_TIMEOUT )
173
- except TimeoutError :
174
- continue
175
- if cmd != self ._data [self .CMD_POS ]:
176
- LOGGER .debug (
177
- "(%s): incorrect response 0x%x to command 0x%x" ,
178
- self .name ,
179
- self ._data [self .CMD_POS ],
180
- cmd ,
181
- )
182
- continue
183
- resp_cache [cmd ] = self ._data .copy ()
184
- LOGGER .debug ("(%s) %s" , self .name , resp_cache )
185
- data |= {
186
- field : fct (
187
- int .from_bytes (
188
- resp_cache [cmd ][pos : pos + size ], "little" , signed = sign
181
+ resp_cache [self ._data [self .CMD_POS ]] = self ._data .copy ()
182
+
183
+ for field , cmd , pos , size , sign , fct in self ._FIELDS :
184
+ if resp_cache .get (cmd ):
185
+ data |= {
186
+ field : fct (
187
+ int .from_bytes (
188
+ resp_cache [cmd ][pos : pos + size ], "little" , signed = sign
189
+ )
189
190
)
190
- )
191
- }
191
+ }
192
192
193
193
voltages = {}
194
194
for cmd in self .CELL_VOLTAGE_CMDS :
0 commit comments