Skip to content

Commit

Permalink
Connection rewrite: update client and server connection logic (#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
goldarte authored Oct 7, 2019
1 parent 590431c commit ca4b4aa
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 43 deletions.
54 changes: 23 additions & 31 deletions Drone/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
parent_dir = os.path.dirname(current_dir)
sys.path.insert(0, parent_dir)

#logging.basicConfig(level=logging.INFO)
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

import messaging_lib as messaging
Expand Down Expand Up @@ -103,7 +103,7 @@ def start(self):
try:
while True:
self._reconnect()
self._process_connections()
#self._process_connections()

except (KeyboardInterrupt, ):
logger.critical("Caught interrupt, exiting!")
Expand Down Expand Up @@ -142,7 +142,7 @@ def _reconnect(self, timeout=3.0, attempt_limit=5):
def _connect(self):
self.connected = True
self.client_socket.setblocking(False)
events = selectors.EVENT_READ | selectors.EVENT_WRITE
events = selectors.EVENT_READ # | selectors.EVENT_WRITE
self.selector.register(self.client_socket, events, data=self.server_connection)
self.server_connection.connect(self.selector, self.client_socket, (self.server_host, self.server_port))
self._process_connections()
Expand Down Expand Up @@ -187,39 +187,31 @@ def on_broadcast_bind(self):
def _process_connections(self):
while True:
events = self.selector.select(timeout=1)
if events:
for key, mask in events:
if key.data is None:
pass
else:
connection = key.data
try:
connection.process_events(mask)

except Exception as error:
logger.error(
"Exception {} occurred for {}! Resetting connection!".format(error, connection.addr)
)
self.server_connection.close()
self.connected = False

if isinstance(error, OSError):
if error.errno == errno.EINTR:
raise KeyboardInterrupt
else:
time.sleep(0.001)
# logging.debug("tick")
for key, mask in events: # TODO add notifier to client!
connection = key.data
if connection is None:
pass
else:
try:
connection.process_events(mask)

except Exception as error:
logger.error(
"Exception {} occurred for {}! Resetting connection!".format(error, connection.addr)
)
self.server_connection.close()
self.connected = False

if isinstance(error, OSError):
if error.errno == errno.EINTR:
raise KeyboardInterrupt


if not self.selector.get_map():
logger.warning("No active connections left!")
return

time.sleep(0.001)

#def connection_processor(self):
# while not self._shutdown_event.is_set():
# self._running_event.wait()
# self._process_connections()


@messaging.request_callback("id")
def _response_id():
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
* [Аддон для Blender 2.8](https://github.com/artem30801/CleverSwarm/tree/master/blender-addon) для преобразования анимации полёта коптеров, созданной в Blender, в файлы полётов для каждого коптера
* [Образ для Raspberry Pi](https://github.com/artem30801/CleverSwarm/releases/latest) для быстрого запуска ПО на коптере

## Документация
## Документация:
Инструкция по запуску ПО находится [здесь](docs/start-tutorial.md).

Подробная документация расположена в папке [docs](https://github.com/artem30801/CleverSwarm/tree/master/docs).


21 changes: 16 additions & 5 deletions Server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,20 +143,27 @@ def _client_processor(self):
logging.info("Client processor (selector) thread started!")
self.server_socket.listen()
self.server_socket.setblocking(False)
self.sel.register(self.server_socket, selectors.EVENT_READ | selectors.EVENT_WRITE, data=None)
self.sel.register(self.server_socket, selectors.EVENT_READ, data=None) #| selectors.EVENT_WRITE

messaging.NotifierSock().bind((self.ip, self.port))

while self.client_processor_thread_running.is_set():
events = self.sel.select()
logging.error('tick')
for key, mask in events:
if key.data is None:
# logging.error(mask)
# logging.error(str(key.data))
client = key.data
if client is None:
self._connect_client(key.fileobj)
else:
client = key.data
elif isinstance(client, messaging.ConnectionManager):
try:
client.process_events(mask)
except Exception as error:
logging.error("Exception {} occurred for {}! Resetting connection!".format(error, client.addr))
client.close()
else: # Notifier
client.process_events(mask)

logging.info("Client autoconnect thread stopped!")

Expand All @@ -165,7 +172,11 @@ def _connect_client(self, sock):
logging.info("Got connection from: {}".format(str(addr)))
conn.setblocking(False)

if not any([client_addr == addr[0] for client_addr in Client.clients.keys()]):
if addr[0] == self.ip and messaging.NotifierSock().addr is None:
client = messaging.NotifierSock()
logging.info("Notifier sock client")

elif not any([client_addr == addr[0] for client_addr in Client.clients.keys()]):
client = Client(addr[0])
logging.info("New client")
else:
Expand Down
2 changes: 2 additions & 0 deletions Server/server_qt.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import threading


def wait(end, interrupter=threading.Event(), maxsleep=0.1):
# Added features to interrupter sleep and set max sleeping interval

Expand All @@ -30,6 +31,7 @@ def wait(end, interrupter=threading.Event(), maxsleep=0.1):
else:
time.sleep(diff / 2)


def confirmation_required(text="Are you sure?", label="Confirm operation?"):
def inner(f):

Expand Down
79 changes: 73 additions & 6 deletions messaging_lib.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import io
import sys
import json
import socket
import struct
import random
import logging
Expand All @@ -12,13 +13,28 @@
except ImportError:
import selectors2 as selectors

#import logging_lib
# import logging_lib

PendingRequest = collections.namedtuple("PendingRequest", ["value", "requested_value", # "expires_on",
"callback", "callback_args", "callback_kwargs",
])
logger = logging.getLogger(__name__)
#logger = logging_lib.Logger(_logger, True)


# logger = logging_lib.Logger(_logger, True)


class _Singleton(type):
""" A metaclass that creates a Singleton base class when called. """
_instances = {}

def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(_Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]


class Singleton(_Singleton('SingletonMeta', (object,), {})): pass


class MessageManager:
Expand Down Expand Up @@ -196,14 +212,17 @@ def _set_selector_events_mask(self, mode):
events = selectors.EVENT_READ | selectors.EVENT_WRITE
else:
raise ValueError("Invalid events mask mode {}.".format(mode))
self.selector.modify(self.socket, events, data=self)

key = self.selector.modify(self.socket, events, data=self)
logging.debug("Switched selector of {} to mode {}".format(self.addr, key.events))
return key

def connect(self, client_selector, client_socket, client_addr):
self.selector = client_selector
self.socket = client_socket
self.addr = client_addr

self._set_selector_events_mask('rw')
self._set_selector_events_mask('r')

def close(self):
logger.info("Closing connection to {}".format(self.addr))
Expand Down Expand Up @@ -269,7 +288,8 @@ def _read(self):

def process_received(self, income_message):
message_type = income_message.jsonheader["message-type"]
logger.debug("Received message! Header: {}, content: {}".format(income_message.jsonheader, income_message.content))
logger.debug(
"Received message! Header: {}, content: {}".format(income_message.jsonheader, income_message.content))

if message_type == "message":
self._process_message(income_message)
Expand Down Expand Up @@ -339,6 +359,8 @@ def write(self):
self._send_buffer += message
if self._send_buffer:
self._write()
else:
self._set_selector_events_mask('r') # we're done writing

def _write(self):
try:
Expand All @@ -347,7 +369,8 @@ def _write(self):
# Resource temporarily unavailable (errno EWOULDBLOCK)
pass
except Exception as error:
logger.warning("Attempt to send message {} to {} failed due error: {}".format(self._send_buffer, self.addr, error))
logger.warning(
"Attempt to send message {} to {} failed due error: {}".format(self._send_buffer, self.addr, error))

if not self.resume_queue:
self._send_buffer = b''
Expand All @@ -361,6 +384,10 @@ def _send(self, data):
with self._send_lock:
self._send_queue.append(data)

if self.selector.get_key(self.socket).events != selectors.EVENT_WRITE:
self._set_selector_events_mask('w')
NotifierSock().notify()

def get_response(self, requested_value, callback, request_args=None, # timeout=30,
callback_args=(), callback_kwargs=None):
if request_args is None:
Expand Down Expand Up @@ -397,3 +424,43 @@ def send_file(self, filepath, dest_filepath): # clever_restart=False
self._send(MessageManager.create_message(
data, "binary", "filetransfer", "binary", {"filepath": dest_filepath}
))


class NotifierSock(Singleton):
def __init__(self):
self.receive_socket = None
self.addr = None

self._notify_socket = None
self._notify_lock = threading.Lock()

def bind(self, server_addr):
self._notify_socket = socket.socket()
self._notify_socket.connect(server_addr)
logger.info("Notify socket: bind")

def connect(self, _, client_socket, client_addr):
self.receive_socket = client_socket
self.addr = client_addr

logger.info("Notify socket: connected")

def notify(self):
with self._notify_lock:
if self.addr is not None:
self._notify_socket.sendall(bytes(1))
logger.debug("Notify socket: notified")

def process_events(self, mask):
if mask & selectors.EVENT_READ:
try:
data = self.receive_socket.recv(1024)
except Exception: # TODO remove
pass
else:
if data:
logger.debug("Notifier received {} from {}".format(data, self.addr))
else:
self.addr = None
logger.warning("Notifier: connection to {} lost!".format(self.addr))

0 comments on commit ca4b4aa

Please sign in to comment.