-
Notifications
You must be signed in to change notification settings - Fork 55
/
Copy pathVESC.py
170 lines (138 loc) · 5.89 KB
/
VESC.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
from pyvesc.protocol.interface import encode_request, encode, decode
from pyvesc.VESC.messages import *
import time
import threading
# because people may want to use this library for their own messaging, do not make this a required package
try:
import serial
except ImportError:
serial = None
class VESC(object):
def __init__(self, serial_port, has_sensor=False, start_heartbeat=True, baudrate=115200, timeout=0.05):
"""
:param serial_port: Serial device to use for communication (i.e. "COM3" or "/dev/tty.usbmodem0")
:param has_sensor: Whether or not the bldc motor is using a hall effect sensor
:param start_heartbeat: Whether or not to automatically start the heartbeat thread that will keep commands
alive.
:param baudrate: baudrate for the serial communication. Shouldn't need to change this.
:param timeout: timeout for the serial communication
"""
if serial is None:
raise ImportError("Need to install pyserial in order to use the VESCMotor class.")
self.serial_port = serial.Serial(port=serial_port, baudrate=baudrate, timeout=timeout)
if has_sensor:
self.serial_port.write(encode(SetRotorPositionMode(SetRotorPositionMode.DISP_POS_OFF)))
self.alive_msg = [encode(Alive())]
self.heart_beat_thread = threading.Thread(target=self._heartbeat_cmd_func)
self._stop_heartbeat = threading.Event()
if start_heartbeat:
self.start_heartbeat()
# check firmware version and set GetValue fields to old values if pre version 3.xx
version = self.get_firmware_version()
if int(version.split('.')[0]) < 3:
GetValues.fields = pre_v3_33_fields
# store message info for getting values so it doesn't need to calculate it every time
msg = GetValues()
self._get_values_msg = encode_request(msg)
self._get_values_msg_expected_length = msg._full_msg_size
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop_heartbeat()
if self.serial_port.is_open:
self.serial_port.flush()
self.serial_port.close()
def _heartbeat_cmd_func(self):
"""
Continuous function calling that keeps the motor alive
"""
while not self._stop_heartbeat.is_set():
time.sleep(0.1)
for i in self.alive_msg:
self.write(i)
def start_heartbeat(self, can_id=None):
"""
Starts a repetitive calling of the last set cmd to keep the motor alive.
Args:
can_id: Optional, used to specify the CAN ID to add to the existing heartbeat messaged
"""
if can_id is not None:
self.alive_msg.append(encode(Alive(can_id=can_id)))
else:
self.heart_beat_thread.start()
def stop_heartbeat(self):
"""
Stops the heartbeat thread and resets the last cmd function. THIS MUST BE CALLED BEFORE THE OBJECT GOES OUT OF
SCOPE UNLESS WRAPPING IN A WITH STATEMENT (Assuming the heartbeat was started).
"""
self._stop_heartbeat.set()
if self.heart_beat_thread.is_alive():
self.heart_beat_thread.join()
def write(self, data, num_read_bytes=None):
"""
A write wrapper function implemented like this to try and make it easier to incorporate other communication
methods than UART in the future.
:param data: the byte string to be sent
:param num_read_bytes: number of bytes to read for decoding response
:return: decoded response from buffer
"""
self.serial_port.write(data)
if num_read_bytes is not None:
while self.serial_port.in_waiting <= num_read_bytes:
time.sleep(0.000001) # add some delay just to help the CPU
response, consumed = decode(self.serial_port.read(self.serial_port.in_waiting))
return response
def set_rpm(self, new_rpm, **kwargs):
"""
Set the electronic RPM value (a.k.a. the RPM value of the stator)
:param new_rpm: new rpm value
"""
self.write(encode(SetRPM(new_rpm, **kwargs)))
def set_current(self, new_current, **kwargs):
"""
:param new_current: new current in milli-amps for the motor
"""
self.write(encode(SetCurrent(new_current, **kwargs)))
def set_duty_cycle(self, new_duty_cycle, **kwargs):
"""
:param new_duty_cycle: Value of duty cycle to be set (range [-1e5, 1e5]).
"""
self.write(encode(SetDutyCycle(new_duty_cycle, **kwargs)))
def set_servo(self, new_servo_pos, **kwargs):
"""
:param new_servo_pos: New servo position. valid range [0, 1]
"""
self.write(encode(SetServoPosition(new_servo_pos, **kwargs)))
def get_measurements(self):
"""
:return: A msg object with attributes containing the measurement values
"""
return self.write(self._get_values_msg, num_read_bytes=self._get_values_msg_expected_length)
def get_firmware_version(self):
msg = GetVersion()
return str(self.write(encode_request(msg), num_read_bytes=msg._full_msg_size))
def get_rpm(self):
"""
:return: Current motor rpm
"""
return self.get_measurements().rpm
def get_duty_cycle(self):
"""
:return: Current applied duty-cycle
"""
return self.get_measurements().duty_now
def get_v_in(self):
"""
:return: Current input voltage
"""
return self.get_measurements().v_in
def get_motor_current(self):
"""
:return: Current motor current
"""
return self.get_measurements().current_motor
def get_incoming_current(self):
"""
:return: Current incoming current
"""
return self.get_measurements().current_in