-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprint_battery_service.py
executable file
·240 lines (192 loc) · 9.98 KB
/
print_battery_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
#!/usr/bin/env python3
#
# A basic PyUAVCAN demo. This file is included in the user documentation, please keep it tidy.
#
# Distributed under CC0 1.0 Universal (CC0 1.0) Public Domain Dedication. To the extent possible under law, the
# UAVCAN Development Team has waived all copyright and related or neighboring rights to this work.
#
import os
import sys
import typing
import pathlib
import asyncio
import tempfile
import importlib
import pyuavcan
import inspect
import yaml
# Explicitly import transports and media sub-layers that we may need here.
import pyuavcan.transport.can
import pyuavcan.transport.can.media.socketcan
import pyuavcan.transport.redundant
# We will need a directory to store the generated Python packages in.
#
# It is perfectly acceptable to just use a random temp directory at every run, but the disadvantage of that approach
# is that the packages will be re-generated from scratch every time the program is started, which may be undesirable.
#
# So in this example we select a fixed temp dir name (make sure it's unique enough) and shard its contents by the
# library version. The sharding helps us ensure that we won't attempt to use a package generated for an older library
# version with a newer one, as they may be incompatible.
#
# Another sensible location for the generated package directory is somewhere in the application data directory,
# like "~/.my-app/dsdl/{pyuavcan.__version__}/"; or, for Windows: "%APPDATA%/my-app/dsdl/{pyuavcan.__version__}/".
dsdl_generated_dir = pathlib.Path(tempfile.gettempdir(), 'dsdl-for-my-program', f'pyuavcan-v{pyuavcan.__version__}')
dsdl_generated_dir.mkdir(parents=True, exist_ok=True)
print('Generated DSDL packages will be stored in:', dsdl_generated_dir, file=sys.stderr)
# We will need to import the packages once they are generated, so we should update the module import look-up path set.
# If you're using an IDE for development, add this path to its look-up set as well for code completion to work.
sys.path.insert(0, str(dsdl_generated_dir))
# Now we can import our packages. If import fails, invoke the code generator, then import again.
try:
import reg
import pyuavcan.application # The application module requires the standard types from the root namespace "uavcan".
except (ImportError, AttributeError):
script_path = os.path.abspath(os.path.dirname(__file__))
# Generate the standard namespace. The order actually doesn't matter.
pyuavcan.dsdl.generate_package(
root_namespace_directory=os.path.join(script_path, './public_regulated_data_types/uavcan'),
output_directory=dsdl_generated_dir,
)
pyuavcan.dsdl.generate_package(
root_namespace_directory=os.path.join(script_path, './public_regulated_data_types/reg/'),
lookup_directories=[os.path.join(script_path, './public_regulated_data_types/uavcan')],
output_directory=dsdl_generated_dir,
)
# Okay, we can try importing again. We need to clear the import cache first because Python's import machinery
# requires that; see the docs for importlib.invalidate_caches() for more info.
importlib.invalidate_caches()
import reg
import pyuavcan.application
# Import other namespaces we're planning to use. Nested namespaces are not auto-imported, so in order to reach,
# say, "uavcan.node.Heartbeat", you have to do "import uavcan.node".
import uavcan.node # noqa E402
import uavcan.diagnostic # noqa E402
import uavcan.primitive
import reg.drone.physics.electricity
import reg.drone.service.battery
import pyuavcan.dsdl as _dsdl_
print('Start')
print(chr(27)+'[2j')
print('\033c')
print('\x1bc')
class PrintBMSStatusApplication:
def __init__(self):
# Make sure to initialize the CAN interface.
media = pyuavcan.transport.can.media.socketcan.SocketCANMedia('slcan0', mtu=8)
transport = pyuavcan.transport.can.CANTransport(media, local_node_id=42)
assert transport.local_node_id == 42 # Yup, the node-ID is configured.
# Populate the node info for use with the Node class. Please see the DSDL definition of uavcan.node.GetInfo.
node_info = uavcan.node.GetInfo_1_0.Response(
# Version of the protocol supported by the library, and hence by our node.
protocol_version=uavcan.node.Version_1_0(*pyuavcan.UAVCAN_SPECIFICATION_VERSION),
# There is a similar field for hardware version, but we don't populate it because it's a software-only node.
software_version=uavcan.node.Version_1_0(major=1, minor=0),
# The name of the local node. Should be a reversed Internet domain name, like a Java package.
name='org.uavcan.pyuavcan.demo.basic_usage',
# We've left the optional fields default-initialized here.
)
# The transport layer is ready; next layer up the protocol stack is the presentation layer. Construct it here.
presentation = pyuavcan.presentation.Presentation(transport)
# The application layer is next -- construct the node instance. It will serve GetInfo requests and publish its
# heartbeat automatically (unless it's anonymous). Read the source code of the Node class for more details.
self._node = pyuavcan.application.Node(presentation, node_info)
# A message subscription.
self._sub_energy_source = self._node.presentation.make_subscriber(reg.drone.physics.electricity.SourceTs_0_1, 4096)
self._sub_energy_source.receive_in_background(self._handle_msg)
self._sub_bat_status = self._node.presentation.make_subscriber(reg.drone.service.battery.Status_0_2, 4097)
self._sub_bat_status.receive_in_background(self._handle_msg)
self._sub_bat_parameters = self._node.presentation.make_subscriber(reg.drone.service.battery.Parameters_0_2, 4098)
self._sub_bat_parameters.receive_in_background(self._handle_msg)
# When all is initialized, don't forget to start the node!
self._node.start()
async def _handle_msg(self,
msg: _dsdl_.CompositeObject,
metadata: pyuavcan.transport.TransferFrom) -> None:
"""
A subscription message handler. This is also an async function, so we can block inside if necessary.
The received message object is passed in along with the information about the transfer that delivered it.
"""
# check if it is the energy source
if isinstance(msg, reg.drone.physics.electricity.SourceTs_0_1):
# Clear energy source part
# print(chr(27)+'[2j')
# print('\033c')
# print('\x1bc')
# move the cursor to the upper left corner (beginning of the energy state message)
print(chr(27)+'[0;0H')
#print(chr(27)+'[s')
# # remove each line of the message plus 2 extra lines
# for _ in range(6):
# # remove the line
# print(chr(27)+'[K')
# # go the next line
# print(chr(27)+'[1B')
# go back to the top
#print(chr(27)+'[1;0H')
# check if it is the battery status
if isinstance(msg, reg.drone.service.battery.Status_0_2):
# Clear battery status part
# move the cursor to the beginning of the battery status message
print(chr(27)+'[9;0H')
#print(chr(27)+'[s')
# remove each line of the message plus 2 extra lines
#for _ in range(12):
# # remove the line
# print(chr(27)+'[K')
# # go the next line
# print(chr(27)+'[1B')
# go back to the top
#print(chr(27)+'[8;0H')
# check if it is the battery parameter message
if isinstance(msg, reg.drone.service.battery.Parameters_0_2):
# Clear battery parameter part
# move the cursor to the beginning of the battery status message
print(chr(27)+'[24;0H')
#print(chr(27)+'[s')
#print(chr(27)+'[21;0H')
# # remove each line of the message plus 2 extra lines
# for _ in range(12):
# # remove the line
# print(chr(27)+'[K')
# # go the next line
# print(chr(27)+'[1B')
# go back to the top
#print(chr(27)+'[9;0H')
# get the cursor position
#print(chr(27)+'[u')
# remove the line
print(chr(27)+'[K')
msg_string = msg.__repr__()
# msg_string = msg_string.replace('(',' \n', 1)
msg_string = msg_string.replace('(',' \n'+chr(27)+'[K', 1)
msg_string = msg_string.replace(' ','')
msg_string = msg_string.replace('=',': ')
msg_string = msg_string.replace(',','\n'+chr(27)+'[K')
print(msg_string)
# remove the next 2 lines
print(chr(27)+'[K')
# if isinstance(msg, reg.drone.service.battery.Parameters_0_2):
# print("\r\r\r\r\r\r")
# print("test")
# print("\r")
#print(chr(27)+'[0;0H')
#print(chr(27)+'[6A')
if __name__ == '__main__':
app = PrintBMSStatusApplication()
app_tasks = asyncio.Task.all_tasks()
async def list_tasks_periodically() -> None:
"""Print active tasks periodically for demo purposes."""
import re
def repr_task(t: asyncio.Task) -> str:
try:
out, = re.findall(r'^<([^<]+<[^>]+>)', str(t))
except ValueError:
out = str(t)
return out
while True:
#print('\nActive tasks:\n' + '\n'.join(map(repr_task, asyncio.Task.all_tasks())), file=sys.stderr)
await asyncio.sleep(10)
asyncio.get_event_loop().create_task(list_tasks_periodically())
# The node and PyUAVCAN objects have created internal tasks, which we need to run now.
# In this case we want to automatically stop and exit when no tasks are left to run.
asyncio.get_event_loop().run_until_complete(asyncio.gather(*app_tasks))