Skip to content

Commit

Permalink
feat: Added test, fix: Bug causing crash on drop LocoDriveController …
Browse files Browse the repository at this point in the history
…in async context
  • Loading branch information
juhu1705 committed Sep 2, 2022
1 parent a598763 commit c343fa0
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 16 deletions.
9 changes: 5 additions & 4 deletions src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,10 @@ pub enum SwitchDirection {
pub struct SwitchArg {
/// The address of the switch (0 - 2047)
address: u16,
/// The switches direction state
/// The switches direction state, which direction the switch points
direction: SwitchDirection,
/// The activation state of the switch (`true = ON, false = OFF`)
/// If the switch is not in the requested direction.
/// Use true if you want the switch to go to the direction.
state: bool,
}

Expand All @@ -85,7 +86,7 @@ impl SwitchArg {
///
/// - `address`: The address of the switch you want to change state (0 to 2047)
/// - `direction`: The direction the switch should switch to
/// - `state`: The activation state of the switch (`true = ON`, `false = OFF`)
/// - `state`: The activation state of the switch (If the switch is in the requested state)
pub fn new(address: u16, direction: SwitchDirection, state: bool) -> Self {
Self {
address,
Expand Down Expand Up @@ -134,7 +135,7 @@ impl SwitchArg {
}
/// # Returns
///
/// The switches activation status (`true = ON, false = OFF`)
/// The switches activation status. False if the switch has switched to the requested state.
pub fn state(&self) -> bool {
self.state
}
Expand Down
35 changes: 25 additions & 10 deletions src/loco_controller.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::fmt::Debug;
use crate::error::{LocoDriveSendingError, MessageParseError};
use crate::protocol::Message;
use std::sync::{Arc, Condvar, Mutex};
use std::sync::{Arc, Mutex};
use std::thread::{spawn};
use tokio::time::{sleep, Duration};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::runtime::Runtime;
use tokio::sync::broadcast::Sender;
use tokio::task::JoinHandle;
use tokio::sync::Notify;
Expand Down Expand Up @@ -236,13 +238,25 @@ impl LocoDriveController {
///
/// This function panics if the reading thread has panicked or the reading thread was killed,
/// by some external source.
async fn stop_reader(&mut self) {
fn stop_reader(&mut self) {
if let Some(reader) = self.reading_thread.take() {
// Note the thread to end reading
*self.stop.lock().unwrap() = true;
self.fire_stop.notify_waiters();
// Wait until the thread is stopped
reader.await.unwrap();
match spawn(move || {
let runtime = match Runtime::new() {
Ok(runtime) => runtime,
Err(_) => { return; }
};
match runtime.block_on(reader) {
Ok(_) => "",
Err(_) => "",
};
}).join() {
Ok(_) => "",
Err(_) => "",
};

// We allow new threads to spawn and read from the port
*self.stop.lock().unwrap() = false;
Expand Down Expand Up @@ -370,6 +384,8 @@ impl LocoDriveController {
// We read the next message from the serial port
let parsed = LocoDriveController::read_next_message(port, send, stopping, ignore_send_messages).await;

println!("Read: {:?}", parsed);

// We check which type the message we received is
match parsed {
// We can at this level ignore update messages
Expand Down Expand Up @@ -550,10 +566,13 @@ impl LocoDriveController {
_ = notify.notified() => false,
_ = sleep(Duration::from_millis(self.sending_timeout)) => true,
} {
return Err(LocoDriveSendingError::Timeout)
Err(LocoDriveSendingError::Timeout)
} else {
Ok(())
}
} else {
Ok(())
}
Ok(())
}
Err(_) => Err(LocoDriveSendingError::NotWritable),
}
Expand All @@ -570,10 +589,6 @@ impl Drop for LocoDriveController {
///
/// The drop panics if the reading thread has panicked.
fn drop(&mut self) {
let runtime = match tokio::runtime::Runtime::new() {
Ok(runtime) => runtime,
Err(_) => { return; }
};
runtime.block_on(self.stop_reader());
self.stop_reader()
}
}
141 changes: 139 additions & 2 deletions src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
/// Tests all testable core functions of this module
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::process::exit;
use std::time::Duration;
use tokio::time::sleep;
use tokio_serial::FlowControl;
use crate::args::{Ack1Arg, AddressArg, Consist, CvDataArg, DecoderType, DirfArg, DstArg, FastClock, FunctionArg, FunctionGroup, IdArg, ImAddress, ImArg, ImFunctionType, InArg, LissyIrReport, LopcArg, MultiSenseArg, Pcmd, ProgrammingAbortedArg, PStat, PxctData, RepStructure, RFID5Report, RFID7Report, SensorLevel, SlotArg, SnArg, SndArg, SourceType, SpeedArg, Stat1Arg, Stat2Arg, State, SwitchArg, SwitchDirection, TrkArg, WheelcntReport, WrSlDataStructure};
use crate::loco_controller::{LocoDriveController, LocoDriveMessage};
use crate::protocol::Message;
use crate::protocol::Message::{GpOn, LocoSpd};

/// Tests if the message parsing is reliable
#[test]
fn messages() {
// Test 2 byte messages
test_one_message(Message::Idle);
test_one_message(Message::GpOn);
test_one_message(GpOn);
test_one_message(Message::GpOff);
test_one_message(Message::Busy);

Expand All @@ -29,7 +36,7 @@ mod tests {
test_one_message(Message::SwReq(SwitchArg::new(10, SwitchDirection::Curved, false)));
test_one_message(Message::LocoSnd(SlotArg::new(24), SndArg::new(false, true, false, true)));
test_one_message(Message::LocoDirf(SlotArg::new(10), DirfArg::new(false, true, false, true, false, false)));
test_one_message(Message::LocoSpd(SlotArg::new(10), SpeedArg::Drive(122)));
test_one_message(LocoSpd(SlotArg::new(10), SpeedArg::Drive(122)));

// Test 6 bytes messages
test_one_message(Message::MultiSense(MultiSenseArg::new(3, false, 3, 7), AddressArg::new(12)));
Expand Down Expand Up @@ -144,4 +151,134 @@ mod tests {
message
);
}

#[tokio::test]
async fn test_message_sending() {
println!("Start test!");

for port in tokio_serial::available_ports().unwrap() {
println!("Port: {:?}", port)
}

let (sender, mut receiver) = tokio::sync::broadcast::channel(10);

println!("Try to connect to port!");

let mut loco_controller = match LocoDriveController::new(
"/dev/ttyUSB0",
115_200,
5000,
FlowControl::Software,
sender,
false,
).await {
Ok(loco_controller) => loco_controller,
Err(err) => {
eprintln!("Error: Could not connect to the serial port! {:?}", err);
println!();
return;
}
};

loco_controller.send_message(GpOn).await.unwrap();

println!("Setup test train");

let adr = AddressArg::new(5);

let mut slot_adr_map: HashMap<AddressArg, SlotArg> = HashMap::new();

match loco_controller.send_message(Message::LocoAdr(adr)).await {
Ok(()) => {},
Err(err) => {
eprintln!("Message was not send! {:?}", err);
println!();
exit(1)
}
};

loop {
match receiver.recv().await {
Ok(message) =>
match message {
LocoDriveMessage::Message(message) => {
match message {
Message::SlRdData(slot, _, address, ..) => {
slot_adr_map.insert(address, slot);
println!("Added {:?}, {:?} to {:?}", address, slot, slot_adr_map);
break;
},
_ => {}
}
}
LocoDriveMessage::Answer(_, _) => {}
LocoDriveMessage::Error(err) => {
eprintln!("Message could not be read! {:?}", err);
exit(1)
}
LocoDriveMessage::SerialPortError(err) => {
eprintln!("Connection refused! {:?}", err);
exit(1)
}
},
Err(err) => {
println!("WHAT? {:?}", err);
}
}
}

println!("Known Trains: {:?}", slot_adr_map);

for i in 1..11 {
println!("Drive round {}", i);

if i % 2 == 0 {
loco_controller.send_message(Message::SwReq(SwitchArg::new(8, SwitchDirection::Straight, true))).await.unwrap();
loco_controller.send_message(Message::SwReq(SwitchArg::new(9, SwitchDirection::Curved, true))).await.unwrap();
} else {
loco_controller.send_message(Message::SwReq(SwitchArg::new(8, SwitchDirection::Curved, true))).await.unwrap();
loco_controller.send_message(Message::SwReq(SwitchArg::new(9, SwitchDirection::Straight, true))).await.unwrap();
}

loco_controller.send_message(LocoSpd(*slot_adr_map.get(&adr).unwrap(), SpeedArg::Drive(100))).await.unwrap();

let mut waiting = true;

while let Ok(message) = receiver.recv().await {
match message {
LocoDriveMessage::Message(message) => {
match message {
Message::InputRep(in_arg) => {
if i % 2 == 0 && in_arg.address() == 1 && in_arg.input_source() == SourceType::Switch && in_arg.sensor_level() == SensorLevel::High {
waiting = false;
loco_controller.send_message(LocoSpd(*slot_adr_map.get(&adr).unwrap(), SpeedArg::Drive(50))).await.unwrap();
} else if i % 2 == 1 && in_arg.address() == 2 && in_arg.input_source() == SourceType::Ds54Aux && in_arg.sensor_level() == SensorLevel::High {
waiting = false;
loco_controller.send_message(LocoSpd(*slot_adr_map.get(&adr).unwrap(), SpeedArg::Drive(50))).await.unwrap();
} else if !waiting && in_arg.address() == 2 && in_arg.input_source() == SourceType::Switch && in_arg.sensor_level() == SensorLevel::Low {
break;
}
},
_ => {}
}
}
LocoDriveMessage::Answer(_, _) => {}
LocoDriveMessage::Error(err) => {
eprintln!("Message could not be read! {:?}", err);
exit(1)
}
LocoDriveMessage::SerialPortError(err) => {
eprintln!("Connection refused! {:?}", err);
exit(1)
}
}
}

loco_controller.send_message(LocoSpd(*slot_adr_map.get(&adr).unwrap(), SpeedArg::Stop)).await.unwrap();

sleep(Duration::from_secs(6)).await;
}

println!("Drive 10 rounds!")
}
}

0 comments on commit c343fa0

Please sign in to comment.