From c343fa052a61314748e362941ef41557c69a5739 Mon Sep 17 00:00:00 2001 From: Fabius Mettner Date: Fri, 2 Sep 2022 20:08:08 +0200 Subject: [PATCH] feat: Added test, fix: Bug causing crash on drop LocoDriveController in async context --- src/args.rs | 9 +-- src/loco_controller.rs | 35 +++++++--- src/tests.rs | 141 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 169 insertions(+), 16 deletions(-) diff --git a/src/args.rs b/src/args.rs index 1163c96..cb75c25 100644 --- a/src/args.rs +++ b/src/args.rs @@ -71,9 +71,10 @@ pub enum SwitchDirection { pub struct SwitchArg { /// The address of the switch (0 - 2047) address: u16, - /// The switches direction state + /// The switches direction state, which direction the switch points direction: SwitchDirection, - /// The activation state of the switch (`true = ON, false = OFF`) + /// If the switch is not in the requested direction. + /// Use true if you want the switch to go to the direction. state: bool, } @@ -85,7 +86,7 @@ impl SwitchArg { /// /// - `address`: The address of the switch you want to change state (0 to 2047) /// - `direction`: The direction the switch should switch to - /// - `state`: The activation state of the switch (`true = ON`, `false = OFF`) + /// - `state`: The activation state of the switch (If the switch is in the requested state) pub fn new(address: u16, direction: SwitchDirection, state: bool) -> Self { Self { address, @@ -134,7 +135,7 @@ impl SwitchArg { } /// # Returns /// - /// The switches activation status (`true = ON, false = OFF`) + /// The switches activation status. False if the switch has switched to the requested state. pub fn state(&self) -> bool { self.state } diff --git a/src/loco_controller.rs b/src/loco_controller.rs index 06b96a6..b1c8761 100644 --- a/src/loco_controller.rs +++ b/src/loco_controller.rs @@ -1,9 +1,11 @@ use std::fmt::Debug; use crate::error::{LocoDriveSendingError, MessageParseError}; use crate::protocol::Message; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::{Arc, Mutex}; +use std::thread::{spawn}; use tokio::time::{sleep, Duration}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::runtime::Runtime; use tokio::sync::broadcast::Sender; use tokio::task::JoinHandle; use tokio::sync::Notify; @@ -236,13 +238,25 @@ impl LocoDriveController { /// /// This function panics if the reading thread has panicked or the reading thread was killed, /// by some external source. - async fn stop_reader(&mut self) { + fn stop_reader(&mut self) { if let Some(reader) = self.reading_thread.take() { // Note the thread to end reading *self.stop.lock().unwrap() = true; self.fire_stop.notify_waiters(); // Wait until the thread is stopped - reader.await.unwrap(); + match spawn(move || { + let runtime = match Runtime::new() { + Ok(runtime) => runtime, + Err(_) => { return; } + }; + match runtime.block_on(reader) { + Ok(_) => "", + Err(_) => "", + }; + }).join() { + Ok(_) => "", + Err(_) => "", + }; // We allow new threads to spawn and read from the port *self.stop.lock().unwrap() = false; @@ -370,6 +384,8 @@ impl LocoDriveController { // We read the next message from the serial port let parsed = LocoDriveController::read_next_message(port, send, stopping, ignore_send_messages).await; + println!("Read: {:?}", parsed); + // We check which type the message we received is match parsed { // We can at this level ignore update messages @@ -550,10 +566,13 @@ impl LocoDriveController { _ = notify.notified() => false, _ = sleep(Duration::from_millis(self.sending_timeout)) => true, } { - return Err(LocoDriveSendingError::Timeout) + Err(LocoDriveSendingError::Timeout) + } else { + Ok(()) } + } else { + Ok(()) } - Ok(()) } Err(_) => Err(LocoDriveSendingError::NotWritable), } @@ -570,10 +589,6 @@ impl Drop for LocoDriveController { /// /// The drop panics if the reading thread has panicked. fn drop(&mut self) { - let runtime = match tokio::runtime::Runtime::new() { - Ok(runtime) => runtime, - Err(_) => { return; } - }; - runtime.block_on(self.stop_reader()); + self.stop_reader() } } diff --git a/src/tests.rs b/src/tests.rs index cdad60b..0e56781 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -1,15 +1,22 @@ /// Tests all testable core functions of this module #[cfg(test)] mod tests { + use std::collections::HashMap; + use std::process::exit; + use std::time::Duration; + use tokio::time::sleep; + use tokio_serial::FlowControl; use crate::args::{Ack1Arg, AddressArg, Consist, CvDataArg, DecoderType, DirfArg, DstArg, FastClock, FunctionArg, FunctionGroup, IdArg, ImAddress, ImArg, ImFunctionType, InArg, LissyIrReport, LopcArg, MultiSenseArg, Pcmd, ProgrammingAbortedArg, PStat, PxctData, RepStructure, RFID5Report, RFID7Report, SensorLevel, SlotArg, SnArg, SndArg, SourceType, SpeedArg, Stat1Arg, Stat2Arg, State, SwitchArg, SwitchDirection, TrkArg, WheelcntReport, WrSlDataStructure}; + use crate::loco_controller::{LocoDriveController, LocoDriveMessage}; use crate::protocol::Message; + use crate::protocol::Message::{GpOn, LocoSpd}; /// Tests if the message parsing is reliable #[test] fn messages() { // Test 2 byte messages test_one_message(Message::Idle); - test_one_message(Message::GpOn); + test_one_message(GpOn); test_one_message(Message::GpOff); test_one_message(Message::Busy); @@ -29,7 +36,7 @@ mod tests { test_one_message(Message::SwReq(SwitchArg::new(10, SwitchDirection::Curved, false))); test_one_message(Message::LocoSnd(SlotArg::new(24), SndArg::new(false, true, false, true))); test_one_message(Message::LocoDirf(SlotArg::new(10), DirfArg::new(false, true, false, true, false, false))); - test_one_message(Message::LocoSpd(SlotArg::new(10), SpeedArg::Drive(122))); + test_one_message(LocoSpd(SlotArg::new(10), SpeedArg::Drive(122))); // Test 6 bytes messages test_one_message(Message::MultiSense(MultiSenseArg::new(3, false, 3, 7), AddressArg::new(12))); @@ -144,4 +151,134 @@ mod tests { message ); } + + #[tokio::test] + async fn test_message_sending() { + println!("Start test!"); + + for port in tokio_serial::available_ports().unwrap() { + println!("Port: {:?}", port) + } + + let (sender, mut receiver) = tokio::sync::broadcast::channel(10); + + println!("Try to connect to port!"); + + let mut loco_controller = match LocoDriveController::new( + "/dev/ttyUSB0", + 115_200, + 5000, + FlowControl::Software, + sender, + false, + ).await { + Ok(loco_controller) => loco_controller, + Err(err) => { + eprintln!("Error: Could not connect to the serial port! {:?}", err); + println!(); + return; + } + }; + + loco_controller.send_message(GpOn).await.unwrap(); + + println!("Setup test train"); + + let adr = AddressArg::new(5); + + let mut slot_adr_map: HashMap = HashMap::new(); + + match loco_controller.send_message(Message::LocoAdr(adr)).await { + Ok(()) => {}, + Err(err) => { + eprintln!("Message was not send! {:?}", err); + println!(); + exit(1) + } + }; + + loop { + match receiver.recv().await { + Ok(message) => + match message { + LocoDriveMessage::Message(message) => { + match message { + Message::SlRdData(slot, _, address, ..) => { + slot_adr_map.insert(address, slot); + println!("Added {:?}, {:?} to {:?}", address, slot, slot_adr_map); + break; + }, + _ => {} + } + } + LocoDriveMessage::Answer(_, _) => {} + LocoDriveMessage::Error(err) => { + eprintln!("Message could not be read! {:?}", err); + exit(1) + } + LocoDriveMessage::SerialPortError(err) => { + eprintln!("Connection refused! {:?}", err); + exit(1) + } + }, + Err(err) => { + println!("WHAT? {:?}", err); + } + } + } + + println!("Known Trains: {:?}", slot_adr_map); + + for i in 1..11 { + println!("Drive round {}", i); + + if i % 2 == 0 { + loco_controller.send_message(Message::SwReq(SwitchArg::new(8, SwitchDirection::Straight, true))).await.unwrap(); + loco_controller.send_message(Message::SwReq(SwitchArg::new(9, SwitchDirection::Curved, true))).await.unwrap(); + } else { + loco_controller.send_message(Message::SwReq(SwitchArg::new(8, SwitchDirection::Curved, true))).await.unwrap(); + loco_controller.send_message(Message::SwReq(SwitchArg::new(9, SwitchDirection::Straight, true))).await.unwrap(); + } + + loco_controller.send_message(LocoSpd(*slot_adr_map.get(&adr).unwrap(), SpeedArg::Drive(100))).await.unwrap(); + + let mut waiting = true; + + while let Ok(message) = receiver.recv().await { + match message { + LocoDriveMessage::Message(message) => { + match message { + Message::InputRep(in_arg) => { + if i % 2 == 0 && in_arg.address() == 1 && in_arg.input_source() == SourceType::Switch && in_arg.sensor_level() == SensorLevel::High { + waiting = false; + loco_controller.send_message(LocoSpd(*slot_adr_map.get(&adr).unwrap(), SpeedArg::Drive(50))).await.unwrap(); + } else if i % 2 == 1 && in_arg.address() == 2 && in_arg.input_source() == SourceType::Ds54Aux && in_arg.sensor_level() == SensorLevel::High { + waiting = false; + loco_controller.send_message(LocoSpd(*slot_adr_map.get(&adr).unwrap(), SpeedArg::Drive(50))).await.unwrap(); + } else if !waiting && in_arg.address() == 2 && in_arg.input_source() == SourceType::Switch && in_arg.sensor_level() == SensorLevel::Low { + break; + } + }, + _ => {} + } + } + LocoDriveMessage::Answer(_, _) => {} + LocoDriveMessage::Error(err) => { + eprintln!("Message could not be read! {:?}", err); + exit(1) + } + LocoDriveMessage::SerialPortError(err) => { + eprintln!("Connection refused! {:?}", err); + exit(1) + } + } + } + + loco_controller.send_message(LocoSpd(*slot_adr_map.get(&adr).unwrap(), SpeedArg::Stop)).await.unwrap(); + + sleep(Duration::from_secs(6)).await; + } + + println!("Drive 10 rounds!") + } }