Skip to content

Commit

Permalink
Merge pull request #44 from amosproj/feature/36-control-flow-implemen…
Browse files Browse the repository at this point in the history
…tation

Feature/36 control flow implementation
  • Loading branch information
rbbozkurt authored Nov 7, 2023
2 parents f3c4d14 + b9b3f4e commit 6ce7aa2
Show file tree
Hide file tree
Showing 2 changed files with 231 additions and 0 deletions.
183 changes: 183 additions & 0 deletions src/controller/Controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: 2023 Berkay Bozkurt <resitberkaybozkurt@gmail.com>

from queue import Queue
from threading import Lock, Thread
from typing import Any

from messenger import Message, MessageType, create_data_message


class ControllerMeta(type):

"""
Thread safe singleton implementation of Controller
"""

_instances = {} # Dictionary to store instances of Controller
_lock: Lock = Lock() # A lock to ensure thread-safety when creating instances

def __call__(cls, *args: Any, **kwds: Any):
with cls._lock:
if cls not in cls._instances:
instance = super().__call__(
*args, **kwds
) # Create a new instance of Controller
cls._instances[
cls
] = instance # Store the instance in the _instances dictionary

return cls._instances[cls] # Return the instance of Controller


class Controller(metaclass=ControllerMeta):
"""
Controller class with message processing and sending functionality.
"""

def __init__(self, name: str) -> None:
self.name = name
self._finish_flag = False
self._finish_flag_lock = Lock()
self._message_queue: Queue[Message] = Queue(0) # Queue for processing messages
self._routing_queue: Queue[Message] = Queue(0) # Queue for routing messages
self._message_queue_processor_thread: Thread = (
None # Thread for processing messages
)
self._routing_queue_processor_thread: Thread = (
None # Thread for routing messages
)
self._start_message_queue_processing_thread() # Start the message processing thread
self._start_routing_queue_processing_thread() # Start the routing thread

def _message_queue_processor(self):
while True:
# do read operation with lock to get latest value
with self._finish_flag_lock:
# if set True then exit loop
if self._finish_flag:
break
if not self._message_queue.empty():
try:
# Get a message from the message queue and process it‚
msg = self._message_queue.get()
# Simulate processing of the message
print(f"Processing on {msg}")
self._enqueue_routing(msg)
# Simulate completion of processing
print(f"Processed {msg}")
# Handle any errors during message processing
except Exception as e:
print(f"Error while processing message: {e}")
finally:
# Mark the task as done in the processing queue
self._message_queue.task_done()
print(f"Message queue processor thread exited.")

def _routing_queue_processor(self):
while True:
with self._finish_flag_lock:
if self._finish_flag:
break
if not self._routing_queue.empty():
try:
# Mark the task as done in the processing queue
msg = self._routing_queue.get()
print(f"Routing {msg}")
if msg.data_type == MessageType.DATA:
self._route_to_EVP(msg)
elif msg.data_type == MessageType.PREDICTION:
self._route_to_BDC(msg)
else:
print(f"Unknown message type: {msg.data_type}")
print(f"Routed {msg}")
# Handle any errors during message routing
except Exception as e:
print(f"Error while routing message: {e}")
finally:
# Mark the task as done in the processing queue
self._routing_queue.task_done()
print(f"Routing queue processor thread exited.")

# Start the message processing thread
def _start_message_queue_processing_thread(self):
if (
not self._message_queue_processor_thread
or not self._message_queue_processor_thread.is_alive()
):
self._message_queue_processor_thread = Thread(
target=self._message_queue_processor, daemon=True
)
self._message_queue_processor_thread.start()

# Start the message routing thread
def _start_routing_queue_processing_thread(self):
if (
not self._routing_queue_processor_thread
or not self._routing_queue_processor_thread.is_alive()
):
self._routing_queue_processor_thread = Thread(
target=self._routing_queue_processor, daemon=True
)
self._routing_queue_processor_thread.start()

def _route_to_BDC(self, msg: Message):
# TODO call the method of base data collector
return

def _route_to_EVP(self, msg: Message):
# TODO call the method of estimated value predictor
return

# Enqueue a message in the processing queue
def _enqueue_message(self, msg: Message):
self._message_queue.put(msg)

# Enqueue a message in the routing queue
def _enqueue_routing(self, msg: Message):
self._routing_queue.put(msg)

# Public interface to send a message
def send_message(self, msg: Message):
"""
processes message, forwards to related components.
"""
if not self._finish_flag:
self._enqueue_message(msg)
else:
print(f"Controller finished can not send messages... ")

def finish(self):
"""
finishes controller, after all waiting messages are processed and routed
"""

# wait till queues are empty.
while not self._message_queue.empty() or not self._routing_queue.empty():
print(f"Waiting for message and routing threads to finish their jobs... ")

with self._finish_flag_lock:
# Set the finish flag to signal threads to stop
self._finish_flag = True

print(f"Finishing threads... ")

# Wait for the message queue processing thread to finish
if (
self._message_queue_processor_thread
and self._message_queue_processor_thread.is_alive()
):
print(f"Finishing message queue processor thread...")
self._message_queue_processor_thread.join()
# Wait for the routing queue processing thread to finish
if (
self._routing_queue_processor_thread
and self._routing_queue_processor_thread.is_alive()
):
print(f"Finishing routing queue processor thread...")
self._routing_queue_processor_thread.join()

# check if there are any elements in queues, if not, all cool!
print(f"Threads finished... ")
print(f"routing queue size... {self._routing_queue.unfinished_tasks}")
print(f"message queue size... {self._message_queue.unfinished_tasks}")
48 changes: 48 additions & 0 deletions src/controller/messenger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: 2023 Berkay Bozkurt <resitberkaybozkurt@gmail.com>

from enum import Enum
from typing import Dict, Optional

from pydantic import BaseModel


class MessageType(Enum):
DATA = "data"
PREDICTION = "prediction"


class SenderType(Enum):
BDC = "base_data_collector"
EVP = "estimated_value_predictor"


class Message(BaseModel):
sender_name: SenderType
data_type: MessageType
data: Optional[Dict] = {}
result: Optional[Dict] = {}


def create_data_message(lead_id, **features):
"""
Creates a data message, called by BDC.
"""
message = Message(
sender_name=SenderType.BDC,
data_type=MessageType.DATA,
data={"lead_id": lead_id, **features},
)
return message


def create_prediction_message(lead_id, prediction_value):
"""
Create a prediction message, called by EVP.
"""
message = Message(
sender_name=SenderType.EVP,
data_type=MessageType.PREDICTION,
result={"lead_id": lead_id, "prediction value": prediction_value},
)
return message

0 comments on commit 6ce7aa2

Please sign in to comment.