Skip to content

Commit

Permalink
feat: add is_alive to DAQJobStats and remote config changes
Browse files Browse the repository at this point in the history
- add necessary methods to Supervisor for that
- add `start_time` to `DAQJobStats`
- add `drop_remote_messages` to `DAQRemoteConfig`
- fix tests
  • Loading branch information
furkan-bilgin committed Nov 26, 2024
1 parent 9aa189e commit c104404
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 9 deletions.
17 changes: 15 additions & 2 deletions src/enrgdaq/daq/base.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import logging
import threading
import uuid
from dataclasses import dataclass
from datetime import timedelta
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from queue import Empty, Queue
from typing import Any, Optional

Expand Down Expand Up @@ -121,6 +121,18 @@ def handle_message(self, message: "DAQJobMessage") -> bool:
raise Exception(
f"Message type '{type(message)}' is not accepted by '{type(self).__name__}'"
)
# Drop remote messages silently if configured to do so
if self.config.remote_config.drop_remote_messages:
if message.daq_job_info and message.daq_job_info.supervisor_config:
remote_supervisor_id = (
message.daq_job_info.supervisor_config.supervisor_id
)
else:
remote_supervisor_id = "unknown"
self._logger.debug(
f"Dropping remote message '{type(message)}' from '{remote_supervisor_id}' because drop_remote_messages is True"
)
return True
return True

def start(self):
Expand Down Expand Up @@ -180,3 +192,4 @@ def free(self):
class DAQJobThread:
daq_job: DAQJob
thread: threading.Thread
start_time: datetime = field(default_factory=datetime.now)
2 changes: 2 additions & 0 deletions src/enrgdaq/daq/jobs/handle_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def handle_message(self, message: DAQJobMessageStats) -> bool:

keys = [
"daq_job",
"is_alive",
"last_message_in_date",
"message_in_count",
"last_message_out_date",
Expand All @@ -70,6 +71,7 @@ def unpack_record(record: DAQJobStatsRecord):
data_to_send.append(
[
daq_job_type.__name__,
str(msg.is_alive).lower(),
*unpack_record(msg.message_in_stats),
*unpack_record(msg.message_out_stats),
*unpack_record(msg.restart_stats),
Expand Down
6 changes: 5 additions & 1 deletion src/enrgdaq/daq/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,13 @@ class DAQRemoteConfig(Struct, kw_only=True):
Attributes:
remote_topic (Optional[str]): The topic to send the message to for remote communication.
remote_disable (Optional[bool]): Whether to disable remote communication.
remote_disable (Optional[bool]): Whether to send messages from this DAQ job to remote. If True, messages will not be sent to any remote.
drop_remote_messages (Optional[bool]): Whether to drop remote messages. If True, messages from remote will not be processed, but messages may still be sent to remote.
"""

remote_topic: Optional[str] = DEFAULT_REMOTE_TOPIC
remote_disable: Optional[bool] = False
drop_remote_messages: Optional[bool] = False


class DAQJobConfig(Struct, kw_only=True):
Expand Down Expand Up @@ -141,11 +143,13 @@ class DAQJobStats(Struct):
message_in_stats (DAQJobStatsRecord): The statistics for incoming messages.
message_out_stats (DAQJobStatsRecord): The statistics for outgoing messages.
restart_stats (DAQJobStatsRecord): The statistics for restarts.
is_alive (bool): Whether the DAQJob is alive.
"""

message_in_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord)
message_out_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord)
restart_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord)
is_alive: bool = True


class DAQJobStopError(Exception):
Expand Down
57 changes: 51 additions & 6 deletions src/enrgdaq/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,16 @@
from enrgdaq.models import SupervisorConfig

DAQ_SUPERVISOR_SLEEP_TIME_SECONDS = 0.2
"""Time in seconds to sleep between iterations of the supervisor's main loop."""

DAQ_JOB_QUEUE_ACTION_TIMEOUT = 0.1
"""Time in seconds to wait for a DAQ job to process a message."""

DAQ_JOB_MARK_AS_ALIVE_TIME_SECONDS = 5
"""Time in seconds to mark a DAQ job as alive after it has been running for that long."""

DAQ_SUPERVISOR_STATS_MESSAGE_INTERVAL_SECONDS = 1
"""Time in seconds between sending supervisor stats messages."""


@dataclass
Expand All @@ -41,13 +50,16 @@ class Supervisor:
daq_job_stats (DAQJobStatsDict): Dictionary holding statistics for each DAQ job type.
restart_schedules (list[RestartDAQJobSchedule]): List of schedules for restarting DAQ jobs.
_logger (logging.Logger): Logger instance for logging supervisor activities.
_last_stats_message_time (datetime): Last time a stats message was sent.
"""

daq_job_threads: list[DAQJobThread]
daq_job_stats: DAQJobStatsDict
restart_schedules: list[RestartDAQJobSchedule]
_logger: logging.Logger

_last_stats_message_time: datetime

def init(self):
"""
Initializes the supervisor, loads configuration, starts DAQ job threads, and warns for lack of DAQ jobs.
Expand All @@ -68,6 +80,10 @@ def init(self):
}
self.warn_for_lack_of_daq_jobs()

self._last_stats_message_time = datetime.now() - timedelta(
seconds=DAQ_SUPERVISOR_STATS_MESSAGE_INTERVAL_SECONDS
)

def start_daq_job_threads(self) -> list[DAQJobThread]:
return start_daq_jobs(load_daq_jobs("configs/", self.config))

Expand Down Expand Up @@ -103,7 +119,10 @@ def loop(self):
# Restart jobs that have stopped or are scheduled to restart
self.restart_daq_jobs()

# Get messages from enrgdaq.daq. Jobs
# Handle thread alive stats for dead & alive threads
self.handle_thread_alive_stats(dead_threads)

# Get messages from DAQ Jobs
daq_messages_out = self.get_messages_from_daq_jobs()

# Add supervisor messages
Expand All @@ -112,6 +131,28 @@ def loop(self):
# Send messages to appropriate DAQ Jobs
self.send_messages_to_daq_jobs(daq_messages_out)

def handle_thread_alive_stats(self, dead_threads: list[DAQJobThread]):
"""
Handles the alive stats for the dead threads.
Args:
dead_threads (list[DAQJobThread]): List of dead threads.
"""

for thread in self.daq_job_threads:
if (
datetime.now() + timedelta(seconds=DAQ_JOB_MARK_AS_ALIVE_TIME_SECONDS)
< thread.start_time
):
self.get_daq_job_stats(
self.daq_job_stats, type(thread.daq_job)
).is_alive = True

for thread in dead_threads:
self.get_daq_job_stats(
self.daq_job_stats, type(thread.daq_job)
).is_alive = False

def get_restart_schedules(self, dead_threads: list[DAQJobThread]):
"""
Gets the restart schedules for the dead threads.
Expand Down Expand Up @@ -181,12 +222,16 @@ def get_supervisor_messages(self) -> list[DAQJobMessage]:
messages = []

# Send stats message
messages.append(
DAQJobMessageStats(
stats=self.daq_job_stats,
daq_job_info=self._get_supervisor_daq_job_info(),
if datetime.now() > self._last_stats_message_time + timedelta(
seconds=DAQ_SUPERVISOR_STATS_MESSAGE_INTERVAL_SECONDS
):
self._last_stats_message_time = datetime.now()
messages.append(
DAQJobMessageStats(
stats=self.daq_job_stats,
daq_job_info=self._get_supervisor_daq_job_info(),
)
)
)
return messages

def get_daq_job_stats(
Expand Down
3 changes: 3 additions & 0 deletions src/tests/test_supervisor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import unittest
from datetime import datetime, timedelta
from queue import Queue
from unittest.mock import MagicMock, patch

Expand Down Expand Up @@ -65,8 +66,10 @@ def test_loop(
):
mock_thread_alive = MagicMock()
mock_thread_alive.thread.is_alive.return_value = True
mock_thread_alive.start_time = datetime.now() - timedelta(seconds=5)
mock_thread_dead = MagicMock()
mock_thread_dead.thread.is_alive.return_value = False
mock_thread_dead.start_time = datetime.now() - timedelta(seconds=10)
mock_restart_daq_job.return_value = mock_thread_alive

self.supervisor.daq_job_threads = [mock_thread_alive, mock_thread_dead]
Expand Down

0 comments on commit c104404

Please sign in to comment.