diff --git a/auto_rx/auto_rx.py b/auto_rx/auto_rx.py index b5f18729..83610a4b 100644 --- a/auto_rx/auto_rx.py +++ b/auto_rx/auto_rx.py @@ -8,6 +8,16 @@ # Refer github page for instructions on setup and usage. # https://github.com/projecthorus/radiosonde_auto_rx/ # + +# exit status codes: +# +# 0 - normal termination (ctrl-c) +# 1 - critical error, needs human attention to fix +# 2 - exit because continuous running timeout reached +# 3 - exception occurred, can rerun after resetting SDR +# 4 - some of the threads failed to join, SDR reset and restart required +# this is mostly caused by hung external utilities + import argparse import datetime import logging @@ -44,6 +54,7 @@ start_flask, stop_flask, flask_emit_event, + flask_running, WebHandler, WebExporter, ) @@ -322,7 +333,7 @@ def handle_scan_results(): if (type(_key) == int) or (type(_key) == float): # Extract the currently decoded sonde type from the currently running decoder. _decoding_sonde_type = autorx.task_list[_key]["task"].sonde_type - + # Remove any inverted decoder information for the comparison. if _decoding_sonde_type.startswith("-"): _decoding_sonde_type = _decoding_sonde_type[1:] @@ -806,6 +817,11 @@ def main(): logging.getLogger("engineio").setLevel(logging.ERROR) logging.getLogger("geventwebsocket").setLevel(logging.ERROR) + # Check all the RS utilities exist. + logging.debug("Checking if utils exist") + if not check_rs_utils(): + sys.exit(1) + # Attempt to read in config file logging.info("Reading configuration file...") _temp_cfg = read_auto_rx_config(args.config) @@ -844,9 +860,6 @@ def main(): web_handler = WebHandler() logging.getLogger().addHandler(web_handler) - # Check all the RS utilities exist. - if not check_rs_utils(): - sys.exit(1) # If a sonde type has been provided, insert an entry into the scan results, # and immediately start a decoder. This also sets the decoder time to 0, which @@ -1074,7 +1087,7 @@ def main(): logging.info("Shutdown time reached. 
Closing.") stop_flask(host=config["web_host"], port=config["web_port"]) stop_all() - break + sys.exit(2) if __name__ == "__main__": @@ -1085,9 +1098,13 @@ def main(): # Upon CTRL+C, shutdown all threads and exit. stop_flask(host=config["web_host"], port=config["web_port"]) stop_all() + sys.exit(0) except Exception as e: # Upon exceptions, attempt to shutdown threads and exit. traceback.print_exc() print("Main Loop Error - %s" % str(e)) - stop_flask(host=config["web_host"], port=config["web_port"]) + if flask_running(): + stop_flask(host=config["web_host"], port=config["web_port"]) stop_all() + sys.exit(3) + diff --git a/auto_rx/auto_rx.sh b/auto_rx/auto_rx.sh index a8a1b567..6c901cd3 100755 --- a/auto_rx/auto_rx.sh +++ b/auto_rx/auto_rx.sh @@ -6,7 +6,9 @@ # NOTE: If running this from crontab, make sure to set the appropriate PATH env-vars, # else utilities like rtl_power and rtl_fm won't be found. # -# WARNING - THIS IS DEPRECATED - PLEASE USE THE SYSTEMD SERVICE +# WARNING - THIS IS DEPRECATED - PLEASE USE THE SYSTEMD SERVICE OR DOCKER IMAGE +# See: https://github.com/projecthorus/radiosonde_auto_rx/wiki#451-option-1---operation-as-a-systemd-service-recommended +# Or: https://github.com/projecthorus/radiosonde_auto_rx/wiki/Docker # # change into appropriate directory @@ -15,7 +17,4 @@ cd $(dirname $0) # Clean up old files rm log_power*.csv -# Start auto_rx process with a 3 hour timeout. -# auto_rx will exit after this time. - -python3 auto_rx.py -t 180 +python3 auto_rx.py -t 180 \ No newline at end of file diff --git a/auto_rx/autorx/__init__.py b/auto_rx/autorx/__init__.py index a108510c..df2b5a84 100644 --- a/auto_rx/autorx/__init__.py +++ b/auto_rx/autorx/__init__.py @@ -12,7 +12,7 @@ # MINOR - New sonde type support, other fairly big changes that may result in telemetry or config file incompatability issus. # PATCH - Small changes, or minor feature additions. 
-__version__ = "1.6.2-beta4" +__version__ = "1.6.2-beta5" # Global Variables diff --git a/auto_rx/autorx/aprs.py b/auto_rx/autorx/aprs.py index eb864496..747eeaa0 100644 --- a/auto_rx/autorx/aprs.py +++ b/auto_rx/autorx/aprs.py @@ -759,13 +759,19 @@ def close(self): # Wait for all threads to close. if self.upload_thread is not None: - self.upload_thread.join() + self.upload_thread.join(60) + if self.upload_thread.is_alive(): + self.log_error("aprs upload thread failed to join") if self.timer_thread is not None: - self.timer_thread.join() + self.timer_thread.join(60) + if self.timer_thread.is_alive(): + self.log_error("aprs timer thread failed to join") if self.input_thread is not None: - self.input_thread.join() + self.input_thread.join(60) + if self.input_thread.is_alive(): + self.log_error("aprs input thread failed to join") def log_debug(self, line): """ Helper function to log a debug message with a descriptive heading. diff --git a/auto_rx/autorx/config.py b/auto_rx/autorx/config.py index f9b9a56d..ff04798b 100644 --- a/auto_rx/autorx/config.py +++ b/auto_rx/autorx/config.py @@ -883,7 +883,7 @@ def read_auto_rx_config(filename, no_sdr_test=False): if len(auto_rx_config["sdr_settings"].keys()) == 0: # We have no SDRs to use!! logging.error("Config - No working SDRs! 
Cannot run...") - return None + raise SystemError("No working SDRs!") else: # Create a global copy of the configuration file at this point global_config = copy.deepcopy(auto_rx_config) @@ -902,7 +902,8 @@ def read_auto_rx_config(filename, no_sdr_test=False): web_password = auto_rx_config["web_password"] return auto_rx_config - + except SystemError as e: + raise e except: traceback.print_exc() logging.error("Could not parse config file.") diff --git a/auto_rx/autorx/email_notification.py b/auto_rx/autorx/email_notification.py index 4a8f972c..6e66b4bf 100644 --- a/auto_rx/autorx/email_notification.py +++ b/auto_rx/autorx/email_notification.py @@ -379,7 +379,9 @@ def close(self): self.input_processing_running = False if self.input_thread is not None: - self.input_thread.join() + self.input_thread.join(60) + if self.input_thread.is_alive(): + self.log_error("email notification input thread failed to join") def running(self): """ Check if the logging thread is running. diff --git a/auto_rx/autorx/gpsd.py b/auto_rx/autorx/gpsd.py index bdd8d8e1..ac19782a 100644 --- a/auto_rx/autorx/gpsd.py +++ b/auto_rx/autorx/gpsd.py @@ -335,7 +335,9 @@ def close(self): self.gpsd_thread_running = False # Wait for the thread to close. if self.gpsd_thread != None: - self.gpsd_thread.join() + self.gpsd_thread.join(60) + if self.gpsd_thread.is_alive(): + logging.error("GPS thread failed to join") def send_to_callback(self, data): """ diff --git a/auto_rx/autorx/habitat.py b/auto_rx/autorx/habitat.py index ebb4303b..f76c4299 100644 --- a/auto_rx/autorx/habitat.py +++ b/auto_rx/autorx/habitat.py @@ -831,13 +831,20 @@ def close(self): # Wait for all threads to close. 
if self.upload_thread is not None: - self.upload_thread.join() + self.upload_thread.join(60) + if self.upload_thread.is_alive(): + self.log_error("habitat upload thread failed to join") + if self.timer_thread is not None: - self.timer_thread.join() + self.timer_thread.join(60) + if self.timer_thread.is_alive(): + self.log_error("habitat timer thread failed to join") if self.input_thread is not None: - self.input_thread.join() + self.input_thread.join(60) + if self.input_thread.is_alive(): + self.log_error("habitat input thread failed to join") def log_debug(self, line): """ Helper function to log a debug message with a descriptive heading. diff --git a/auto_rx/autorx/log_files.py b/auto_rx/autorx/log_files.py index 6a595bfc..95eefde0 100644 --- a/auto_rx/autorx/log_files.py +++ b/auto_rx/autorx/log_files.py @@ -455,9 +455,8 @@ def calculate_skewt_data( break except Exception as e: - print(str(e)) - - # Continue through the data.. + logging.exception(f"Exception {str(e)} in calculate_skewt_data") + raise return _skewt diff --git a/auto_rx/autorx/ozimux.py b/auto_rx/autorx/ozimux.py index a3c8ce84..685a6910 100644 --- a/auto_rx/autorx/ozimux.py +++ b/auto_rx/autorx/ozimux.py @@ -252,7 +252,9 @@ def close(self): self.input_processing_running = False if self.input_thread is not None: - self.input_thread.join() + self.input_thread.join(60) + if self.input_thread.is_alive(): + self.log_error("ozimux input thread failed to join") def log_debug(self, line): """ Helper function to log a debug message with a descriptive heading. 
diff --git a/auto_rx/autorx/rotator.py b/auto_rx/autorx/rotator.py index 5301cf80..82ee374c 100644 --- a/auto_rx/autorx/rotator.py +++ b/auto_rx/autorx/rotator.py @@ -320,7 +320,9 @@ def close(self): self.rotator_thread_running = False if self.rotator_thread is not None: - self.rotator_thread.join() + self.rotator_thread.join(60) + if self.rotator_thread.is_alive(): + self.log_error("rotator control thread failed to join") self.log_debug("Stopped rotator control thread.") diff --git a/auto_rx/autorx/scan.py b/auto_rx/autorx/scan.py index 1077c5ee..fe71746a 100644 --- a/auto_rx/autorx/scan.py +++ b/auto_rx/autorx/scan.py @@ -9,6 +9,7 @@ import logging import numpy as np import os +import sys import platform import subprocess import time @@ -22,6 +23,7 @@ reset_rtlsdr_by_serial, reset_all_rtlsdrs, peak_decimation, + timeout_cmd ) from .sdr_wrappers import test_sdr, reset_sdr, get_sdr_name, get_sdr_iq_cmd, get_sdr_fm_cmd, get_power_spectrum @@ -91,18 +93,10 @@ def run_rtl_power( if os.path.exists(filename): os.remove(filename) - # Add -k 30 option, to SIGKILL rtl_power 30 seconds after the regular timeout expires. - # Note that this only works with the GNU Coreutils version of Timeout, not the IBM version, - # which is provided with OSX (Darwin). 
- if "Darwin" in platform.platform(): - timeout_kill = "" - else: - timeout_kill = "-k 30 " - rtl_power_cmd = ( - "timeout %s%d %s %s-f %d:%d:%d -i %d -1 -c 25%% -p %d -d %s %s%s" + "%s %d %s %s-f %d:%d:%d -i %d -1 -c 25%% -p %d -d %s %s%s" % ( - timeout_kill, + timeout_cmd(), dwell + 10, rtl_power_path, bias_option, @@ -314,7 +308,7 @@ def detect_sonde( if _mode == "IQ": # IQ decoding - rx_test_command = f"timeout {dwell_time * 2} " + rx_test_command = f"{timeout_cmd()} {dwell_time * 2} " rx_test_command += get_sdr_iq_cmd( sdr_type=sdr_type, @@ -331,8 +325,9 @@ def detect_sonde( ) # rx_test_command = ( - # "timeout %ds %s %s-p %d -d %s %s-M raw -F9 -s %d -f %d 2>/dev/null |" + # "%s %ds %s %s-p %d -d %s %s-M raw -F9 -s %d -f %d 2>/dev/null |" # % ( + # timeout_cmd(), # dwell_time * 2, # rtl_fm_path, # bias_option, @@ -360,7 +355,7 @@ def detect_sonde( # Sample Source (rtl_fm) - rx_test_command = f"timeout {dwell_time * 2} " + rx_test_command = f"{timeout_cmd()} {dwell_time * 2} " rx_test_command += get_sdr_fm_cmd( sdr_type=sdr_type, @@ -379,8 +374,9 @@ def detect_sonde( ) # rx_test_command = ( - # "timeout %ds %s %s-p %d -d %s %s-M fm -F9 -s %d -f %d 2>/dev/null |" + # "%s %ds %s %s-p %d -d %s %s-M fm -F9 -s %d -f %d 2>/dev/null |" # % ( + # timeout_cmd(), # dwell_time * 2, # rtl_fm_path, # bias_option, @@ -783,9 +779,9 @@ def __init__( def start(self): # Start the scan loop (if not already running) if self.sonde_scan_thread is None: - self.sonde_scanner_running = True self.sonde_scan_thread = Thread(target=self.scan_loop) self.sonde_scan_thread.start() + self.sonde_scanner_running = True else: self.log_warning("Sonde scan already running!") @@ -854,26 +850,32 @@ def scan_loop(self): sdr_hostname = self.sdr_hostname, sdr_port = self.sdr_port ) - - time.sleep(10) + for _ in range(10): + if not self.sonde_scanner_running: + break + time.sleep(1) continue except Exception as e: traceback.print_exc() self.log_error("Caught other error: %s" % str(e)) - time.sleep(10) + 
for _ in range(10): + if not self.sonde_scanner_running: + break + time.sleep(1) else: # Scan completed successfuly! Reset the error counter. self.error_retries = 0 # Sleep before starting the next scan. - for i in range(self.scan_delay): - time.sleep(1) - if self.sonde_scanner_running == False: + for _ in range(self.scan_delay): + if not self.sonde_scanner_running: self.log_debug("Breaking out of scan loop.") break + time.sleep(1) self.log_info("Scanner Thread Closed.") self.sonde_scanner_running = False + self.sonde_scanner_thread = None def sonde_search(self, first_only=False): """Perform a frequency scan across a defined frequency range, and test each detected peak for the presence of a radiosonde. @@ -1143,12 +1145,16 @@ def oneshot(self, first_only=False): def stop(self, nowait=False): """Stop the Scan Loop""" - self.log_info("Waiting for current scan to finish...") - self.sonde_scanner_running = False + if self.sonde_scanner_running: + self.log_info("Waiting for current scan to finish...") + self.sonde_scanner_running = False - # Wait for the sonde scanner thread to close, if there is one. - if self.sonde_scan_thread != None and (not nowait): - self.sonde_scan_thread.join() + # Wait for the sonde scanner thread to close, if there is one. 
+ if self.sonde_scan_thread != None and (not nowait): + self.sonde_scan_thread.join(60) + if self.sonde_scan_thread.is_alive(): + self.log_error("Scanning thread did not finish, terminating") + sys.exit(4) def running(self): """Check if the scanner is running""" diff --git a/auto_rx/autorx/sdr_wrappers.py b/auto_rx/autorx/sdr_wrappers.py index 62af89d1..2b187ad2 100644 --- a/auto_rx/autorx/sdr_wrappers.py +++ b/auto_rx/autorx/sdr_wrappers.py @@ -11,7 +11,7 @@ import subprocess import numpy as np -from .utils import rtlsdr_test, reset_rtlsdr_by_serial, reset_all_rtlsdrs +from .utils import rtlsdr_test, reset_rtlsdr_by_serial, reset_all_rtlsdrs, timeout_cmd def test_sdr( @@ -67,7 +67,7 @@ def test_sdr( return False _cmd = ( - f"timeout 10 " # Add a timeout, because connections to non-existing IPs seem to block. + f"{timeout_cmd()} 10 " # Add a timeout, because connections to non-existing IPs seem to block. f"{ss_iq_path} " f"-f {check_freq} " f"-s 48000 " @@ -480,17 +480,7 @@ def get_power_spectrum( if os.path.exists(_log_filename): os.remove(_log_filename) - - # Add -k 30 option, to SIGKILL rtl_power 30 seconds after the regular timeout expires. - # Note that this only works with the GNU Coreutils version of Timeout, not the IBM version, - # which is provided with OSX (Darwin). - _platform = platform.system() - if "Darwin" in _platform: - _timeout_kill = "" - else: - _timeout_kill = "-k 30 " - - _timeout_cmd = f"timeout {_timeout_kill}{integration_time+10}" + _timeout_cmd = f"{timeout_cmd()} {integration_time+10} " _gain = "" if gain: @@ -564,17 +554,7 @@ def get_power_spectrum( if os.path.exists(_log_filename): os.remove(_log_filename) - - # Add -k 30 option, to SIGKILL rtl_power 30 seconds after the regular timeout expires. - # Note that this only works with the GNU Coreutils version of Timeout, not the IBM version, - # which is provided with OSX (Darwin). 
- _platform = platform.system() - if "Darwin" in _platform: - _timeout_kill = "" - else: - _timeout_kill = "-k 30 " - - _timeout_cmd = f"timeout {_timeout_kill}{integration_time+10}" + _timeout_cmd = f"{timeout_cmd()} {integration_time+10} " _frequency_centre = int(frequency_start + (frequency_stop-frequency_start)/2.0) diff --git a/auto_rx/autorx/utils.py b/auto_rx/autorx/utils.py index 8e502faa..9a016afc 100644 --- a/auto_rx/autorx/utils.py +++ b/auto_rx/autorx/utils.py @@ -19,6 +19,7 @@ import time import numpy as np import semver +import shutil from dateutil.parser import parse from datetime import datetime, timedelta from math import radians, degrees, sin, cos, atan2, sqrt, pi @@ -45,6 +46,21 @@ "iq_dec" ] +_timeout_cmd = None + +def timeout_cmd(): + global _timeout_cmd + if not _timeout_cmd: + t=shutil.which("gtimeout") + if t: + _timeout_cmd = "gtimeout -k 30 " + else: + if not shutil.which("timeout"): + logging.critical("timeout command-line tool not present in system. try installing gtimeout.") + sys.exit(1) + else: + _timeout_cmd = "timeout -k 30 " + return _timeout_cmd def check_rs_utils(): """ Check the required RS decoder binaries exist @@ -54,7 +70,7 @@ def check_rs_utils(): if not os.path.isfile(_file): logging.critical("Binary %s does not exist - did you run build.sh?" % _file) return False - + _ = timeout_cmd() return True @@ -776,10 +792,10 @@ def is_rtlsdr(vid, pid): def reset_rtlsdr_by_serial(serial): """ Attempt to reset a RTLSDR with a provided serial number """ - # If not Linux, return immediately. + # If not Linux, raise exception and let auto_rx.py convert it to exit status code. if is_not_linux(): logging.debug("RTLSDR - Not a native Linux system, skipping reset attempt.") - return + raise SystemError("SDR unresponsive") lsusb_info = lsusb() bus_num = None @@ -853,10 +869,10 @@ def find_rtlsdr(serial=None): def reset_all_rtlsdrs(): """ Reset all RTLSDR devices found in the lsusb tree """ - # If not Linux, return immediately. 
+ # If not Linux, raise exception and let auto_rx.py convert it to exit status code. if is_not_linux(): logging.debug("RTLSDR - Not a native Linux system, skipping reset attempt.") - return + raise SystemError("SDR unresponsive") lsusb_info = lsusb() bus_num = None @@ -906,11 +922,12 @@ def rtlsdr_test(device_idx="0", rtl_sdr_path="rtl_sdr", retries=5): logging.debug("RTLSDR - TCP Device, skipping RTLSDR test step.") return True - _rtl_cmd = "timeout 5 %s -d %s -n 200000 - > /dev/null" % ( + _rtl_cmd = "%s 5 %s -d %s -f 400000000 -n 200000 - > /dev/null" % ( + timeout_cmd(), rtl_sdr_path, str(device_idx), ) - + # First, check if the RTLSDR with a provided serial number is present. if device_idx == "0": # Check for the presence of any RTLSDRs. diff --git a/auto_rx/autorx/web.py b/auto_rx/autorx/web.py index 4457d4e7..9911ec34 100644 --- a/auto_rx/autorx/web.py +++ b/auto_rx/autorx/web.py @@ -49,7 +49,7 @@ # This thread will hold the currently running flask application thread. flask_app_thread = None # A key that needs to be matched to allow shutdown. -flask_shutdown_key = "temp" +flask_shutdown_key = None # SocketIO instance socketio = SocketIO(app, async_mode="threading") @@ -289,6 +289,9 @@ def flask_get_log_list(): """ Return a list of log files, as a list of objects """ return json.dumps(list_log_files(quicklook=True)) +def flask_running(): + global flask_shutdown_key + return flask_shutdown_key is not None @app.route("/get_log_by_serial/") def flask_get_log_by_serial(serial):