Skip to content

Commit

Permalink
generalize the tests
Browse files Browse the repository at this point in the history
  • Loading branch information
umbynos committed Jul 30, 2021
1 parent b41d0cc commit fcaa53c
Showing 1 changed file with 46 additions and 81 deletions.
127 changes: 46 additions & 81 deletions test/test_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,115 +26,80 @@ def test_list(socketio):
reason="VMs have no serial ports",
)
def test_open_serial_default(socketio):
time.sleep(.2)
global message
message = []
socketio.on('message', message_handler)
socketio.emit('command', 'open /dev/ttyACM0 9600')
time.sleep(.5) # give time to message to be filled
assert any("\"IsOpen\": true" in i for i in message)
socketio.emit('command', 'send /dev/ttyACM0 /"ciao/"')
time.sleep(.2)
assert any("send /dev/ttyACM0 /\"ciao/\"" in i for i in message)
assert "ciao" in extract_serial_data(message)
general_test_serial(socketio, "default")

# test with a lot of emoji: they can be messed up
# message = [] # reinitialize the message buffer
socketio.emit('command', 'send /dev/ttyACM0 /"πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€/"')
time.sleep(.2)
print (message)
assert any("send /dev/ttyACM0 /\"πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€/\"" in i for i in message)
emoji_output = extract_serial_data(message)
assert "/\"πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€/\"" in emoji_output # this could be failing because of UTF8 encoding problems
message = []
socketio.emit('command', 'close /dev/ttyACM0')
time.sleep(.2)
assert any("\"IsOpen\": false," in i for i in message)

@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_open_serial_timed(socketio):
time.sleep(.2)
global message
message = []
socketio.on('message', message_handler)
socketio.emit('command', 'open /dev/ttyACM0 9600 timed')
time.sleep(.5) # give time to message to be filled
print(message)
assert any("\"IsOpen\": true" in i for i in message)
socketio.emit('command', 'send /dev/ttyACM0 /"ciao/"')
time.sleep(.2)
assert any("send /dev/ttyACM0 /\"ciao/\"" in i for i in message)
assert "ciao" in extract_serial_data(message)
general_test_serial(socketio, "timed")

# test with a lot of emoji: usually they get messed up
message = [] # reinitialize the message buffer
socketio.emit('command', 'send /dev/ttyACM0 /"πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€/"')
time.sleep(.2)
assert any("send /dev/ttyACM0 /\"πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€/\"" in i for i in message)
assert "/\"πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€/\"" in extract_serial_data(message)
message = []
socketio.emit('command', 'close /dev/ttyACM0')
time.sleep(.2)
# print (message)
assert any("\"IsOpen\": false," in i for i in message)

@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_open_serial_timedraw(socketio):
global message
message = []
socketio.on('message', message_handler)
socketio.emit('command', 'open /dev/ttyACM0 9600 timedraw')
time.sleep(.5) # give time to message to be filled
assert any("\"IsOpen\": true" in i for i in message)
socketio.emit('command', 'send /dev/ttyACM0 /"ciao/"')
time.sleep(.2)
assert any("send /dev/ttyACM0 /\"ciao/\"" in i for i in message)
assert "ciao" in decode_output(extract_serial_data(message))
general_test_serial(socketio, "timedraw")

# test with a lot of emoji: usually they get messed up
message = [] # reinitialize the message buffer
socketio.emit('command', 'send /dev/ttyACM0 /"πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€/"')
time.sleep(.2)
assert any("send /dev/ttyACM0 /\"πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€/\"" in i for i in message)
# print (message)
assert "/\"πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€/\"" in decode_output(extract_serial_data(message))
socketio.emit('command', 'close /dev/ttyACM0')
time.sleep(.2)
# print (message)
assert any("\"IsOpen\": false," in i for i in message)

@pytest.mark.skipif(
running_on_ci(),
reason="VMs have no serial ports",
)
def test_open_serial_timedbinary(socketio):
general_test_serial(socketio, "timedbinary")


def general_test_serial(socketio, buffertype):
port = "/dev/ttyACM0"
global message
message = []
#in message var we will find the "response"
socketio.on('message', message_handler)
socketio.emit('command', 'open /dev/ttyACM0 9600 timedbinary')
time.sleep(.5) # give time to message to be filled
#open a new serial connection with the specified buffertype, if buffertype s empty it will use the default one
socketio.emit('command', 'open ' + port + ' 9600 ' + buffertype)
# give time to the message var to be filled
time.sleep(.5)
print(message)
# the serial connection should be open now
assert any("\"IsOpen\": true" in i for i in message)
socketio.emit('command', 'send /dev/ttyACM0 /"ciao/"')
time.sleep(.2)
assert any("send /dev/ttyACM0 /\"ciao/\"" in i for i in message)
print (message)
assert "ciao" in decode_output(extract_serial_data(message))

# test with a lot of emoji: usually they get messed up
message = [] # reinitialize the message buffer
socketio.emit('command', 'send /dev/ttyACM0 /"πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€/"')
#test with string
# send the string "ciao" using the serial connection
socketio.emit('command', 'send ' + port + ' /"ciao/"')
time.sleep(1)
print(message)
# check if the send command has been registered
assert any("send " + port + " /\"ciao/\"" in i for i in message)
#check if message has been sent back by the connected board
if buffertype in ("timedbinary", "timedraw"):
output = decode_output(extract_serial_data(message))
elif buffertype in ("default", "timed"):
output = extract_serial_data(message)
assert "ciao" in output

#test with emoji
message = [] # reinitialize the message buffer to have a clean situation
# send a lot of emoji: they can be messed up
socketio.emit('command', 'send ' + port + ' /"πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€/"')
time.sleep(.2)
assert any("send /dev/ttyACM0 /\"πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€/\"" in i for i in message)
assert "/\"πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€/\"" in decode_output(extract_serial_data(message))
socketio.emit('command', 'close /dev/ttyACM0')
print(message)
# check if the send command has been registered
assert any("send " + port + " /\"πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€/\"" in i for i in message)
if buffertype in ("timedbinary", "timedraw"):
output = decode_output(extract_serial_data(message))
elif buffertype in ("default", "timed"):
output = extract_serial_data(message)
assert "/\"πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€πŸ§€/\"" in output

#finally close the serial port
socketio.emit('command', 'close ' + port)
time.sleep(.2)
# print (message)
print (message)
#check if port has been closed
assert any("\"IsOpen\": false," in i for i in message)


Expand Down

0 comments on commit fcaa53c

Please sign in to comment.