diff --git a/.gitignore b/.gitignore index 5f7cb1686..f1ff9a395 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,8 @@ desktop.ini install/outputs demo/outputs autohotkey/outputs +spec/install/outputs +lib/cosmos/dart/log coverage/ profile/ demo/tools/mac/CmdExtractor.app/Contents/MacOS/CmdExtractor diff --git a/bin/dart_util b/bin/dart_util new file mode 100644 index 000000000..8984edf0f --- /dev/null +++ b/bin/dart_util @@ -0,0 +1,4 @@ +#!/usr/bin/env ruby +# encoding: ascii-8bit +require 'cosmos' +require 'cosmos/dart/processes/dart_util' diff --git a/cosmos.gemspec b/cosmos.gemspec index 213be551d..c0c4870d9 100644 --- a/cosmos.gemspec +++ b/cosmos.gemspec @@ -45,6 +45,7 @@ spec = Gem::Specification.new do |s| s.executables << 'cstol_converter' s.executables << 'xtce_converter' s.executables << 'dart_import' + s.executables << 'dart_util' if RUBY_ENGINE == 'ruby' # Ruby C Extensions - MRI Only @@ -95,11 +96,11 @@ spec = Gem::Specification.new do |s| s.add_runtime_dependency 'httpclient', '~> 2.8' # From http://www.rubydoc.info/gems/puma#Known_Bugs : - # "For MRI versions 2.2.7, 2.2.8, 2.3.4 and 2.4.1, you may see stream closed in + # "For MRI versions 2.2.7, 2.2.8, 2.2.9, 2.2.10, 2.3.4 and 2.4.1, you may see stream closed in # another thread (IOError). It may be caused by a Ruby bug. It can be # fixed with the gem https://rubygems.org/gems/stopgap_13632" # This is commented out because the gemspec is only evaluated at gem build time - # s.add_runtime_dependency 'stopgap_13632', '~> 1.1.1' if RUBY_ENGINE == 'ruby' and %w(2.2.7 2.2.8 2.3.4 2.4.1).include? RUBY_VERSION # MRI Only + # s.add_runtime_dependency 'stopgap_13632', '~> 1.2.0' if RUBY_ENGINE == 'ruby' and %w(2.2.7 2.2.8 2.3.4 2.4.1).include? RUBY_VERSION # MRI Only # Development Dependencies s.add_development_dependency 'diff-lcs', '~> 1.3' if RUBY_ENGINE == 'ruby' # Get latest for MRI diff --git a/demo/config/system/system.txt b/demo/config/system/system.txt index 7756c4925..08f42ff6c 100644 --- a/demo/config/system/system.txt +++ b/demo/config/system/system.txt @@ -6,6 +6,7 @@ DECLARE_TARGET INST DECLARE_TARGET INST INST2 DECLARE_TARGET EXAMPLE DECLARE_TARGET TEMPLATED +DECLARE_TARGET DART DECLARE_TARGET SYSTEM # Listen Hosts - Ip addresses or hostnames to listen on when running the tools diff --git a/demo/config/targets/DART/cmd_tlm/dart_cmds.txt b/demo/config/targets/DART/cmd_tlm/dart_cmds.txt new file mode 100644 index 000000000..5e58225fb --- /dev/null +++ b/demo/config/targets/DART/cmd_tlm/dart_cmds.txt @@ -0,0 +1,2 @@ +COMMAND DART CLEAR_ERRORS BIG_ENDIAN "Clears error counters and messages" + APPEND_ID_PARAMETER PACKET_ID 8 INT 1 1 1 "Packet Id" diff --git a/demo/config/targets/DART/cmd_tlm/dart_tlm.txt b/demo/config/targets/DART/cmd_tlm/dart_tlm.txt new file mode 100644 index 000000000..39667dc03 --- /dev/null +++ b/demo/config/targets/DART/cmd_tlm/dart_tlm.txt @@ -0,0 +1,40 @@ +TELEMETRY DART STATUS BIG_ENDIAN "DART Status" + APPEND_ID_ITEM PACKET_ID 8 INT 1 "Packet Id" + APPEND_ITEM LAST_PLE_ID 64 INT "Database ID of the most recent PacketLogEntry created" + APPEND_ITEM PLE_STATE_NEED_DECOM 64 INT "Number of PacketLogEntries ready for decom" + APPEND_ITEM PLE_STATE_ERROR 64 INT "Number of PacketLogEntries with errors" + APPEND_ITEM DECOM_COUNT 64 INT "Count of packets decommutated" + APPEND_ITEM DECOM_ERROR_COUNT 64 INT "Count of decommutation errors" + APPEND_ITEM DECOM_MESSAGE 2048 STRING "Most recent decommutation message" + APPEND_ITEM REDUCTION_COUNT 64 INT "Count of packet reduced since DART started" + APPEND_ITEM REDUCTION_ERROR_COUNT 64 INT "Count of reduction errors" + APPEND_ITEM REDUCTION_MESSAGE 2048 STRING "Most recent reduction message" + APPEND_ITEM DART_DATA_BYTES 64 INT "Size of the DART Data folder in bytes" + POLY_READ_CONVERSION 0 0.0000000009313225746154785 + UNITS GB GB + FORMAT_STRING "%0.2f" + APPEND_ITEM DART_LOGS_BYTES 64 INT "Size of the DART Logs folder in bytes" + POLY_READ_CONVERSION 0 0.0000000009313225746154785 + UNITS GB GB + FORMAT_STRING "%0.2f" + APPEND_ITEM DART_DATABASE_BYTES 64 INT "Size of the DART Postgres database in bytes" + POLY_READ_CONVERSION 0 0.0000000009313225746154785 + UNITS GB GB + FORMAT_STRING "%0.2f" + APPEND_ITEM DART_STATUS_SECONDS 64 FLOAT "Seconds to complete status queries" + APPEND_ITEM PLE_FIRST_TIME_S 64 INT "Time of the first PacketLogEntry in the database seconds" + APPEND_ITEM PLE_FIRST_TIME_US 64 INT "Time of the first PacketLogEntry in the database microseconds" + APPEND_ITEM PLE_LAST_TIME_S 64 INT "Time of the last PacketLogEntry in the database seconds" + APPEND_ITEM PLE_LAST_TIME_US 64 INT "Time of the last PacketLogEntry in the database microseconds" + APPEND_ITEM DECOM_MESSAGE_TIME_S 64 INT "Time of decommutation message seconds" + APPEND_ITEM DECOM_MESSAGE_TIME_US 64 INT "Time of decommutation message microseconds" + APPEND_ITEM REDUCTION_MESSAGE_TIME_S 64 INT "Time of reduction message seconds" + APPEND_ITEM REDUCTION_MESSAGE_TIME_US 64 INT "Time of reduction message microseconds" + ITEM PLE_FIRST_TIME 0 0 DERIVED "Time of the first PacketLogEntry in the database" + READ_CONVERSION unix_time_formatted_conversion.rb PLE_FIRST_TIME_S PLE_FIRST_TIME_US + ITEM PLE_LAST_TIME 0 0 DERIVED "Time of the last PacketLogEntry in the database" + READ_CONVERSION unix_time_formatted_conversion.rb PLE_LAST_TIME_S PLE_LAST_TIME_US + ITEM DECOM_MESSAGE_TIME 0 0 DERIVED "Time of decommutation message" + READ_CONVERSION unix_time_formatted_conversion.rb DECOM_MESSAGE_TIME_S DECOM_MESSAGE_TIME_US + ITEM REDUCTION_MESSAGE_TIME 0 0 DERIVED "Time of reduction message" + READ_CONVERSION unix_time_formatted_conversion.rb REDUCTION_MESSAGE_TIME_S REDUCTION_MESSAGE_TIME_US diff --git a/demo/config/targets/DART/cmd_tlm_server.txt b/demo/config/targets/DART/cmd_tlm_server.txt new file mode 100644 index 000000000..4a5b61d3a --- /dev/null +++ b/demo/config/targets/DART/cmd_tlm_server.txt @@ -0,0 +1,6 @@ +# This is a segment of the main cmd_tlm_server.txt that will be used with +# AUTO_INTERFACE_TARGETS or INTERFACE_TARGET + +INTERFACE DART_INT dart_status_interface.rb + TARGET DART + DONT_CONNECT diff --git a/demo/config/targets/DART/screens/status.txt b/demo/config/targets/DART/screens/status.txt new file mode 100644 index 000000000..a4205acb8 --- /dev/null +++ b/demo/config/targets/DART/screens/status.txt @@ -0,0 +1,54 @@ +SCREEN AUTO AUTO 1.0 FIXED + +VERTICAL + + TITLE "DART Status" + + HORIZONTAL + VERTICALBOX + SECTIONHEADER "Ingest Status" + + LABELVALUE DART STATUS LAST_PLE_ID + LABELVALUE DART STATUS PLE_STATE_ERROR + LABELVALUE DART STATUS PLE_FIRST_TIME WITH_UNITS 22 + LABELVALUE DART STATUS PLE_LAST_TIME WITH_UNITS 22 + END + + VERTICALBOX + SECTIONHEADER "Decom Status" + + LABELVALUE DART STATUS PLE_STATE_NEED_DECOM + LABELVALUE DART STATUS DECOM_COUNT + LABELVALUE DART STATUS DECOM_ERROR_COUNT + LABELVALUE DART STATUS DECOM_MESSAGE_TIME WITH_UNITS 22 + LABEL DECOM_MESSAGE: + VALUE DART STATUS DECOM_MESSAGE CONVERTED 50 + END + END + HORIZONTAL + VERTICALBOX + SECTIONHEADER "Reduction Status" + + LABELVALUE DART STATUS REDUCTION_COUNT + LABELVALUE DART STATUS REDUCTION_ERROR_COUNT + LABELVALUE DART STATUS REDUCTION_MESSAGE_TIME WITH_UNITS 22 + LABEL REDUCTION_MESSAGE: + VALUE DART STATUS REDUCTION_MESSAGE CONVERTED 50 + END + + VERTICALBOX + SECTIONHEADER "Dart Database Status" + + LABELVALUE DART STATUS DART_DATA_BYTES + LABELVALUE DART STATUS DART_LOGS_BYTES + LABELVALUE DART STATUS DART_DATABASE_BYTES + LABELVALUE DART STATUS DART_STATUS_SECONDS + END + + END + + VERTICALBOX + LABELVALUE DART STATUS RECEIVED_TIMEFORMATTED WITH_UNITS 22 + END +END + diff --git a/demo/config/targets/DART/target.txt b/demo/config/targets/DART/target.txt new file mode 100644 index 000000000..80d8eb883 --- /dev/null +++ b/demo/config/targets/DART/target.txt @@ -0,0 +1,7 @@ +# Ignored Parameters +# IGNORE_PARAMETER parameter_name +IGNORE_PARAMETER PACKET_ID + +# Ignored Items +# IGNORE_ITEM item_name +IGNORE_ITEM PACKET_ID diff --git a/lib/cosmos/dart/app/helpers/application_helper.rb b/lib/cosmos/dart/app/helpers/application_helper.rb deleted file mode 100644 index de6be7945..000000000 --- a/lib/cosmos/dart/app/helpers/application_helper.rb +++ /dev/null @@ -1,2 +0,0 @@ -module ApplicationHelper -end diff --git a/lib/cosmos/dart/app/models/status.rb b/lib/cosmos/dart/app/models/status.rb new file mode 100644 index 000000000..3a33854c4 --- /dev/null +++ b/lib/cosmos/dart/app/models/status.rb @@ -0,0 +1,2 @@ +class Status < ApplicationRecord +end diff --git a/lib/cosmos/dart/db/migrate/20180423205644_create_statuses.rb b/lib/cosmos/dart/db/migrate/20180423205644_create_statuses.rb new file mode 100644 index 000000000..faa93f3dd --- /dev/null +++ b/lib/cosmos/dart/db/migrate/20180423205644_create_statuses.rb @@ -0,0 +1,22 @@ +class CreateStatuses < ActiveRecord::Migration[5.1] + def change + create_table :statuses do |t| + t.bigint :decom_count, :default => 0 + t.bigint :decom_error_count, :default => 0 + t.text :decom_message, :default => "" + t.datetime :decom_message_time + + t.bigint :reduction_count, :default => 0 + t.bigint :reduction_error_count, :default => 0 + t.text :reduction_message, :default => "" + t.datetime :reduction_message_time + + t.timestamps + end + status = Status.new + time = Time.utc(1970, 1, 1) + status.decom_message_time = time + status.reduction_message_time = time + status.save! + end +end diff --git a/lib/cosmos/dart/db/migrate/20180425211340_add_decom_state_index.rb b/lib/cosmos/dart/db/migrate/20180425211340_add_decom_state_index.rb new file mode 100644 index 000000000..abe7cf63e --- /dev/null +++ b/lib/cosmos/dart/db/migrate/20180425211340_add_decom_state_index.rb @@ -0,0 +1,5 @@ +class AddDecomStateIndex < ActiveRecord::Migration[5.1] + def change + add_index :packet_log_entries, [:decom_state] + end +end diff --git a/lib/cosmos/dart/db/migrate/20180509204705_remove_indexes.rb b/lib/cosmos/dart/db/migrate/20180509204705_remove_indexes.rb new file mode 100644 index 000000000..d97b27c41 --- /dev/null +++ b/lib/cosmos/dart/db/migrate/20180509204705_remove_indexes.rb @@ -0,0 +1,29 @@ +class RemoveIndexes < ActiveRecord::Migration[5.1] + + def change + remove_index :packet_log_entries, :is_tlm + remove_index :packet_log_entries, :target_id + remove_index :packet_log_entries, :meta_id + remove_index :packet_log_entries, :ready + remove_index :packet_log_entries, :packet_id + remove_index :packet_log_entries, :packet_log_id + + ActiveRecord::Base.connection.tables.each do |table| + # Since the decommutation tables are created dynamically we search + # through all the tables looking for tables named something like + # tXXX_YYY where XXX is the PacketConfig ID and YYY is the table index + if table.to_s =~ /^t(\d+)_(\d+)$/ # ASCII art? No! Regex! + packet_config_id = $1.to_i + table_index = $2.to_i + + ["", "_m", "_h", "_d"].each do |modifier| + table_name = table + modifier + remove_index table_name, :meta_id + remove_index table_name, :reduced_id + remove_index table_name, :reduced_state + add_index table_name, :reduced_state, :where => "reduced_state < 2" + end + end + end + end +end diff --git a/lib/cosmos/dart/db/migrate/20180510160002_remove_timestamps_from_ples.rb b/lib/cosmos/dart/db/migrate/20180510160002_remove_timestamps_from_ples.rb new file mode 100644 index 000000000..d45b5f47a --- /dev/null +++ b/lib/cosmos/dart/db/migrate/20180510160002_remove_timestamps_from_ples.rb @@ -0,0 +1,6 @@ +class RemoveTimestampsFromPles < ActiveRecord::Migration[5.1] + def change + remove_column :packet_log_entries, :created_at + remove_column :packet_log_entries, :updated_at + end +end diff --git a/lib/cosmos/dart/db/migrate/20180511194944_add_packet_log_id_to_tables.rb b/lib/cosmos/dart/db/migrate/20180511194944_add_packet_log_id_to_tables.rb new file mode 100644 index 000000000..3e7c1887a --- /dev/null +++ b/lib/cosmos/dart/db/migrate/20180511194944_add_packet_log_id_to_tables.rb @@ -0,0 +1,18 @@ +class AddPacketLogIdToTables < ActiveRecord::Migration[5.1] + def change + ActiveRecord::Base.connection.tables.each do |table| + # Since the decommutation tables are created dynamically we search + # through all the tables looking for tables named something like + # tXXX_YYY where XXX is the PacketConfig ID and YYY is the table index + if table.to_s =~ /^t(\d+)_(\d+)$/ # ASCII art? No! Regex! + packet_config_id = $1.to_i + table_index = $2.to_i + + ["", "_m", "_h", "_d"].each do |modifier| + table_name = table + modifier + add_column table_name, :packet_log_id, :integer + end + end + end + end +end diff --git a/lib/cosmos/dart/db/schema.rb b/lib/cosmos/dart/db/schema.rb index 9bf1edc23..d7420e47f 100644 --- a/lib/cosmos/dart/db/schema.rb +++ b/lib/cosmos/dart/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 20180116214338) do +ActiveRecord::Schema.define(version: 20180511194944) do # These are extensions that must be enabled in order to support this database enable_extension "plpgsql" @@ -54,18 +54,11 @@ t.datetime "time", null: false t.integer "packet_log_id", null: false t.bigint "data_offset", null: false - t.datetime "created_at", null: false - t.datetime "updated_at", null: false t.bigint "meta_id" t.boolean "is_tlm", null: false t.integer "decom_state", default: 0 t.boolean "ready", default: false - t.index ["is_tlm"], name: "index_packet_log_entries_on_is_tlm" - t.index ["meta_id"], name: "index_packet_log_entries_on_meta_id" - t.index ["packet_id"], name: "index_packet_log_entries_on_packet_id" - t.index ["packet_log_id"], name: "index_packet_log_entries_on_packet_log_id" - t.index ["ready"], name: "index_packet_log_entries_on_ready" - t.index ["target_id"], name: "index_packet_log_entries_on_target_id" + t.index ["decom_state"], name: "index_packet_log_entries_on_decom_state" t.index ["time"], name: "index_packet_log_entries_on_time" end @@ -86,6 +79,19 @@ t.index ["target_id", "name", "is_tlm"], name: "index_packets_on_target_id_and_name_and_is_tlm", unique: true end + create_table "statuses", force: :cascade do |t| + t.bigint "decom_count", default: 0 + t.bigint "decom_error_count", default: 0 + t.text "decom_message", default: "" + t.datetime "decom_message_time" + t.bigint "reduction_count", default: 0 + t.bigint "reduction_error_count", default: 0 + t.text "reduction_message", default: "" + t.datetime "reduction_message_time" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + end + create_table "system_configs", force: :cascade do |t| t.string "name", null: false t.datetime "created_at", null: false diff --git a/lib/cosmos/dart/db/seeds.rb b/lib/cosmos/dart/db/seeds.rb index 1beea2acc..dc25c0111 100644 --- a/lib/cosmos/dart/db/seeds.rb +++ b/lib/cosmos/dart/db/seeds.rb @@ -5,3 +5,11 @@ # # movies = Movie.create([{ name: 'Star Wars' }, { name: 'Lord of the Rings' }]) # Character.create(name: 'Luke', movie: movies.first) +status = Status.first +unless status + status = Status.new + time = Time.utc(1970, 1, 1) + status.decom_message_time = time + status.reduction_message_time = time + status.save! +end diff --git a/lib/cosmos/dart/lib/dart_common.rb b/lib/cosmos/dart/lib/dart_common.rb index 2b268e5f9..8279c737a 100644 --- a/lib/cosmos/dart/lib/dart_common.rb +++ b/lib/cosmos/dart/lib/dart_common.rb @@ -165,6 +165,7 @@ def setup_packet_config(packet, packet_id, packet_config) t.bigint :ple_id t.bigint :meta_id t.bigint :reduced_id + t.integer :packet_log_id t.integer :reduced_state, :default => 0 table_data_types.each_with_index do |data_type, index| item_index = (table_index * MAX_COLUMNS_PER_TABLE) + index @@ -183,9 +184,7 @@ def setup_packet_config(packet, packet_id, packet_config) end end t.index :time - t.index :meta_id - t.index :reduced_state - t.index :reduced_id + t.index :reduced_state, :where => "reduced_state < 2" end create_reduction_table("t#{packet_config.id}_#{table_index}_h", table_data_types, table_index) # hour create_reduction_table("t#{packet_config.id}_#{table_index}_m", table_data_types, table_index) # month @@ -524,6 +523,7 @@ def lookup_item_id(packet_id, item_name) # @param array_size [Integer, nil] Size of the array or nil if no array # @return [Symbol] Database type such as :integer, :bigint, :string, etc. def cosmos_data_type_to_db_type(data_type, bit_size, array_size) + return nil if data_type.nil? or bit_size.nil? db_type = nil case data_type when :INT @@ -674,6 +674,7 @@ def create_reduction_table(table_name, table_data_types, table_index) t.integer :num_samples t.bigint :meta_id t.bigint :reduced_id + t.integer :packet_log_id t.integer :reduced_state, :default => 0 table_data_types.each_with_index do |data_type, index| item_index = (table_index * MAX_COLUMNS_PER_TABLE) + index @@ -686,9 +687,7 @@ def create_reduction_table(table_name, table_data_types, table_index) end end t.index :start_time - t.index :meta_id - t.index :reduced_state - t.index :reduced_id + t.index :reduced_state, :where => "reduced_state < 2" end end diff --git a/lib/cosmos/dart/lib/dart_database_cleaner.rb b/lib/cosmos/dart/lib/dart_database_cleaner.rb index fc8bf1e42..b7a558f24 100644 --- a/lib/cosmos/dart/lib/dart_database_cleaner.rb +++ b/lib/cosmos/dart/lib/dart_database_cleaner.rb @@ -9,6 +9,7 @@ # attribution addendums as found in the LICENSE.txt require 'dart_common' +require 'cosmos/packet_logs/packet_log_reader' class DartDatabaseCleaner include DartCommon @@ -17,18 +18,68 @@ class DartDatabaseCleaner # if the DART packet logs were moved and force is false (default). This is # deliberate because force causes all the lost (or moved) files to be deleted # which forces them to be re-imported at their new location. - def self.clean(force) + def self.clean(force, full = false) Cosmos::Logger::info("Starting database cleanup...") cleaner = DartDatabaseCleaner.new cleaner.clean_system_configs() cleaner.clean_packet_logs(force) cleaner.clean_packet_configs() cleaner.clean_packet_log_entries() - cleaner.clean_decommutation_tables() - cleaner.clean_reductions() + if full + cleaner.clean_decommutation_tables() + cleaner.clean_reductions() + end Cosmos::Logger::info("Database cleanup complete!") end + def remove_packet_log(filename) + filename = filename.gsub("\\", "/") # Fix slashes + filename = File.expand_path(filename, Cosmos::System.paths['DART_DATA']) # Make absolute path + if File.exists?(filename) + packet_log = PacketLog.where("filename = ?", filename).first + if packet_log + size = File.size(filename) + reader = Cosmos::PacketLogReader.new + reader.open(filename) + first_packet = nil + last_packet = nil + start_time = nil + end_time = nil + begin + first_packet = reader.first + last_packet = reader.last + start_time = first_packet.received_time + end_time = last_packet.received_time + rescue + if size == Cosmos::PacketLogReader::COSMOS2_HEADER_LENGTH or size == 0 + Cosmos::Logger::error("File contains no packets: #{filename}") + else + Cosmos::Logger::error("Error analyzing file: #{filename}") + end + exit(1) + ensure + reader.close + end + num_deleted = PacketLogEntry.where("time >= ? and time <= ? and packet_log_id = ?", start_time - 1.minute, end_time + 1.minute, packet_log.id).delete_all + Cosmos::Logger::info("Removed #{num_deleted} PacketLogEntries") + each_decom_and_reduced_table() do |packet_config_id, table_index, decom_model, minute_model, hour_model, day_model| + num_deleted = 0 + num_deleted += decom_model.where("time >= ? and time <= ? and packet_log_id = ?", start_time - 1.minute, end_time + 1.minute, packet_log.id).delete_all + num_deleted += minute_model.where("start_time >= ? and start_time <= ? and packet_log_id = ?", start_time - 1.minute, end_time + 2.minutes, packet_log.id).delete_all + num_deleted += hour_model.where("start_time >= ? and start_time <= ? and packet_log_id = ?", start_time - 1.minute, end_time + 1.hour + 1.minutes, packet_log.id).delete_all + num_deleted += day_model.where("start_time >= ? and start_time <= ? and packet_log_id = ?", start_time - 1.minute, end_time + 1.hour + 1.day, packet_log.id).delete_all + Cosmos::Logger::info("Deleted #{num_deleted} rows from #{decom_model.table_name} and reductions") + end + packet_log.delete + Cosmos::Logger::info("Packet Log Id #{packet_log.id} deleted") + else + Cosmos::Logger::error("File does not exist in database: #{filename}") + end + else + Cosmos::Logger::error("File does not exist: #{filename}") + end + end + # Ensure we have all the System Configs locally on the DART machine def clean_system_configs Cosmos::Logger::info("Cleaning up SystemConfig...") @@ -95,7 +146,7 @@ def clean_packet_configs end rescue => err Cosmos::Logger::error("Error cleaning up packet config: #{packet_config.id}: #{err.formatted}") - raise "Cleanup failure - Database requires manual correction" + raise $!, "Cleanup failure - Database requires manual correction: #{err.message}", $!.backtrace end end end @@ -108,7 +159,7 @@ def clean_packet_log_entries # outstanding entries which are not ready while packets are being received. # Note the normal shutdown process attempts to flush the log file and mark # all outstanding entries as ready so this would only happen during a crash. - ples = PacketLogEntry.where("ready != true") + ples = PacketLogEntry.where("decom_state = 0 and ready != true") return unless ples.length > 0 Cosmos::Logger::info("Removing unready packet log entries: #{ples.length}") ples.destroy_all diff --git a/lib/cosmos/dart/lib/dart_decom_query.rb b/lib/cosmos/dart/lib/dart_decom_query.rb index 21ec6cec1..fbbd42be5 100644 --- a/lib/cosmos/dart/lib/dart_decom_query.rb +++ b/lib/cosmos/dart/lib/dart_decom_query.rb @@ -53,7 +53,7 @@ class DartDecomQuery # for other reduction values), and meta_id. def query(request) request_start_time = Time.now - Cosmos::Logger.info("#{request_start_time.formatted}: QUERY: #{request}") + Cosmos::Logger.info("#{request_start_time.formatted}: query: #{request}") begin start_time_sec = request['start_time_sec'] @@ -136,7 +136,7 @@ def query(request) unless meta_ids.length > 0 meta_filters = request['meta_filters'] meta_filters ||= [] - + if meta_filters.length > 0 meta_ids = process_meta_filters(meta_filters, is_tlm, end_time) end @@ -149,10 +149,10 @@ def query(request) offset = 0 if offset < 0 return query_decom_reduced( - item[0], item[1], item[2], - value_type, is_tlm, - start_time, end_time, - reduction, reduction_modifier, + item[0], item[1], item[2], + value_type, is_tlm, + start_time, end_time, + reduction, reduction_modifier, item_name_modifier, limit, offset, meta_ids) rescue Exception => error @@ -168,6 +168,8 @@ def query(request) # @param is_tlm true or false # @return [Array] Array of item names def item_names(target_name, packet_name, is_tlm = true) + Cosmos::Logger.info("#{time.formatted}: item_names") + target = Target.where("name = ?", target_name).first raise "Target #{target_name} not found" unless target @@ -181,4 +183,104 @@ def item_names(target_name, packet_name, is_tlm = true) return item_names end + # Returns status on the DART Database + def dart_status + start_time = Time.now + Cosmos::Logger.info("#{start_time.formatted}: dart_status") + result = {} + status = Status.first + + # Ingest Status + # --------------- + # Last PacketLogEntry Id + last_ple = PacketLogEntry.select("id").last + if last_ple + result[:LAST_PLE_ID] = last_ple.id + else + result[:LAST_PLE_ID] = -1 + end + # Num PacketLogEntries Needing Decom state = 0 + result[:PLE_STATE_NEED_DECOM] = PacketLogEntry.where("decom_state = 0").count + # Num PacketLogEntries Errored - state >= 3 + result[:PLE_STATE_ERROR] = PacketLogEntry.where("decom_state >= 3").count + # First Time in Database + sort_first_ple = PacketLogEntry.order("time ASC").select("time").first + if sort_first_ple + result[:PLE_FIRST_TIME_S] = sort_first_ple.time.tv_sec + result[:PLE_FIRST_TIME_US] = sort_first_ple.time.tv_usec + else + result[:PLE_FIRST_TIME_S] = 0 + result[:PLE_FIRST_TIME_US] = 0 + end + # Last Time in Database + sort_last_ple = PacketLogEntry.order("time DESC").select("time").first + if sort_last_ple + result[:PLE_LAST_TIME_S] = sort_last_ple.time.tv_sec + result[:PLE_LAST_TIME_US] = sort_last_ple.time.tv_usec + else + result[:PLE_LAST_TIME_S] = 0 + result[:PLE_LAST_TIME_US] = 0 + end + + # Decom Status + # --------------- + # Decom Count + result[:DECOM_COUNT] = status.decom_count + # Decom Errors + result[:DECOM_ERROR_COUNT] = status.decom_error_count + # Decom Message + result[:DECOM_MESSAGE] = status.decom_message + # Decom Message Time + result[:DECOM_MESSAGE_TIME_S] = status.decom_message_time.tv_sec + result[:DECOM_MESSAGE_TIME_US] = status.decom_message_time.tv_usec + + # Reduction Status + # --------------- + # Reduction Count + result[:REDUCTION_COUNT] = status.reduction_count + # Reduction Errors + result[:REDUCTION_ERROR_COUNT] = status.reduction_error_count + # Reduction Message + result[:REDUCTION_MESSAGE] = status.reduction_message + # Reduction Time + result[:REDUCTION_MESSAGE_TIME_S] = status.reduction_message_time.tv_sec + result[:REDUCTION_MESSAGE_TIME_US] = status.reduction_message_time.tv_usec + + # Storage + # --------------- + Cosmos.set_working_dir do + # Size of outputs/dart/data folder + result[:DART_DATA_BYTES] = Dir.glob(File.join(Cosmos::System.paths['DART_DATA'], '**', '*')).map{ |f| File.size(f) }.inject(:+) + # Size of outputs/dart/logs folder + result[:DART_LOGS_BYTES] = Dir.glob(File.join(Cosmos::System.paths['DART_LOGS'], '**', '*')).map{ |f| File.size(f) }.inject(:+) + end + # Size of SQL Database + begin + result[:DART_DATABASE_BYTES] = ActiveRecord::Base.connection.execute("select pg_database_size('#{ActiveRecord::Base.connection_config[:database]}');")[0]['pg_database_size'] + rescue + result[:DART_DATABASE_BYTES] = -1 + end + + end_time = Time.now + delta = end_time - start_time + result[:DART_STATUS_SECONDS] = delta + + return result + end + + def clear_errors + time = Time.now + Cosmos::Logger.info("#{time.formatted}: clear_errors") + status = Status.first + status.decom_error_count = 0 + status.decom_message = '' + status.decom_message_time = time + status.reduction_error_count = 0 + status.reduction_message = '' + status.reduction_message_time = time + status.save! + + return nil + end + end diff --git a/lib/cosmos/dart/lib/dart_decommutator.rb b/lib/cosmos/dart/lib/dart_decommutator.rb index 8ec9290f0..9ee051d16 100644 --- a/lib/cosmos/dart/lib/dart_decommutator.rb +++ b/lib/cosmos/dart/lib/dart_decommutator.rb @@ -12,6 +12,24 @@ require 'dart_logging' require 'packet_log_entry' +class DartDecommutatorStatus + attr_accessor :count + attr_accessor :error_count + attr_accessor :message + attr_accessor :message_time + + def initialize + @count = 0 + @error_count = 0 + @message = '' + @message_time = Time.now + @cached_meta_ple = nil + @cached_system_meta = nil + @cached_system_meta_id = nil + @cached_system_config = nil + end +end + class DartDecommutator include DartCommon @@ -22,10 +40,12 @@ def initialize(worker_id = 0, num_workers = 1) sync_targets_and_packets() @worker_id = worker_id @num_workers = num_workers + @status = DartDecommutatorStatus.new end # Run forever looking for data to decommutate def run + status_time = Time.now + 60.seconds while true time_start = Time.now # Remember start time so we can throttle # Get all entries that are ready and decommutation hasn't started @@ -34,7 +54,6 @@ def run where("id % #{@num_workers} = #{@worker_id}").in_batches do |group| group.each do |ple| begin - # TODO - Optimize and cache meta and system config and packet config lookup meta_ple = get_meta_ple(ple) next unless meta_ple system_meta = get_system_meta(ple, meta_ple) @@ -48,12 +67,28 @@ def run # If we timeout this code will simply exit the application wait_for_ready_packet_config(packet_config) decom_packet(ple, packet, packet_config) + + # Update status + if Time.now > status_time + status_time = Time.now + 60.seconds + status = Status.first + if (Time.now - @status.message_time) <= 60.0 + status.decom_message = @status.message + status.decom_message_time = @status.message_time + status.save! + end + if @status.count > 0 or @status.error_count > 0 + Status.update_counters(status.id, :decom_count => @status.count, :decom_error_count => @status.error_count) + @status.count = 0 + @status.error_count = 0 + end + end rescue => err - Cosmos::Logger.error("PLE:#{ple.id}:ERROR") - Cosmos::Logger.error(err.formatted) + handle_error("PLE:#{ple.id}:ERROR\n#{err.formatted}") end end # each ple end # batches + # Throttle to no faster than 1 Hz delta = Time.now - time_start sleep(1 - delta) if delta < 1 && delta > 0 @@ -65,46 +100,61 @@ def run protected def get_meta_ple(ple) + return @cached_meta_ple if @cached_meta_ple and @cached_meta_ple.id == ple.meta_id + if ple.meta_id != ple.id - PacketLogEntry.find(ple.meta_id) + meta_ple = PacketLogEntry.find(ple.meta_id) else - ple + meta_ple = ple end + @cached_meta_ple = meta_ple + return meta_ple rescue => err ple.decom_state = PacketLogEntry::NO_META_PLE ple.save! - Cosmos::Logger.error("PLE:#{ple.id}:#{ple.decom_state_string}") + handle_error("PLE:#{ple.id}:#{ple.decom_state_string}") nil end def get_system_meta(ple, meta_ple) + return @cached_system_meta if @cached_system_meta and @cached_system_meta_id == meta_ple.id + system_meta = read_packet_from_ple(meta_ple) - return system_meta if system_meta + if system_meta + @cached_system_meta_id = meta_ple.id + @cached_system_meta = system_meta + return system_meta + end ple.decom_state = PacketLogEntry::NO_META_PACKET ple.save! - Cosmos::Logger.error("PLE:#{ple.id}:#{ple.decom_state_string}") + handle_error("PLE:#{ple.id}:#{ple.decom_state_string}") nil end def get_system_config(ple, system_meta) system_config_name = system_meta.read("CONFIG") - system_config = SystemConfig.where("name = ?", system_config_name).first - unless system_config - begin - # Try to create a new SystemConfig since it didn't exist - system_config = SystemConfig.create(:name => system_config_name) - rescue - # Another thread probably already created it - Try to get it one more time - system_config = SystemConfig.where("name = ?", system_config_name).first + if @cached_system_config and @cached_system_config.name == system_config_name + system_config = @cached_system_config + else + system_config = SystemConfig.where("name = ?", system_config_name).first + unless system_config + begin + # Try to create a new SystemConfig since it didn't exist + system_config = SystemConfig.create(:name => system_config_name) + rescue + # Another thread probably already created it - Try to get it one more time + system_config = SystemConfig.where("name = ?", system_config_name).first + end end end unless system_config ple.decom_state = PacketLogEntry::NO_SYSTEM_CONFIG ple.save! - Cosmos::Logger.error("PLE:#{ple.id}:#{ple.decom_state_string}") + handle_error("PLE:#{ple.id}:#{ple.decom_state_string}") return nil end + @cached_system_config = system_config # Switch to this system_config begin @@ -113,7 +163,7 @@ def get_system_config(ple, system_meta) Cosmos::Logger.error("Could not load system_config: #{system_config_name}") ple.decom_state = PacketLogEntry::NO_CONFIG ple.save! - Cosmos::Logger.error("PLE:#{ple.id}:#{ple.decom_state_string}") + handle_error("PLE:#{ple.id}:#{ple.decom_state_string}") return nil end system_config @@ -125,7 +175,7 @@ def get_packet(ple) ple.decom_state = PacketLogEntry::NO_PACKET ple.save! - Cosmos::Logger.error("PLE:#{ple.id}:#{ple.decom_state_string}") + handle_error("PLE:#{ple.id}:#{ple.decom_state_string}") nil end @@ -141,14 +191,14 @@ def get_packet_config(ple, packet, system_config) setup_packet_config(packet, packet_id, packet_config) Cosmos::Logger.info("Successfully Created PacketConfig: #{packet.config_name}") rescue => err - Cosmos::Logger.error(err.formatted) + handle_error(err.formatted) # Another thread probably already created it - Try to get it one more time packet_config = PacketConfig.where("packet_id = ? and name = ?", packet_id, packet.config_name).first end unless packet_config ple.decom_state = PacketLogEntry::NO_PACKET_CONFIG ple.save! - Cosmos::Logger.error("PLE:#{ple.id}:#{ple.decom_state_string}") + handle_error("PLE:#{ple.id}:#{ple.decom_state_string}") return nil end packet_config @@ -167,7 +217,7 @@ def wait_for_ready_packet_config(packet_config) packet_config.reload if (Time.now - ready_wait_start) > PACKET_CONFIG_READY_TIMEOUT - Cosmos::Logger.fatal("Timeout waiting for ready on PacketConfig:#{packet_config.id}") + handle_error("Timeout waiting for ready on PacketConfig:#{packet_config.id}") exit(1) end end @@ -211,6 +261,7 @@ def decom_packet(ple, packet, packet_config) row = model.new row.time = ple.time row.ple_id = ple.id + row.packet_log_id = ple.packet_log_id row.meta_id = ple.meta_id row.reduced_state = INITIALIZING table_values.each_with_index do |value, index| @@ -230,6 +281,14 @@ def decom_packet(ple, packet, packet_config) # The log entry has been decommutated, mark COMPLETE ple.decom_state = PacketLogEntry::COMPLETE ple.save! + @status.count += 1 Cosmos::Logger.debug("PLE:#{ple.id}:#{ple.decom_state_string}") end + + def handle_error(message) + Cosmos::Logger.error(message) + @status.error_count += 1 + @status.message = message + @status.message_time = Time.now + end end diff --git a/lib/cosmos/dart/lib/dart_importer.rb b/lib/cosmos/dart/lib/dart_importer.rb index 4dfcc83b7..74152c195 100644 --- a/lib/cosmos/dart/lib/dart_importer.rb +++ b/lib/cosmos/dart/lib/dart_importer.rb @@ -80,10 +80,12 @@ def import(filename, force) Cosmos::Logger.info("First and Last Packet in File not in database") # Check if time range of packets is not present in database - ple = PacketLogEntry.where("time >= ? or time <= ?", first_packet.received_time, last_packet.received_time).first + ple = PacketLogEntry.where("time >= ? and time <= ?", first_packet.received_time, last_packet.received_time).first if !ple # Can go fast if not present at all Cosmos::Logger.info(" Fast Import Enabled...") fast = true + else + Cosmos::Logger.warn("Time range already in database. Will verify each packet before adding") end else Cosmos::Logger.warn("File partially in database. Will verify each packet before adding") @@ -97,6 +99,8 @@ def import(filename, force) # Read File and Create PacketLogEntries count = 0 meta_id = nil + ple_data = "" + ple_data_count = 0 plr.open(filename) data_offset = plr.bytes_read plr.each(filename) do |packet| @@ -117,27 +121,39 @@ def import(filename, force) # No PacketLogEntry was found so create one from scratch unless ple - ple = PacketLogEntry.new - ple.target_id = target_id - ple.packet_id = packet_id - ple.time = packet.received_time - ple.packet_log_id = packet_log.id - ple.data_offset = data_offset - ple.meta_id = meta_id - ple.is_tlm = is_tlm - ple.ready = true - ple.save! - count += 1 - # SYSTEM META packets are special in that their meta_id is their own # PacketLogEntry ID from the database. All other packets have meta_id # values which point back to the last SYSTEM META PacketLogEntry ID. if target_name == 'SYSTEM'.freeze and packet_name == 'META'.freeze + if ple_data.length > 0 + ActiveRecord::Base.connection.execute("INSERT INTO packet_log_entries (target_id, packet_id, time, packet_log_id, data_offset, meta_id, is_tlm, ready) VALUES #{ple_data}") + ple_data.clear + ple_data_count = 0 + end + + ple = PacketLogEntry.new + ple.target_id = target_id + ple.packet_id = packet_id + ple.time = packet.received_time + ple.packet_log_id = packet_log.id + ple.data_offset = data_offset + ple.meta_id = meta_id + ple.is_tlm = is_tlm + ple.ready = false + ple.save!(validate: false) + # Need to update meta_id for this and all subsequent packets meta_id = ple.id ple.meta_id = meta_id - ple.save! + ple.ready = true + ple.save!(validate: false) + else + ple_data << "," if ple_data.length > 0 + ple_data << "(#{target_id},#{packet_id},'#{packet.received_time.dup.utc.iso8601(6)}',#{packet_log.id},#{data_offset},#{meta_id},#{is_tlm},true)" + ple_data_count += 1 end + + count += 1 else # A PacketLogEntry was found so this packet is skipped # If the packet is a SYSTEM META packet we keep track of the meta_id # for use in subsequent packets that aren't already in the database. @@ -146,8 +162,22 @@ def import(filename, force) meta_id = ple.id end end + + if ple_data_count >= 1000 + ActiveRecord::Base.connection.execute("INSERT INTO packet_log_entries (target_id, packet_id, time, packet_log_id, data_offset, meta_id, is_tlm, ready) VALUES #{ple_data}") + ple_data.clear + ple_data_count = 0 + end + data_offset = plr.bytes_read end + + if ple_data.length > 0 + ActiveRecord::Base.connection.execute("INSERT INTO packet_log_entries (target_id, packet_id, time, packet_log_id, data_offset, meta_id, is_tlm, ready) VALUES #{ple_data}") + ple_data.clear + ple_data_count = 0 + end + Cosmos::Logger.info("Added #{count} packet log entries to database") return 0 # Success code end diff --git a/lib/cosmos/dart/lib/dart_packet_log_writer.rb b/lib/cosmos/dart/lib/dart_packet_log_writer.rb index eb148b5b2..3568a5590 100644 --- a/lib/cosmos/dart/lib/dart_packet_log_writer.rb +++ b/lib/cosmos/dart/lib/dart_packet_log_writer.rb @@ -29,7 +29,13 @@ def initialize(*args) @db_queue = Queue.new @sync_count = 0 @sync_count_limit = DEFAULT_SYNC_COUNT_LIMIT - @not_ready_ple_ids = [] + @ple_data = "" + @sync_ple = nil + if @log_type == :TLM + @is_tlm = true + else + @is_tlm = false + end sync_targets_and_packets() @@ -42,6 +48,13 @@ def initialize(*args) def shutdown super() Cosmos.kill_thread(self, @db_thread) + handle_sync_ple() + queue_ple_data() + while @db_queue.length > 0 + ple_data = @db_queue.pop + break if ple_data.nil? + ActiveRecord::Base.connection.execute("INSERT INTO packet_log_entries (target_id, packet_id, time, packet_log_id, data_offset, meta_id, is_tlm, ready) VALUES #{ple_data}") + end end # Kick the database update thread to allow it to quit @@ -54,33 +67,75 @@ def graceful_kill # Override the default new file hook to create a PacketLog entry in the database def start_new_file_hook(packet) - # When we create a new file we mark any existing PLEs ready - PacketLogEntry.where("id" => @not_ready_ple_ids).update_all(ready: true) - @not_ready_ple_ids.clear - @sync_count = 0 + # When we create a new file we write out any queued ple_data + if @ple_data.length > 0 + @db_queue << @ple_data.clone + @ple_data.clear + @sync_count = 0 + end packet_log = PacketLog.new packet_log.filename = @filename.clone - if @log_type == :TLM - packet_log.is_tlm = true - else - packet_log.is_tlm = false - end + packet_log.is_tlm = @is_tlm packet_log.save! @packet_log_id = packet_log.id super(packet) end + def queue_ple_data + if @ple_data.length > 0 + @db_queue << @ple_data.clone + @ple_data.clear + @sync_count = 0 + end + end + + def handle_sync_ple + if @sync_ple + @file.fsync if @file + @sync_ple.ready = true + @sync_ple.save! + @sync_ple = nil + queue_ple_data() + end + end + # Override the default pre write hook to pop a message on the queue which # will be processed by the database thread. This also writes out the log # files to disk periodically for use by other DART processes. def pre_write_entry_hook(packet) - @sync_count += 1 - if @sync_count > @sync_count_limit - @file.fsync - @sync_count = 0 + handle_sync_ple() + + # SYSTEM META packets are special in that their meta_id is their own + # PacketLogEntry ID from the database. All other packets have meta_id + # values which point back to the last SYSTEM META PacketLogEntry ID. + target_id, packet_id = lookup_target_and_packet_id(packet.target_name, packet.packet_name, @is_tlm) + if packet.target_name == 'SYSTEM'.freeze and packet.packet_name == 'META'.freeze + ple = PacketLogEntry.new + ple.target_id = target_id + ple.packet_id = packet_id + ple.time = packet.received_time + ple.packet_log_id = @packet_log_id + ple.data_offset = @file_size + ple.meta_id = @meta_id + ple.is_tlm = @is_tlm + ple.ready = false + ple.save! + @meta_id = ple.id + ple.meta_id = @meta_id + ple.save! + @sync_ple = ple + else + @sync_count += 1 + if @sync_count > @sync_count_limit + @file.fsync + @db_queue << @ple_data.clone + @ple_data.clear + @sync_count = 0 + end + @ple_data << "," if @ple_data.length > 0 + @ple_data << "(#{target_id},#{packet_id},'#{packet.received_time.dup.utc.iso8601(6)}',#{@packet_log_id},#{@file_size},#{@meta_id},#{@is_tlm},true)" end - @db_queue << [packet.target_name, packet.packet_name, packet.received_time, @file_size, @packet_log_id, @sync_count] end # Build the target / packet table lookup table and then wait on the queue @@ -88,52 +143,15 @@ def pre_write_entry_hook(packet) # PacketLogEntry table. Each entry identifies a packet in the log file by # its target, packet, time, and data offset (among other things). def db_thread_body - if @log_type == :TLM - is_tlm = true - else - is_tlm = false - end - while true begin - target_name, packet_name, time, data_offset, packet_log_id, sync_count = @db_queue.pop - # Every time the sync_count resets by the pre_write_entry_hook the file - # is written out to disk. Thus we mark all the PacketLogEntrys to ready - # since we know the packets have been written to disk. - if sync_count == 0 or sync_count.nil? - PacketLogEntry.where("id" => @not_ready_ple_ids).update_all(ready: true) - @not_ready_ple_ids.clear - end - return if @cancel_threads or sync_count.nil? + ple_data = @db_queue.pop + return if @cancel_threads or ple_data.nil? + ActiveRecord::Base.connection.execute("INSERT INTO packet_log_entries (target_id, packet_id, time, packet_log_id, data_offset, meta_id, is_tlm, ready) VALUES #{ple_data}") rescue ThreadError # This can happen when the thread is killed return end - - target_id, packet_id = lookup_target_and_packet_id(target_name, packet_name, is_tlm) - - ple = PacketLogEntry.new - ple.target_id = target_id - ple.packet_id = packet_id - ple.time = time - ple.packet_log_id = packet_log_id - ple.data_offset = data_offset - ple.meta_id = @meta_id - ple.is_tlm = is_tlm - ple.ready = false - ple.save! - - # SYSTEM META packets are special in that their meta_id is their own - # PacketLogEntry ID from the database. All other packets have meta_id - # values which point back to the last SYSTEM META PacketLogEntry ID. - if target_name == 'SYSTEM'.freeze and packet_name == 'META'.freeze - # Need to update meta_id for this and all subsequent packets - @meta_id = ple.id - ple.meta_id = @meta_id - ple.save! - end - # Remember this new PacketLogEntry so we can mark it ready later - @not_ready_ple_ids << ple.id end end end diff --git a/lib/cosmos/dart/lib/dart_reducer_manager.rb b/lib/cosmos/dart/lib/dart_reducer_manager.rb index 1eb3156a4..dbf7533ea 100644 --- a/lib/cosmos/dart/lib/dart_reducer_manager.rb +++ b/lib/cosmos/dart/lib/dart_reducer_manager.rb @@ -10,6 +10,20 @@ require 'dart_reducer_worker_thread' +class DartReducerStatus + attr_accessor :count + attr_accessor :error_count + attr_accessor :message + attr_accessor :message_time + + def initialize + @count = 0 + @error_count = 0 + @message = '' + @message_time = Time.now + end +end + # Reduce the decommutated data into the database. It creates a number of # threads to perform the actual data reduction. Then it queries the database # for all the decommutation tables and determines which need to be reduced @@ -21,13 +35,15 @@ class DartReducerManager # # @param num_threads [Integer] The number of worker threads to create def initialize(num_threads = 5) - Cosmos::Logger.info("Dart Reducer Starting with #{num_threads} threads...") + message = "Dart Reducer Starting with #{num_threads} threads..." + Cosmos::Logger.info(message) @master_queue = Queue.new @locked_tables = [] @mutex = Mutex.new @threads = [] + @status = DartReducerStatus.new num_threads.times do |index| - @threads << DartReducerWorkerThread.new(@master_queue, @locked_tables, @mutex, index + 1) + @threads << DartReducerWorkerThread.new(@master_queue, @locked_tables, @mutex, index + 1, @status) end end @@ -51,6 +67,20 @@ def run queue_worker(:HOUR, packet_config_id, table_index, minute_model, hour_model) queue_worker(:DAY, packet_config_id, table_index, hour_model, day_model) end + + # Update status + status = Status.first + if (Time.now - @status.message_time) <= 60.0 + status.reduction_message = @status.message + status.reduction_message_time = @status.message_time + status.save! + end + if @status.count > 0 or @status.error_count > 0 + Status.update_counters(status.id, :reduction_count => @status.count, :reduction_error_count => @status.error_count) + @status.count = 0 + @status.error_count = 0 + end + # Throttle to no faster than once every 60 seconds delta = Time.now - time_start if delta < 60 and delta > 0 @@ -58,7 +88,8 @@ def run end end rescue Interrupt - Cosmos::Logger.info("Dart Reducer Shutting Down...") + message = "Dart Reducer Shutting Down..." + Cosmos::Logger.info(message) shutdown() exit(0) end diff --git a/lib/cosmos/dart/lib/dart_reducer_worker_thread.rb b/lib/cosmos/dart/lib/dart_reducer_worker_thread.rb index 5ba447c7b..eb8ee00d9 100644 --- a/lib/cosmos/dart/lib/dart_reducer_worker_thread.rb +++ b/lib/cosmos/dart/lib/dart_reducer_worker_thread.rb @@ -23,13 +23,15 @@ class DartReducerWorkerThread # third values are the PacketConfig ID and table index. # @param mutex [Mutex] Mutex used to synchronize access to the locked_tables # @param instance_num [Integer] Simple counter to trace the thread instance - def initialize(master_queue, locked_tables, mutex, instance_num) + # @param status [DartReducerStatus] Status structure + def initialize(master_queue, locked_tables, mutex, instance_num, status) @instance_num = instance_num @running = true @master_queue = master_queue @locked_tables = locked_tables @mutex = mutex @thread_queue = Queue.new + @status = status # Start the thread which will wait on @thread_queue.pop @thread = Thread.new { work() } # Add the local @thread_queue to the @master_queue so jobs can be added @@ -77,6 +79,7 @@ def work new_row.start_time = first_row_time new_row.num_samples = sample_rows.length new_row.meta_id = sample_rows[0].meta_id + new_row.packet_log_id = sample_rows[0].packet_log_id # Process each of the ItemToDecomTableMapping to get the item to be reduced mappings.each do |mapping| item_name = "i#{mapping.item_index}" @@ -105,7 +108,7 @@ def work max_sample = value avg_sample = value if value.nil? - Cosmos::Logger.error("#{item_name} is nil in #{row_to_reduce.class}:#{row_to_reduce.id}") + handle_error("#{item_name} is nil in #{row_to_reduce.class}:#{row_to_reduce.id}") next end else # :HOUR or :DAY @@ -116,19 +119,19 @@ def work avg_sample = row_to_reduce.read_attribute(avg_item_name) stddev_sample = row_to_reduce.read_attribute(stddev_item_name) if min_sample.nil? - Cosmos::Logger.error("#{min_item_name} is nil in #{row_to_reduce.class}:#{row_to_reduce.id}") + handle_error("#{min_item_name} is nil in #{row_to_reduce.class}:#{row_to_reduce.id}") next end if max_sample.nil? - Cosmos::Logger.error("#{max_item_name} is nil in #{row_to_reduce.class}:#{row_to_reduce.id}") + handle_error("#{max_item_name} is nil in #{row_to_reduce.class}:#{row_to_reduce.id}") next end if avg_sample.nil? - Cosmos::Logger.error("#{avg_item_name} is nil in #{row_to_reduce.class}:#{row_to_reduce.id}") + handle_error("#{avg_item_name} is nil in #{row_to_reduce.class}:#{row_to_reduce.id}") next end if stddev_sample.nil? - Cosmos::Logger.error("#{stddev_item_name} is nil in #{row_to_reduce.class}:#{row_to_reduce.id}") + handle_error("#{stddev_item_name} is nil in #{row_to_reduce.class}:#{row_to_reduce.id}") next end end @@ -193,6 +196,7 @@ def work base_model.where(id: sample_rows.map(&:id)).update_all(:reduced_id => new_row.id) new_row.reduced_state = DartCommon::READY_TO_REDUCE new_row.save! + @status.count += 1 rows = rows[-1..-1] # Start a new sample with the last item in the previous sample Cosmos::Logger.debug("Created #{new_row.class}:#{new_row.id} with #{mappings.length} items from #{new_row.num_samples} samples") @@ -201,7 +205,7 @@ def work end # while @running Cosmos::Logger.info("Reducer Thread #{@instance_num} Shutdown") rescue Exception => error - Cosmos::Logger.error("Reducer Thread Unexpectedly Died: #{error.formatted}") + handle_error("Reducer Thread Unexpectedly Died: #{error.formatted}") end # Shutdown the worker thread @@ -260,4 +264,11 @@ def job_attributes(job_type) raise "Reducer Thread Unexpected Job Type: #{job_type}" end end + + def handle_error(message) + Cosmos::Logger.error(message) + @status.error_count += 1 + @status.message = message + @status.message_time = Time.now + end end diff --git a/lib/cosmos/dart/processes/dart.rb b/lib/cosmos/dart/processes/dart.rb index 51e16c5bb..ab9f07fdf 100644 --- a/lib/cosmos/dart/processes/dart.rb +++ b/lib/cosmos/dart/processes/dart.rb @@ -8,6 +8,7 @@ # as published by the Free Software Foundation; version 3 with # attribution addendums as found in the LICENSE.txt +ENV['RAILS_ENV'] = 'production' require File.expand_path('../../config/environment', __FILE__) require 'dart_common' require 'dart_logging' @@ -45,7 +46,7 @@ def run end num_workers = ENV['DART_NUM_WORKERS'] - num_workers ||= 1 + num_workers ||= 2 num_workers = num_workers.to_i process_definitions = [ diff --git a/lib/cosmos/dart/processes/dart_decom_server.rb b/lib/cosmos/dart/processes/dart_decom_server.rb index 4d4a373db..2bf798ba6 100644 --- a/lib/cosmos/dart/processes/dart_decom_server.rb +++ b/lib/cosmos/dart/processes/dart_decom_server.rb @@ -8,6 +8,7 @@ # as published by the Free Software Foundation; version 3 with # attribution addendums as found in the LICENSE.txt +ENV['RAILS_ENV'] = 'production' require File.expand_path('../../config/environment', __FILE__) require 'dart_common' require 'dart_decom_query' @@ -21,7 +22,7 @@ json_drb = Cosmos::JsonDRb.new json_drb.acl = Cosmos::System.acl if Cosmos::System.acl begin - json_drb.method_whitelist = ['query', 'item_names'] + json_drb.method_whitelist = ['query', 'item_names', 'dart_status', 'clear_errors'] begin json_drb.start_service(Cosmos::System.listen_hosts['DART_DECOM'], Cosmos::System.ports['DART_DECOM'], DartDecomQuery.new) diff --git a/lib/cosmos/dart/processes/dart_import.rb b/lib/cosmos/dart/processes/dart_import.rb index 0d8953a9f..364155d88 100644 --- a/lib/cosmos/dart/processes/dart_import.rb +++ b/lib/cosmos/dart/processes/dart_import.rb @@ -54,6 +54,7 @@ exit(1) end +ENV['RAILS_ENV'] = 'production' require File.expand_path('../../config/environment', __FILE__) require 'dart_importer' diff --git a/lib/cosmos/dart/processes/dart_ingester.rb b/lib/cosmos/dart/processes/dart_ingester.rb index 323cb69fe..ff33cf6d7 100644 --- a/lib/cosmos/dart/processes/dart_ingester.rb +++ b/lib/cosmos/dart/processes/dart_ingester.rb @@ -8,6 +8,7 @@ # as published by the Free Software Foundation; version 3 with # attribution addendums as found in the LICENSE.txt +ENV['RAILS_ENV'] = 'production' require File.expand_path('../../config/environment', __FILE__) require 'dart_packet_log_writer' require 'dart_logging' diff --git a/lib/cosmos/dart/processes/dart_reducer.rb b/lib/cosmos/dart/processes/dart_reducer.rb index 24d264d79..305c0e2d0 100644 --- a/lib/cosmos/dart/processes/dart_reducer.rb +++ b/lib/cosmos/dart/processes/dart_reducer.rb @@ -8,6 +8,7 @@ # as published by the Free Software Foundation; version 3 with # attribution addendums as found in the LICENSE.txt +ENV['RAILS_ENV'] = 'production' require File.expand_path('../../config/environment', __FILE__) require 'dart_reducer_manager' require 'dart_logging' diff --git a/lib/cosmos/dart/processes/dart_stream_server.rb b/lib/cosmos/dart/processes/dart_stream_server.rb index 91c30ad4d..9a5253c56 100644 --- a/lib/cosmos/dart/processes/dart_stream_server.rb +++ b/lib/cosmos/dart/processes/dart_stream_server.rb @@ -8,6 +8,7 @@ # as published by the Free Software Foundation; version 3 with # attribution addendums as found in the LICENSE.txt +ENV['RAILS_ENV'] = 'production' require File.expand_path('../../config/environment', __FILE__) require 'dart_tcpip_server_interface' require 'dart_logging' diff --git a/lib/cosmos/dart/processes/dart_util.rb b/lib/cosmos/dart/processes/dart_util.rb new file mode 100644 index 000000000..73b815393 --- /dev/null +++ b/lib/cosmos/dart/processes/dart_util.rb @@ -0,0 +1,147 @@ +# encoding: ascii-8bit + +# Copyright 2018 Ball Aerospace & Technologies Corp. +# All Rights Reserved. +# +# This program is free software; you can modify and/or redistribute it +# under the terms of the GNU General Public License +# as published by the Free Software Foundation; version 3 with +# attribution addendums as found in the LICENSE.txt + +# This code must be run on the database server + +require 'ostruct' +require 'optparse' +require 'cosmos/version' + +options = OpenStruct.new +options.force = false + +parser = OptionParser.new do |option_parser| + option_parser.banner = "Usage: dart_util [options]" + option_parser.separator("") + + # Create the help option + option_parser.on("-h", "--help", "Show this message") do + puts option_parser + exit(0) + end + + # Create the version option + option_parser.on("-v", "--version", "Show version") do + puts "COSMOS Version: #{COSMOS_VERSION}" + puts "User Version: #{USER_VERSION}" if defined? USER_VERSION + exit(0) + end + + # Create the system option + option_parser.on("--system FILE", "Use an alternative system.txt file") do |arg| + System.instance(File.join(USERPATH, 'config', 'system', arg)) + end + + # Create the force option + option_parser.on("-f", "--force", "Force parsing entire file") do + options.force = true + end +end + +parser.parse!(ARGV) +action = ARGV[0] +unless action + puts parser + exit(1) +end + +ENV['RAILS_ENV'] = 'production' +require File.expand_path('../../config/environment', __FILE__) +require 'dart_database_cleaner' + +Cosmos.catch_fatal_exception do + case action.downcase + when 'showpleerrors' + ples = PacketLogEntry.where("decom_state >= 3").find_each do |ple| + puts "#{ple.id}: #{ple.decom_state_string}(#{ple.decom_state})" + end + + when 'resetpleerrors' + PacketLogEntry.where("decom_state >= 3").update_all(:decom_state => 0) + puts "All errored PLEs set to decom_state 0" + + when 'fullcleanup' + Cosmos::Logger.level = Cosmos::Logger::INFO + DartDatabaseCleaner.clean(false, true) + + when 'removepacketlog' + puts "Removing database entries for packet log #{ARGV[1]}" + puts "Note!!! This does not delete the file" + Cosmos::Logger.level = Cosmos::Logger::INFO + DartDatabaseCleaner.new.remove_packet_log(ARGV[1]) + + when 'showpacketlogs' + total_size = 0 + packet_logs = PacketLog.all + filenames = [] + reader = Cosmos::PacketLogReader.new + packet_logs.each do |pl| + filenames << pl.filename + if File.exist?(pl.filename) + exists = "FOUND " + size = File.size(pl.filename) + reader.open(pl.filename) + begin + first_packet = reader.first + last_packet = reader.last + start_time = first_packet.received_time.formatted + end_time = last_packet.received_time.formatted + rescue + if size == 128 or size == 0 + start_time = "EMPTY " + end_time = "EMPTY " + else + start_time = "ERROR " + end_time = "ERROR " + end + ensure + reader.close + end + else + size = 0 + start_time = "MISSING " + end_time = "MISSING " + exists = "MISSING" + end + puts "#{"%-32.32s" % File.basename(pl.filename)} #{exists} #{start_time} #{end_time} #{size}" + total_size += size + end + other_size = 0 + Cosmos.set_working_dir do + Dir[Cosmos::System.paths['DART_DATA'] + '/*.bin'].each do |filename| + next if filename[0] == '.' + next if filenames.include?(filename) + exists = "NOTINDB" + size = File.size(filename) + reader.open(filename) + begin + first_packet = reader.first + last_packet = reader.last + start_time = first_packet.received_time.formatted + end_time = last_packet.received_time.formatted + rescue + if size == 128 or size == 0 + start_time = "EMPTY " + end_time = "EMPTY " + else + start_time = "ERROR " + end_time = "ERROR " + end + ensure + reader.close + end + puts "#{"%-32.32s" % File.basename(filename)} #{exists} #{start_time} #{end_time} #{size}" + other_size += size + end + end + puts "Total size in database: #{"%0.2f GB" % (total_size.to_f / (1024 * 1024 * 1024))}" + puts "Total size not in database: #{"%0.2f GB" % (other_size.to_f / (1024 * 1024 * 1024))}" + end +end diff --git a/lib/cosmos/dart/processes/dart_worker.rb b/lib/cosmos/dart/processes/dart_worker.rb index 0142bb1f7..073dd8eb1 100644 --- a/lib/cosmos/dart/processes/dart_worker.rb +++ b/lib/cosmos/dart/processes/dart_worker.rb @@ -8,6 +8,7 @@ # as published by the Free Software Foundation; version 3 with # attribution addendums as found in the LICENSE.txt +ENV['RAILS_ENV'] = 'production' require File.expand_path('../../config/environment', __FILE__) require 'dart_common' require 'dart_logging' diff --git a/lib/cosmos/dart/spec/dart/dart_database_cleaner_spec.rb b/lib/cosmos/dart/spec/dart/dart_database_cleaner_spec.rb index 001ecb73f..23ba79a14 100644 --- a/lib/cosmos/dart/spec/dart/dart_database_cleaner_spec.rb +++ b/lib/cosmos/dart/spec/dart/dart_database_cleaner_spec.rb @@ -17,6 +17,7 @@ before(:each) do DatabaseCleaner.strategy = :truncation DatabaseCleaner.clean + Rails.application.load_seed @cleaner = DartDatabaseCleaner.new end @@ -29,7 +30,6 @@ expect(messages.select{|m| m =~ /Cleaning up PacketLog\./}.length).to eq 1 expect(messages.select{|m| m =~ /Cleaning up PacketConfig/}.length).to eq 1 expect(messages.select{|m| m =~ /Cleaning up PacketLogEntry/}.length).to eq 1 - expect(messages.select{|m| m =~ /Cleaning up Decommutation/}.length).to eq 1 expect(messages.select{|m| m =~ /Database cleanup complete/}.length).to eq 1 end end diff --git a/lib/cosmos/dart/spec/dart/dart_decom_query_spec.rb b/lib/cosmos/dart/spec/dart/dart_decom_query_spec.rb index 50937ca79..9e570e245 100644 --- a/lib/cosmos/dart/spec/dart/dart_decom_query_spec.rb +++ b/lib/cosmos/dart/spec/dart/dart_decom_query_spec.rb @@ -18,6 +18,7 @@ before(:each) do DatabaseCleaner.strategy = :truncation DatabaseCleaner.clean + Rails.application.load_seed @query = DartDecomQuery.new # Put all the known targets and packets into the DB @query.sync_targets_and_packets diff --git a/lib/cosmos/dart/spec/dart/dart_decommutator_spec.rb b/lib/cosmos/dart/spec/dart/dart_decommutator_spec.rb index bddf1f7e9..9dc3be252 100644 --- a/lib/cosmos/dart/spec/dart/dart_decommutator_spec.rb +++ b/lib/cosmos/dart/spec/dart/dart_decommutator_spec.rb @@ -17,6 +17,7 @@ before(:each) do DatabaseCleaner.strategy = :truncation DatabaseCleaner.clean + Rails.application.load_seed end describe "run" do diff --git a/lib/cosmos/dart/spec/dart/dart_importer_spec.rb b/lib/cosmos/dart/spec/dart/dart_importer_spec.rb index f64a24c30..d3a161518 100644 --- a/lib/cosmos/dart/spec/dart/dart_importer_spec.rb +++ b/lib/cosmos/dart/spec/dart/dart_importer_spec.rb @@ -25,6 +25,7 @@ before(:each) do DatabaseCleaner.strategy = :truncation DatabaseCleaner.clean + Rails.application.load_seed # Clean the dart logs Dir["#{Cosmos::System.paths['DART_LOGS']}/*"].each do |filename| FileUtils.rm_f filename diff --git a/lib/cosmos/dart/spec/dart/dart_packet_log_writer_spec.rb b/lib/cosmos/dart/spec/dart/dart_packet_log_writer_spec.rb index 64b0e19b0..d2fc5e1e2 100644 --- a/lib/cosmos/dart/spec/dart/dart_packet_log_writer_spec.rb +++ b/lib/cosmos/dart/spec/dart/dart_packet_log_writer_spec.rb @@ -15,10 +15,14 @@ before(:each) do DatabaseCleaner.strategy = :truncation DatabaseCleaner.clean + Rails.application.load_seed end describe "write" do it "creates PacketLogEntries and flushes the file" do + DatabaseCleaner.clean + Rails.application.load_seed + writer = DartPacketLogWriter.new( :TLM, # Log telemetry 'test_dart_tlm_', # Put dart_ in the log file name @@ -28,7 +32,7 @@ Cosmos::System.paths['DART_DATA']) # Log into the DART_DATA dir hs_packet = Cosmos::System.telemetry.packet("INST", "HEALTH_STATUS") - (DartPacketLogWriter::DEFAULT_SYNC_COUNT_LIMIT - 1).times do + (DartPacketLogWriter::DEFAULT_SYNC_COUNT_LIMIT).times do hs_packet.received_time = Time.now writer.write(hs_packet) sleep 0.01 @@ -39,7 +43,7 @@ ple = PacketLogEntry.find(1) expect(ple.target.name).to eq "SYSTEM" expect(ple.packet.name).to eq "META" - expect(ple.ready).to eq false # Hasn't been flushed yet + expect(ple.ready).to eq true packet = writer.read_packet_from_ple(ple) expect(packet.class).to eq Cosmos::Packet @@ -49,35 +53,38 @@ target = Target.find_by_name("INST") packet = Packet.find_by_name("HEALTH_STATUS") count = 0 - previous_time = Time.now - PacketLogEntry.where("target_id = ? and packet_id = ?", target.id, packet.id).each do |ple| - expect(ple.target.name).to eq "INST" - expect(ple.packet.name).to eq "HEALTH_STATUS" - expect(ple.ready).to eq false # Hasn't been flushed yet - - packet = writer.read_packet_from_ple(ple) - expect(packet.target_name).to eq "INST" - expect(packet.packet_name).to eq "HEALTH_STATUS" - expect(packet.received_time).to_not eq previous_time - previous_time = packet.received_time - count += 1 - end - expect(count).to eq (DartPacketLogWriter::DEFAULT_SYNC_COUNT_LIMIT - 1) + count = PacketLogEntry.where("target_id = ? and packet_id = ?", target.id, packet.id).count + expect(count).to eq (0) hs_packet.received_time = Time.now writer.write(hs_packet) # Write the packet that causes the flush sleep 0.1 count = 0 + previous_time = Time.now PacketLogEntry.all.each do |ple| - if count < DartPacketLogWriter::DEFAULT_SYNC_COUNT_LIMIT - expect(ple.ready).to eq true # Flushed! + if count == 0 + expect(ple.target.name).to eq "SYSTEM" + expect(ple.packet.name).to eq "META" + expect(ple.ready).to eq true + + packet = writer.read_packet_from_ple(ple) + expect(packet.target_name).to eq "SYSTEM" + expect(packet.packet_name).to eq "META" + expect(packet.received_time).to_not eq previous_time else - # The last write isn't flushed - expect(ple.ready).to eq false + expect(ple.target.name).to eq "INST" + expect(ple.packet.name).to eq "HEALTH_STATUS" + expect(ple.ready).to eq true + + packet = writer.read_packet_from_ple(ple) + expect(packet.target_name).to eq "INST" + expect(packet.packet_name).to eq "HEALTH_STATUS" + expect(packet.received_time).to_not eq previous_time end + previous_time = packet.received_time count += 1 end - # We wrote one SYSTEM META plus (DartPacketLogWriter::DEFAULT_SYNC_COUNT_LIMIT - 1) + # We wrote one SYSTEM META plus (DartPacketLogWriter::DEFAULT_SYNC_COUNT_LIMIT) # plus one more to cause the flush expect(count).to eq DartPacketLogWriter::DEFAULT_SYNC_COUNT_LIMIT + 1 writer.shutdown @@ -89,6 +96,8 @@ it "creates command logs" do DatabaseCleaner.clean + Rails.application.load_seed + meta = Cosmos::System.commands.packet("SYSTEM", "META") clr_cmd = Cosmos::System.commands.packet("INST", "CLEAR") # 128 byte file header, SYSTEM META has 24 byte header, @@ -111,36 +120,30 @@ ple = PacketLogEntry.find(1) expect(ple.target.name).to eq "SYSTEM" expect(ple.packet.name).to eq "META" - expect(ple.ready).to eq false # Hasn't been flushed yet + expect(ple.ready).to eq true + + clr_cmd.received_time = Time.now + writer.write(clr_cmd) # The second command should create a new log + sleep 0.1 # The second Log Entry is the command ple = PacketLogEntry.find(2) expect(ple.target.name).to eq "INST" expect(ple.packet.name).to eq "CLEAR" - expect(ple.ready).to eq false # Hasn't been flushed yet + expect(ple.ready).to eq true - clr_cmd.received_time = Time.now - writer.write(clr_cmd) # The second command should create a new log + writer.shutdown sleep 0.1 - # Check the the first two have been flushed - ple = PacketLogEntry.find(1) - expect(ple.ready).to eq true # Flushed - ple = PacketLogEntry.find(2) - expect(ple.ready).to eq true # Flushed - # The third and fourth are SYSTEM META and the command ple = PacketLogEntry.find(3) expect(ple.target.name).to eq "SYSTEM" expect(ple.packet.name).to eq "META" - expect(ple.ready).to eq false # Hasn't been flushed yet + expect(ple.ready).to eq true ple = PacketLogEntry.find(4) expect(ple.target.name).to eq "INST" expect(ple.packet.name).to eq "CLEAR" - expect(ple.ready).to eq false # Hasn't been flushed yet - - writer.shutdown - sleep 0.1 + expect(ple.ready).to eq true files = Dir["#{Cosmos::System.paths['DART_DATA']}/*_test_dart_cmd_*"] expect(files.length).to eq 2 diff --git a/lib/cosmos/dart/spec/dart/dart_reducer_manager_spec.rb b/lib/cosmos/dart/spec/dart/dart_reducer_manager_spec.rb index 9866dd2fd..c4e9172ec 100644 --- a/lib/cosmos/dart/spec/dart/dart_reducer_manager_spec.rb +++ b/lib/cosmos/dart/spec/dart/dart_reducer_manager_spec.rb @@ -21,6 +21,7 @@ before(:each) do DatabaseCleaner.strategy = :truncation DatabaseCleaner.clean + Rails.application.load_seed end def setup_ples(entries, delta_time) @@ -48,13 +49,13 @@ def setup_ples(entries, delta_time) end ples = 0 count = 0 + writer.shutdown while ples != (entries + 1) # SYSTEM META is the plus 1 ples = PacketLogEntry.count sleep 0.1 # Allow the log writer to work count += 1 break if count == 100 # 10s end - writer.shutdown sleep 0.1 expect(count).to be < 100 diff --git a/lib/cosmos/dart/spec/dart/dart_tcpip_server_interface_spec.rb b/lib/cosmos/dart/spec/dart/dart_tcpip_server_interface_spec.rb index 3e4d719f4..db23689a7 100644 --- a/lib/cosmos/dart/spec/dart/dart_tcpip_server_interface_spec.rb +++ b/lib/cosmos/dart/spec/dart/dart_tcpip_server_interface_spec.rb @@ -22,6 +22,7 @@ before(:each) do DatabaseCleaner.strategy = :truncation DatabaseCleaner.clean + Rails.application.load_seed end describe "initialize" do @@ -48,8 +49,6 @@ @request.write('REQUEST', "BLAH") interface.write(@request) sleep 0.1 - # TODO: Why is the interface still connected? - # expect(interface.connected?).to be false i.disconnect Dir["#{Cosmos::System.paths['DART_LOGS']}/*"].each do |file| diff --git a/lib/cosmos/gui/widgets/dart_meta_frame.rb b/lib/cosmos/gui/widgets/dart_meta_frame.rb index 078842a74..ada24195b 100644 --- a/lib/cosmos/gui/widgets/dart_meta_frame.rb +++ b/lib/cosmos/gui/widgets/dart_meta_frame.rb @@ -82,9 +82,14 @@ def initialize(parent) setLayout(@layout) - @resize_timer = Qt::Timer.new - @resize_timer.connect(SIGNAL('timeout()')) { update_meta_item_names() } - @resize_timer.start(100) + Thread.new do + sleep(0.1) + Qt.execute_in_main_thread(true) do + unless self.disposed? + update_meta_item_names() + end + end + end end def meta_filters @@ -100,9 +105,11 @@ def update_meta_item_names server = JsonDRbObject.new(System.connect_hosts['DART_DECOM'], System.ports['DART_DECOM']) item_names = server.item_names("SYSTEM", "META") Qt.execute_in_main_thread do - @meta_item_name.clear - item_names.each do |item| - @meta_item_name.addItem(item) + unless self.disposed? + @meta_item_name.clear + item_names.each do |item| + @meta_item_name.addItem(item) + end end end @got_meta_item_names = true diff --git a/lib/cosmos/interfaces.rb b/lib/cosmos/interfaces.rb index f224a8cda..584a8192a 100644 --- a/lib/cosmos/interfaces.rb +++ b/lib/cosmos/interfaces.rb @@ -9,6 +9,7 @@ require 'cosmos/interfaces/tcpip_server_interface' require 'cosmos/interfaces/udp_interface' require 'cosmos/interfaces/linc_interface' +require 'cosmos/interfaces/dart_status_interface' require 'cosmos/interfaces/protocols/protocol' require 'cosmos/interfaces/protocols/burst_protocol' diff --git a/lib/cosmos/interfaces/dart_status_interface.rb b/lib/cosmos/interfaces/dart_status_interface.rb new file mode 100644 index 000000000..9f518f421 --- /dev/null +++ b/lib/cosmos/interfaces/dart_status_interface.rb @@ -0,0 +1,91 @@ +# encoding: ascii-8bit + +# Copyright 2018 Ball Aerospace & Technologies Corp. +# All Rights Reserved. +# +# This program is free software; you can modify and/or redistribute it +# under the terms of the GNU General Public License +# as published by the Free Software Foundation; version 3 with +# attribution addendums as found in the LICENSE.txt + +require 'cosmos/interfaces/interface' +require 'cosmos/io/json_drb_object' +require 'cosmos/utilities/sleeper' + +module Cosmos + + # Defines all the attributes and methods common to all interface classes + # used by COSMOS. + class DartStatusInterface < Interface + # Initialize default attribute values + def initialize(query_period = 20.0) + super() + @write_raw_allowed = false + @dart = nil + @first = true + @query_period = query_period.to_f + @query_time = nil + end + + # Connects the interface to its target(s). Must be implemented by a + # subclass. + def connect + super() + @first = true + @status_packet = System.telemetry.packet('DART', 'STATUS').clone + @status_packet.write('PACKET_ID', 1) + @clear_errors_command = System.commands.packet('DART', 'CLEAR_ERRORS') + @sleeper = Sleeper.new + @dart = JsonDRbObject.new(System.connect_hosts['DART_DECOM'], System.ports['DART_DECOM']) + end + + # Indicates if the interface is connected to its target(s) or not. Must be + # implemented by a subclass. + def connected? + if @dart + true + else + false + end + end + + # Disconnects the interface from its target(s). Must be implemented by a + # subclass. + def disconnect + super() + @sleeper.cancel + @dart.disconnect if @dart + @dart = nil + end + + def read_interface + canceled = false + unless @first + if @query_time + sleep_time = @query_period - (Time.now - @query_time) + canceled = @sleeper.sleep(sleep_time) if sleep_time > 0 + end + @query_time = Time.now + end + @first = false + unless canceled + data = @dart.dart_status + write_interface_base(@dart.request_data.to_s) + read_interface_base(@dart.response_data.to_s) + data.each do |key, value| + @status_packet.write(key, value) + end + return @status_packet.buffer + else + return nil + end + end + + def write_interface(data) + @dart.clear_errors + write_interface_base(@dart.request_data.to_s) + read_interface_base(@dart.response_data.to_s) + return nil + end + end +end diff --git a/lib/cosmos/io/json_drb.rb b/lib/cosmos/io/json_drb.rb index f4d08056e..3fa663218 100644 --- a/lib/cosmos/io/json_drb.rb +++ b/lib/cosmos/io/json_drb.rb @@ -17,8 +17,13 @@ require 'cosmos/io/json_rpc' require 'cosmos/io/json_drb_rack' require 'rack/handler/puma' -if RUBY_ENGINE == 'ruby' and %w(2.2.7 2.2.8 2.3.4 2.4.1).include? RUBY_VERSION - require 'stopgap_13632' +if RUBY_ENGINE == 'ruby' and %w(2.2.7 2.2.8 2.2.9 2.2.10 2.3.4 2.4.1).include? RUBY_VERSION + begin + require 'stopgap_13632' + rescue Exception => err + msg = "Error loading stopgap. Make sure gem install stopgap_13632 succeeds: #{err.message}" + raise $!, msg, $!.backtrace + end end # Add methods to the Puma::Launcher and Puma::Single class so we can tell diff --git a/lib/cosmos/io/json_drb_object.rb b/lib/cosmos/io/json_drb_object.rb index e908671ab..9dc64b8b3 100644 --- a/lib/cosmos/io/json_drb_object.rb +++ b/lib/cosmos/io/json_drb_object.rb @@ -32,6 +32,9 @@ module Cosmos # server.cmd(*args) # class JsonDRbObject + attr_reader :request_data + attr_reader :response_data + # @param hostname [String] The name of the machine which has started # the JSON service # @param port [Integer] The port number of the JSON service @@ -46,6 +49,8 @@ def initialize(hostname, port, connect_timeout = 1.0) raise error end end + @request_data = "" + @response_data = "" @hostname = hostname @port = port @uri = URI("http://#{@hostname}:#{@port}") @@ -117,25 +122,25 @@ def make_request(method_name, method_params, first_try) request = JsonRpcRequest.new(method_name, method_params, @id) @id += 1 - request_data = request.to_json(:allow_nan => true) + @request_data = request.to_json(:allow_nan => true) begin STDOUT.puts "\nRequest:\n" if JsonDRb.debug? - STDOUT.puts request_data if JsonDRb.debug? + STDOUT.puts @request_data if JsonDRb.debug? @request_in_progress = true headers = {'Content-Type' => 'application/json-rpc'} res = @http.post(@uri, - :body => request_data, + :body => @request_data, :header => headers) - response_data = res.body + @response_data = res.body @request_in_progress = false STDOUT.puts "Response:\n" if JsonDRb.debug? - STDOUT.puts response_data if JsonDRb.debug? + STDOUT.puts @response_data if JsonDRb.debug? rescue => e disconnect() return false if first_try raise DRb::DRbConnError, e.message, e.backtrace end - response_data + @response_data end def handle_response(response_data) diff --git a/lib/cosmos/packets/packet_config.rb b/lib/cosmos/packets/packet_config.rb index 017eaac7c..32ecffdbb 100644 --- a/lib/cosmos/packets/packet_config.rb +++ b/lib/cosmos/packets/packet_config.rb @@ -395,8 +395,13 @@ def process_current_item(parser, keyword, params) # require should be performed in target.txt klass = params[0].filename_to_class_name.to_class raise parser.error("#{params[0].filename_to_class_name} class not found. Did you require the file in target.txt?", usage) unless klass - @current_item.send("#{keyword.downcase}=".to_sym, - klass.new(*params[1..(params.length - 1)])) + conversion = klass.new(*params[1..(params.length - 1)]) + @current_item.send("#{keyword.downcase}=".to_sym, conversion) + if conversion.converted_type.nil? or conversion.converted_bit_size.nil? + msg = "Read Conversion #{params[0].filename_to_class_name} on item #{@current_item.name} does not specify converted type or bit size. Will not be supported by DART" + @warnings << msg + Logger.instance.warn @warnings[-1] + end rescue Exception => err raise parser.error(err) end @@ -445,6 +450,11 @@ def process_current_item(parser, keyword, params) raise parser.error("Invalid converted_type: #{@converted_type}.") unless [:INT, :UINT, :FLOAT, :STRING, :BLOCK].include? @converted_type end @converted_bit_size = Integer(params[1]) if params[1] + if @converted_type.nil? or @converted_bit_size.nil? + msg = "Generic Conversion on item #{@current_item.name} does not specify converted type or bit size. Will not be supported by DART" + @warnings << msg + Logger.instance.warn @warnings[-1] + end # Define a set of limits for the current telemetry item when 'LIMITS' diff --git a/lib/cosmos/tools/tlm_grapher/tabbed_plots_tool/tabbed_plots_dart_thread.rb b/lib/cosmos/tools/tlm_grapher/tabbed_plots_tool/tabbed_plots_dart_thread.rb index eec829491..bbab5d605 100644 --- a/lib/cosmos/tools/tlm_grapher/tabbed_plots_tool/tabbed_plots_dart_thread.rb +++ b/lib/cosmos/tools/tlm_grapher/tabbed_plots_tool/tabbed_plots_dart_thread.rb @@ -50,6 +50,7 @@ def initialize(tabbed_plots_config, progress_dialog = nil, time_start = nil, tim time_end = Time.now unless time_end progress_dialog.set_step_progress(0) if progress_dialog index = 0 + query_string = "" required_queries.each do |target_name, packet_name, item_name, value_type, array_index, dart_reduction, dart_reduced_type| begin break if @cancel @@ -87,7 +88,7 @@ def initialize(tabbed_plots_config, progress_dialog = nil, time_start = nil, tim rescue Exception => error @errors << error break if @cancel - progress_dialog.append_text("Error querying #{query_string} : #{error.class}:#{error.message}\n#{error.backtrace.join("\n")}\n", 2) if progress_dialog + progress_dialog.append_text("Error querying #{query_string} : #{error.class}:#{error.message}\n#{error.backtrace.join("\n")}\n") if progress_dialog # If a progress dialog is shown we can't just bail on this error or # it will close and the user will have no idea what happened # Thus we'll spin here waiting for them to close the dialog @@ -96,8 +97,8 @@ def initialize(tabbed_plots_config, progress_dialog = nil, time_start = nil, tim sleep(0.1) until progress_dialog.complete? end break # Bail out because something bad happened - end - index += 1 + end + index += 1 end progress_dialog.set_step_progress(1.0) if progress_dialog and not @cancel progress_dialog.set_overall_progress(0.5) if progress_dialog and not @cancel @@ -118,7 +119,7 @@ def initialize(tabbed_plots_config, progress_dialog = nil, time_start = nil, tim rescue Exception => error @errors << error return if @cancel - progress_dialog.append_text("DART Thread Error #{error.class}:#{error.message}\n#{error.backtrace.join("\n")}\n", 2) if progress_dialog + progress_dialog.append_text("DART Thread Error #{error.class}:#{error.message}\n#{error.backtrace.join("\n")}\n") if progress_dialog # If a progress dialog is shown we can't just bail on this error or # it will close and the user will have no idea what happened # Thus we'll spin here waiting for them to close the dialog diff --git a/lib/cosmos/tools/tlm_grapher/tabbed_plots_tool/tabbed_plots_logfile_thread.rb b/lib/cosmos/tools/tlm_grapher/tabbed_plots_tool/tabbed_plots_logfile_thread.rb index 894512ed3..e0777bfa8 100644 --- a/lib/cosmos/tools/tlm_grapher/tabbed_plots_tool/tabbed_plots_logfile_thread.rb +++ b/lib/cosmos/tools/tlm_grapher/tabbed_plots_tool/tabbed_plots_logfile_thread.rb @@ -68,7 +68,7 @@ def initialize(log_files, packet_log_reader, tabbed_plots_config, progress_dialo rescue Exception => error @errors << error break if @cancel - progress_dialog.append_text("Error processing #{log_file}:\n#{error.class} : #{error.message}\n#{error.backtrace.join("\n")}\n", 2) if progress_dialog + progress_dialog.append_text("Error processing #{log_file}:\n#{error.class} : #{error.message}\n#{error.backtrace.join("\n")}\n") if progress_dialog # If a progress dialog is shown we can't just bail on this error or # it will close and the user will have no idea what happened # Thus we'll spin here waiting for them to close the dialog diff --git a/lib/cosmos/top_level.rb b/lib/cosmos/top_level.rb index 80773638f..3d4d73244 100644 --- a/lib/cosmos/top_level.rb +++ b/lib/cosmos/top_level.rb @@ -113,7 +113,7 @@ def self.define_user_path(start_dir = Dir.pwd) # Last chance - Check environment if ENV['COSMOS_USERPATH'] disable_warnings do - Cosmos.const_set(:USERPATH, ENV['COSMOS_USERPATH']) + Cosmos.const_set(:USERPATH, ENV['COSMOS_USERPATH'].gsub("\\", "/")) end else # Give up and assume we are in the tools directory