Skip to content

Commit

Permalink
feat: add some docs
Browse files Browse the repository at this point in the history
  • Loading branch information
furkan-bilgin committed Nov 16, 2024
1 parent 3cc7df1 commit 6a7f2e0
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 9 deletions.
47 changes: 47 additions & 0 deletions src/enrgdaq/daq/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,23 @@


class DAQJob:
"""
DAQJob is a base class for data acquisition jobs. It handles the configuration,
message queues, and provides methods for consuming and handling messages.
Attributes:
allowed_message_in_types (list[type[DAQJobMessage]]): List of allowed message types for input.
config_type (Any): Type of the configuration.
config (Any): Configuration object.
message_in (Queue[DAQJobMessage]): Queue for incoming messages.
message_out (Queue[DAQJobMessage]): Queue for outgoing messages.
instance_id (int): Unique instance identifier.
unique_id (str): Unique identifier for the job.
restart_offset (timedelta): Offset for restarting the job.
info (DAQJobInfo): Information about the job.
_has_been_freed (bool): Flag indicating if the job has been freed.
_logger (logging.Logger): Logger instance for the job.
"""

allowed_message_in_types: list[type[DAQJobMessage]] = []
config_type: Any
config: Any
Expand Down Expand Up @@ -79,6 +96,20 @@ def _process_message(message):
break

def handle_message(self, message: "DAQJobMessage") -> bool:
"""
Handles a message received from the message queue.
Args:
message (DAQJobMessage): The message to handle.
Returns:
bool: True if the message was handled, False otherwise.
Raises:
DAQJobStopError: If the message is a DAQJobMessageStop.
Exception: If the message is not accepted by the job.
"""

if isinstance(message, DAQJobMessageStop):
raise DAQJobStopError(message.reason)
# check if the message is accepted
Expand All @@ -96,6 +127,13 @@ def start(self):
raise NotImplementedError

def _create_info(self) -> "DAQJobInfo":
"""
Creates a DAQJobInfo object for the job.
Returns:
DAQJobInfo: The created DAQJobInfo object.
"""

return DAQJobInfo(
daq_job_type=self.config.daq_job_type
if isinstance(self.config, DAQJobConfig)
Expand All @@ -107,6 +145,15 @@ def _create_info(self) -> "DAQJobInfo":
)

def _put_message_out(self, message: DAQJobMessage):
"""
Puts a message in the message_out queue.
Should be called by DAQJob itself.
Args:
message (DAQJobMessage): The message to put in the queue.
"""

message.daq_job_info = self.info
message.remote_config = self.config.remote_config

Expand Down
66 changes: 66 additions & 0 deletions src/enrgdaq/daq/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@


class LogVerbosity(str, Enum):
"""
Enum representing the verbosity levels for logging.
Used in DAQJobConfig.
"""

DEBUG = "DEBUG"
INFO = "INFO"
WARNING = "WARNING"
Expand All @@ -27,6 +33,21 @@ def to_logging_level(self) -> int:

@dataclass
class DAQJobInfo:
"""
A class to represent the information of a DAQJob.
Attributes:
daq_job_type : str
The type of the DAQ job.
daq_job_class_name : str
The class name of the DAQ job, typically the name of the class itself.
unique_id : str
A unique identifier for the DAQ job.
instance_id : int
An instance identifier for the DAQ job.
supervisor_config : Optional[SupervisorConfig]
Configuration for the supervisor, if any.
"""

daq_job_type: str
daq_job_class_name: str # has type(self).__name__
unique_id: str
Expand All @@ -45,17 +66,45 @@ def mock() -> "DAQJobInfo":


class DAQRemoteConfig(Struct, kw_only=True):
"""
Configuration for remote communication of DAQJobMessages.
Used both in DAQJobConfig and in DAQJobMessageStore.
Attributes:
remote_topic (Optional[str]): The topic to send the message to for remote communication.
remote_disable (Optional[bool]): Whether to disable remote communication.
"""

remote_topic: Optional[str] = DEFAULT_REMOTE_TOPIC
remote_disable: Optional[bool] = False


class DAQJobConfig(Struct, kw_only=True):
"""
DAQJobConfig is the base configuration class for DAQJobs.
Attributes:
verbosity (LogVerbosity): The verbosity level for logging. Defaults to LogVerbosity.INFO.
remote_config (Optional[DAQRemoteConfig]): The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
daq_job_type (str): The type of the DAQ job.
"""

verbosity: LogVerbosity = LogVerbosity.INFO
remote_config: Optional[DAQRemoteConfig] = field(default_factory=DAQRemoteConfig)
daq_job_type: str


class DAQJobMessage(Struct, kw_only=True):
"""
DAQJobMessage is the base class for messages sent between DAQJobs.
Attributes:
id (Optional[str]): The unique identifier for the message. Defaults to a UUID.
timestamp (Optional[datetime]): The timestamp for the message. Defaults to the current datetime.
is_remote (bool): Whether the message is sent by a remote DAQJob. Defaults to False.
daq_job_info (Optional[DAQJobInfo]): The information about the DAQJob that sent the message. Defaults to None.
remote_config (DAQRemoteConfig): The remote configuration for the DAQ job. Defaults to an instance of DAQRemoteConfig.
"""

id: Optional[str] = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: Optional[datetime] = field(default_factory=datetime.now)
is_remote: bool = False
Expand All @@ -68,6 +117,14 @@ class DAQJobMessageStop(DAQJobMessage):


class DAQJobStatsRecord(Struct):
"""
A class to represent a record of statistics for a DAQJob.
Attributes:
count (int): The number of times the DAQJob has been called.
last_updated (Optional[datetime]): The last time the DAQJob was called.
"""

count: int = 0
last_updated: Optional[datetime] = None

Expand All @@ -77,6 +134,15 @@ def increase(self, amount: int = 1):


class DAQJobStats(Struct):
"""
A class to represent statistics for a DAQJob. Gets created and updated by Supervisor.
Attributes:
message_in_stats (DAQJobStatsRecord): The statistics for incoming messages.
message_out_stats (DAQJobStatsRecord): The statistics for outgoing messages.
restart_stats (DAQJobStatsRecord): The statistics for restarts.
"""

message_in_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord)
message_out_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord)
restart_stats: DAQJobStatsRecord = field(default_factory=DAQJobStatsRecord)
Expand Down
21 changes: 21 additions & 0 deletions src/enrgdaq/daq/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,22 @@


class DAQJobStore(DAQJob):
"""
DAQJobStore is an abstract base class for data acquisition job stores. It extends the DAQJob class
and provides additional functionality for handling and storing messages.
Attributes:
allowed_store_config_types (list): A list of allowed store configuration types.
"""

allowed_store_config_types: list

def start(self):
"""
Starts the continuous loop for consuming and storing data.
This method runs an infinite loop that repeatedly calls the `consume`
and `store_loop` methods.
"""

while True:
self.consume()
self.store_loop()
Expand All @@ -27,6 +40,14 @@ def handle_message(self, message: DAQJobMessage) -> bool:
return super().handle_message(message)

def can_store(self, message: DAQJobMessage) -> bool:
"""
Determines if the given message can be stored based on its configuration.
Args:
message (DAQJobMessage): The message to be checked.
Returns:
bool: True if the message can be stored, False otherwise.
"""

if not isinstance(message, DAQJobMessageStore):
return False
is_message_allowed = False
Expand Down
33 changes: 25 additions & 8 deletions src/enrgdaq/daq/store/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,30 @@ def has_store_config(self, store_type: Any) -> bool:


class DAQJobMessageStore(DAQJobMessage):
"""
DAQJobMessageStore is a class that extends DAQJobMessage and is used to store
configuration and data related to a DAQ (Data Acquisition) job.
Attributes:
store_config (DAQJobStoreConfig): Configuration for the DAQ job store.
keys (list[str]): List of keys associated with the data.
data (list[list[Any]]): Nested list containing the data.
tag (str | None): Optional tag associated with the DAQ job.
"""

store_config: DAQJobStoreConfig
keys: list[str]
data: list[list[Any]]
tag: str | None = None

def get_remote_config(self) -> Optional[DAQRemoteConfig]:
"""
Retrieves the remote configuration from the store_config.
Iterates through the attributes of `self.store_config` to find an instance
of `DAQJobStoreConfigBase` that has a non-None `remote_config` attribute.
Returns:
Optional[DAQRemoteConfig]: The remote configuration if found, otherwise None.
"""

for key in dir(self.store_config):
value = getattr(self.store_config, key)
if not isinstance(value, DAQJobStoreConfigBase):
Expand All @@ -46,16 +64,15 @@ class StorableDAQJobConfig(DAQJobConfig):
store_config: DAQJobStoreConfig


class DAQJobStoreTarget(Struct):
instances: Optional[list["DAQJobStoreTargetInstance"]] = None


class DAQJobStoreTargetInstance(Struct):
supervisor_id: Optional[str] = None
is_self: Optional[bool] = None
class DAQJobStoreConfigBase(Struct, kw_only=True):
"""
DAQJobStoreConfigBase is a configuration class for DAQ job store,
that is expected to be extended by specific store configurations, such as CSV, MySQL, etc.
Attributes:
remote_config (Optional[DAQRemoteConfig]): Configuration for remote DAQ.
"""

class DAQJobStoreConfigBase(Struct, kw_only=True):
remote_config: Optional[DAQRemoteConfig] = None


Expand Down
11 changes: 11 additions & 0 deletions src/enrgdaq/daq/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ def get_all_daq_job_types():
def get_daq_job_class(
daq_job_type: str, warn_deprecated: bool = False
) -> Optional[type[DAQJob]]:
"""
Gets the DAQJob class for a given DAQ job type.
Args:
daq_job_type (str): The type of the DAQ job.
warn_deprecated (bool): Whether to warn if the DAQ job type is deprecated.
Returns:
Optional[type[DAQJob]]: The DAQJob class for the given DAQ job type, or None if not found.
"""

daq_job_class = None
if daq_job_type in DAQ_JOB_TYPE_TO_CLASS:
daq_job_class = DAQ_JOB_TYPE_TO_CLASS[daq_job_type]
Expand Down
6 changes: 6 additions & 0 deletions src/enrgdaq/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@


class SupervisorConfig(Struct):
"""
A configuration class for a supervisor.
Attributes:
supervisor_id (str): The unique identifier for the supervisor that is going to be used primarily for DAQJobRemote.
"""

supervisor_id: str

def clone(self):
Expand Down
Loading

0 comments on commit 6a7f2e0

Please sign in to comment.