Skip to content

Commit

Permalink
Merge pull request #201 from astrogeco/integration-candidate
Browse files Browse the repository at this point in the history
cFS-GroundSystem Integration candidate: Caelum+dev1
  • Loading branch information
astrogeco authored Dec 6, 2021
2 parents 2af2a8d + 79e1368 commit e9015f5
Show file tree
Hide file tree
Showing 38 changed files with 1,519 additions and 1,499 deletions.
154 changes: 77 additions & 77 deletions GroundSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#
# cFS Ground System Version 2.0.0
#
#!/usr/bin/env python3
# !/usr/bin/env python3
#
import shlex
import subprocess
Expand All @@ -31,21 +31,21 @@
from PyQt5.QtWidgets import QApplication, QMainWindow, QMessageBox

from RoutingService import RoutingService
from Ui_MainWindow import Ui_MainWindow
from UiMainWindow import UiMainWindow

from _version import __version__ as _version
from _version import _version_string

__version__ = _version

#ROOTDIR = Path(sys.argv[0]).resolve().parent
# ROOTDIR = Path(sys.argv[0]).resolve().parent
ROOTDIR = pathlib.Path(__file__).parent.absolute()


#
# CFS Ground System: Setup and manage the main window
#
class GroundSystem(QMainWindow, Ui_MainWindow):
class GroundSystem(QMainWindow, UiMainWindow):
TLM_HDR_V1_OFFSET = 4
TLM_HDR_V2_OFFSET = 4
CMD_HDR_PRI_V1_OFFSET = 0
Expand All @@ -60,58 +60,58 @@ def __init__(self):
super().__init__()
self.setupUi(self)

self.RoutingService = None
self.routing_service = None
self.alert = QMessageBox()

# set initial defaults
self.sbTlmOffset.setValue(self.TLM_HDR_V1_OFFSET)
self.sbCmdOffsetPri.setValue(self.CMD_HDR_PRI_V1_OFFSET)
self.sbCmdOffsetSec.setValue(self.CMD_HDR_SEC_V1_OFFSET)
self.sb_tlm_offset.setValue(self.TLM_HDR_V1_OFFSET)
self.sb_cmd_offset_pri.setValue(self.CMD_HDR_PRI_V1_OFFSET)
self.sb_cmd_offset_sec.setValue(self.CMD_HDR_SEC_V1_OFFSET)

self.pushButtonStartTlm.clicked.connect(self.startTlmSystem)
self.pushButtonStartCmd.clicked.connect(self.startCmdSystem)
self.cbTlmHeaderVer.currentIndexChanged.connect(self.setTlmOffset)
self.cbCmdHeaderVer.currentIndexChanged.connect(self.setCmdOffsets)
self.push_button_start_tlm.clicked.connect(self.start_tlm_system)
self.push_button_start_cmd.clicked.connect(self.start_cmd_system)
self.cb_tlm_header_ver.currentIndexChanged.connect(self.set_tlm_offset)
self.cb_cmd_header_ver.currentIndexChanged.connect(self.set_cmd_offsets)

for sb in (self.sbTlmOffset, self.sbCmdOffsetPri, self.sbCmdOffsetSec):
sb.valueChanged.connect(self.saveOffsets)
for sb in (self.sb_tlm_offset, self.sb_cmd_offset_pri, self.sb_cmd_offset_sec):
sb.valueChanged.connect(self.save_offsets)
# Init lists
self.ipAddressesList = ['All']
self.spacecraftNames = ['All']
self.ip_addresses_list = ['All']
self.spacecraft_names = ['All']

def closeEvent(self, evnt):
if self.RoutingService:
self.RoutingService.stop()
if self.routing_service:
self.routing_service.stop()
print("Stopped routing service")
os.kill(0, signal.SIGKILL)
super().closeEvent(evnt)

# Read the selected spacecraft from combo box on GUI
def getSelectedSpacecraftAddress(self):
return self.comboBoxIpAddresses.currentText().strip()
def get_selected_spacecraft_address(self):
return self.combo_box_ip_addresses.currentText().strip()

# Returns the name of the selected spacecraft
def getSelectedSpacecraftName(self):
return self.spacecraftNames[self.ipAddressesList.index(
self.getSelectedSpacecraftAddress())].strip()
def get_selected_spacecraft_name(self):
return self.spacecraft_names[self.ip_addresses_list.index(
self.get_selected_spacecraft_address())].strip()

#
# Display popup with error
#
def DisplayErrorMessage(self, message):
def display_error_message(self, message):
print(message)
self.alert.setText(message)
self.alert.setIcon(QMessageBox.Warning)
self.alert.exec_()

# Start the telemetry system for the selected spacecraft
def startTlmSystem(self):
def start_tlm_system(self):
# Setup the subscription (to let the telemetry
# system know the messages it will be receiving)
subscription = '--sub=GroundSystem'
selectedSpacecraft = self.getSelectedSpacecraftName()
if selectedSpacecraft != 'All':
subscription += f'.{selectedSpacecraft}.TelemetryPackets'
selected_spacecraft = self.get_selected_spacecraft_name()
if selected_spacecraft != 'All':
subscription += f'.{selected_spacecraft}.TelemetryPackets'

# Open Telemetry System
system_call = f'python3 {ROOTDIR}/Subsystems/tlmGUI/TelemetrySystem.py {subscription}'
Expand All @@ -120,93 +120,93 @@ def startTlmSystem(self):

# Start command system
@staticmethod
def startCmdSystem():
def start_cmd_system():
subprocess.Popen(
['python3', f'{ROOTDIR}/Subsystems/cmdGui/CommandSystem.py'])

# Start FDL-FUL gui system
def startFDLSystem(self):
selectedSpacecraft = self.getSelectedSpacecraftName()
if selectedSpacecraft == 'All':
self.DisplayErrorMessage(
def start_fdl_system(self):
selected_spacecraft = self.get_selected_spacecraft_name()
if selected_spacecraft == 'All':
self.display_error_message(
'Cannot open FDL manager.\nNo spacecraft selected.')
else:
subscription = f'--sub=GroundSystem.{selectedSpacecraft}'
subscription = f'--sub=GroundSystem.{selected_spacecraft}'
subprocess.Popen([
'python3', f'{ROOTDIR}/Subsystems/fdlGui/FdlSystem.py',
subscription
])

def setTlmOffset(self):
selectedVer = self.cbTlmHeaderVer.currentText().strip()
if selectedVer == "Custom":
self.sbTlmOffset.setEnabled(True)
def set_tlm_offset(self):
selected_ver = self.cb_tlm_header_ver.currentText().strip()
if selected_ver == "Custom":
self.sb_tlm_offset.setEnabled(True)
else:
self.sbTlmOffset.setEnabled(False)
if selectedVer == "1":
self.sbTlmOffset.setValue(self.TLM_HDR_V1_OFFSET)
elif selectedVer == "2":
self.sbTlmOffset.setValue(self.TLM_HDR_V2_OFFSET)

def setCmdOffsets(self):
selectedVer = self.cbCmdHeaderVer.currentText().strip()
if selectedVer == "Custom":
self.sbCmdOffsetPri.setEnabled(True)
self.sbCmdOffsetSec.setEnabled(True)
self.sb_tlm_offset.setEnabled(False)
if selected_ver == "1":
self.sb_tlm_offset.setValue(self.TLM_HDR_V1_OFFSET)
elif selected_ver == "2":
self.sb_tlm_offset.setValue(self.TLM_HDR_V2_OFFSET)

def set_cmd_offsets(self):
selected_ver = self.cb_cmd_header_ver.currentText().strip()
if selected_ver == "Custom":
self.sb_cmd_offset_pri.setEnabled(True)
self.sb_cmd_offset_sec.setEnabled(True)
else:
self.sbCmdOffsetPri.setEnabled(False)
self.sbCmdOffsetSec.setEnabled(False)
if selectedVer == "1":
self.sbCmdOffsetPri.setValue(self.CMD_HDR_PRI_V1_OFFSET)
self.sbCmdOffsetSec.setValue(self.CMD_HDR_SEC_V1_OFFSET)
elif selectedVer == "2":
self.sbCmdOffsetPri.setValue(self.CMD_HDR_PRI_V2_OFFSET)
self.sbCmdOffsetSec.setValue(self.CMD_HDR_SEC_V2_OFFSET)

def saveOffsets(self):
offsets = bytes((self.sbTlmOffset.value(), self.sbCmdOffsetPri.value(),
self.sbCmdOffsetSec.value()))
self.sb_cmd_offset_pri.setEnabled(False)
self.sb_cmd_offset_sec.setEnabled(False)
if selected_ver == "1":
self.sb_cmd_offset_pri.setValue(self.CMD_HDR_PRI_V1_OFFSET)
self.sb_cmd_offset_sec.setValue(self.CMD_HDR_SEC_V1_OFFSET)
elif selected_ver == "2":
self.sb_cmd_offset_pri.setValue(self.CMD_HDR_PRI_V2_OFFSET)
self.sb_cmd_offset_sec.setValue(self.CMD_HDR_SEC_V2_OFFSET)

def save_offsets(self):
offsets = bytes((self.sb_tlm_offset.value(), self.sb_cmd_offset_pri.value(),
self.sb_cmd_offset_sec.value()))
with open("/tmp/OffsetData", "wb") as f:
f.write(offsets)

# Update the combo box list in gui
def updateIpList(self, ip, name):
self.ipAddressesList.append(ip)
self.spacecraftNames.append(name)
self.comboBoxIpAddresses.addItem(ip)
def update_ip_list(self, ip, name):
self.ip_addresses_list.append(ip)
self.spacecraft_names.append(name)
self.combo_box_ip_addresses.addItem(ip)

# Start the routing service (see RoutingService.py)
def initRoutingService(self):
self.RoutingService = RoutingService()
self.RoutingService.signalUpdateIpList.connect(self.updateIpList)
self.RoutingService.start()
def init_routing_service(self):
self.routing_service = RoutingService()
self.routing_service.signal_update_ip_list.connect(self.update_ip_list)
self.routing_service.start()


#
# Main

#
def main():

# Report Version Number upon startup
print(_version_string)

# Init app
app = QApplication(sys.argv)

# Init main window
MainWindow = GroundSystem()
main_window = GroundSystem()

# Show and put window on front
MainWindow.show()
MainWindow.raise_()
main_window.show()
main_window.raise_()

# Start the Routing Service
MainWindow.initRoutingService()
MainWindow.saveOffsets()
main_window.init_routing_service()
main_window.save_offsets()

# Execute the app
sys.exit(app.exec_())


if __name__ == "__main__":
main()
main()
Loading

0 comments on commit e9015f5

Please sign in to comment.