-
Notifications
You must be signed in to change notification settings - Fork 0
/
uart.py
140 lines (115 loc) · 3.64 KB
/
uart.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# wykys
# library for basic UART manipulation
import time
import os
import log
# Guarded imports: if pyserial is missing, try to install it and import again.
# Both `serial` and `serial.tools.list_ports` ship in the PyPI package
# `pyserial`; the PyPI package named `serial` is an unrelated project, so
# `pyserial` is what must be installed in either fallback.
try:
    import serial
except ImportError:
    print("Trying to Install required module: serial\n")
    os.system('py -3.9 -m pip install pyserial')
    # The install was attempted above; import again so the module is bound
    # at module level. A second failure propagates as ImportError.
    import serial
try:
    from serial.tools import list_ports
except ImportError:
    print("Trying to Install required module: pyserial \n")
    os.system('py -3.9 -m pip install pyserial')
    # Same retry as above, for the port-enumeration helper.
    from serial.tools import list_ports
def singleton(class_):
    """Class decorator that makes ``class_`` produce one shared instance.

    The decorated name is rebound to a factory function. Its first call
    constructs ``class_`` with the supplied arguments; every later call
    returns that same object and ignores whatever arguments it receives.
    """
    created = {}

    def getinstance(*args, **kwargs):
        # Fast path: the instance already exists, hand it back unchanged.
        if class_ in created:
            return created[class_]
        obj = class_(*args, **kwargs)
        created[class_] = obj
        return obj

    return getinstance
@singleton
class UART:
    """Thin wrapper around ``serial.Serial`` for byte-oriented UART I/O.

    Because of the ``singleton`` decorator, the first ``UART(...)`` call
    builds the object and opens the port; later calls return that same
    object and their arguments are ignored.

    Fatal conditions (device not found, port cannot be opened, device
    disconnected while reading) are logged and end the program by raising
    ``SystemExit`` with status 1.
    """

    def __init__(
        self,
        name='CP2102',
        baudrate=115200,
        bytesize=8,
        parity='N',
        stopbits=1,
        timeout=None,
        delay=0.01,
        port=None
    ):
        """Configure the serial port and open it.

        Args:
            name: Substring searched for in port descriptions when ``port``
                is not given.
            baudrate: Assigned to ``serial.Serial.baudrate``.
            bytesize: Assigned to ``serial.Serial.bytesize``.
            parity: Assigned to ``serial.Serial.parity``.
            stopbits: 1, 1.5 or 2. Any other value leaves the pyserial
                default in place.
            timeout: Assigned to ``serial.Serial.timeout``.
            delay: Seconds slept by ``cmd_delay``.
            port: Device to open; when ``None`` it is located with
                ``find_device``.

        Raises:
            SystemExit: Status 1 if no matching device is found or the port
                cannot be opened.
        """
        self.name = name
        self.delay = delay
        self.ser = serial.Serial()
        self.ser.baudrate = baudrate
        self.ser.bytesize = bytesize
        self.ser.parity = parity
        self.ser.timeout = timeout
        if stopbits == 1:
            self.ser.stopbits = serial.STOPBITS_ONE
        elif stopbits == 1.5:
            self.ser.stopbits = serial.STOPBITS_ONE_POINT_FIVE
        elif stopbits == 2:
            self.ser.stopbits = serial.STOPBITS_TWO
        if port is None:
            self.ser.port = self.find_device()
        else:
            self.ser.port = port
        self.open_connection()

    def __del__(self):
        """Close the port when the object is garbage-collected."""
        self.close_connection()

    def cmd_delay(self):
        """Sleep for the configured ``delay`` (seconds)."""
        time.sleep(self.delay)

    def find_device(self):
        """Return the device of the first port whose description contains ``name``.

        Raises:
            SystemExit: Status 1 when no port matches; the available ports
                are listed first.
        """
        for port in list_ports.comports():
            if self.name in port.description:
                return port.device
        log.err(self.name + ' is not connected')
        self.list_ports()
        # `raise SystemExit` instead of the site-module `exit` helper, which
        # is intended for interactive sessions only.
        raise SystemExit(1)

    def list_ports(self):
        """Print the device and description of every detected port."""
        log.stdo('List all connected devices:')
        for port in list_ports.comports():
            print('    ', port.device, '\t', port.description)

    def open_connection(self):
        """Open the configured port and discard any pending input.

        Raises:
            SystemExit: Status 1 if pyserial reports the port cannot be opened.
        """
        try:
            self.ser.open()
            log.ok('port {} is open'.format(self.ser.port))
        except serial.SerialException:
            log.err('port {} opening is fail'.format(self.ser.port))
            raise SystemExit(1)
        self.ser.reset_input_buffer()

    def close_connection(self):
        """Close the port and log that it was closed."""
        self.ser.close()
        log.ok('port is close')

    def read_byte(self):
        """Read one byte and return it as an unsigned integer.

        Returns:
            The byte value, or ``None`` when nothing was read (an empty
            result, reported here as a timeout).

        Raises:
            SystemExit: Status 1 if the read fails with ``SerialException``.
        """
        try:
            tmp = self.ser.read(1)
        except serial.SerialException:
            log.err('the device was disconnected')
            # A lost device is an error: exit with a non-zero status, in
            # line with the other fatal paths of this class.
            raise SystemExit(1)
        if tmp == b'':
            print("Serial timeout occur")
            return None
        return int.from_bytes(tmp, byteorder='little', signed=False)

    def read_bytes(self, size: int = 1):
        """Read up to ``size`` bytes.

        Returns:
            The bytes read, or ``None`` when nothing was read.

        Raises:
            SystemExit: Status 1 if the read fails with ``SerialException``.
        """
        try:
            tmp = self.ser.read(size)
        except serial.SerialException:
            log.err('the device was disconnected')
            raise SystemExit(1)
        if tmp == b'':
            return None
        return tmp

    def send_byte(self, byte):
        """Write a single byte, given as an integer in ``range(256)``.

        A ``SerialException`` is logged and followed by an attempt to kill
        the external ``ser-term`` process; it is not re-raised.
        """
        try:
            self.ser.write(bytes((byte,)))
        except serial.SerialException:
            log.err('the device was disconnected')
            os.system('killall ser-term')
        # NOTE(review): short pause assumed to follow every write, not only
        # the failure path -- confirm against the original layout.
        time.sleep(0.003)

    def send_cmd(self, cmd):
        """Send ``cmd`` one character at a time, terminated by a newline.

        Anything that is not a ``str`` is ignored. Each character is sent
        as ``ord(c)``, so characters must have code points below 256.
        """
        if isinstance(cmd, str):
            for c in cmd:
                self.send_byte(ord(c))
                # NOTE(review): delay assumed to be per character -- confirm
                # against the original layout.
                self.cmd_delay()
            self.send_byte(ord('\n'))