Skip to content

Commit

Permalink
Initial Commit
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewd0123 committed Apr 16, 2024
1 parent 5a2f9dd commit 72e4b66
Show file tree
Hide file tree
Showing 17 changed files with 817 additions and 449 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/tck-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: TCK Tests

on:
push:
branches: [ "main" ]
branches: [ "main", "test_patrick_changes" ]
pull_request:
branches: [ "main" ]
branches: [ "main", "test_patrick_changes" ]

permissions:
contents: read
Expand Down Expand Up @@ -88,7 +88,6 @@ jobs:
echo "Running Test: $filename"
behave --format json --outfile "./reports/${filename}.json" --format html --outfile "./reports/${filename}.html" "$full_path"
echo "Finished Test: $filename"
sleep 70
done
- name: Get Behave Scripts
uses: actions/github-script@v6
Expand Down
88 changes: 52 additions & 36 deletions dispatcher/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import logging
import selectors
import socket
from threading import Thread, Lock
import sys
from threading import Lock
from typing import Set

logging.basicConfig(format='%(levelname)s| %(filename)s:%(lineno)s %(message)s')
logger = logging.getLogger('File:Line# Debugger')
Expand All @@ -38,20 +40,23 @@

class Dispatcher:
"""
Dispatcher class handles incoming connections and forwards messages
to all connected up-clients.
"""
Dispatcher class handles incoming connections and forwards messages
to all connected up-clients.
"""

def __init__(self):
"""
Initialize the Dispatcher class.
"""
self.selector = selectors.DefaultSelector()
self.connected_sockets = set()
self.connected_sockets: Set[socket.socket] = set()
self.lock = Lock()
self.server = None

# Create server socket
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if sys.platform != "win32":
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.bind(DISPATCHER_ADDR)
self.server.listen(100)
self.server.setblocking(False)
Expand All @@ -60,13 +65,17 @@ def __init__(self):

# Register server socket for accepting connections
self.selector.register(self.server, selectors.EVENT_READ, self._accept_client_conn)

# Cleanup essentials
self.dispatcher_exit = False

def _accept_client_conn(self, server):
def _accept_client_conn(self, server: socket.socket):
"""
Callback function for accepting up-client connections.
:param server: The server socket.
"""

up_client_socket, _ = server.accept()
logger.info(f'accepted conn. {up_client_socket.getpeername()}')

Expand All @@ -76,69 +85,76 @@ def _accept_client_conn(self, server):
# Register socket for receiving data
self.selector.register(up_client_socket, selectors.EVENT_READ, self._receive_from_up_client)

def _receive_from_up_client(self, up_client_socket):
def _receive_from_up_client(self, up_client_socket: socket.socket):
"""
Callback function for receiving data from up-client sockets.
:param up_client_socket: The client socket.
"""
try:

recv_data = up_client_socket.recv(BYTES_MSG_LENGTH)
recv_data: bytes = up_client_socket.recv(BYTES_MSG_LENGTH)

if not recv_data or recv_data == b"":
self._close_socket(up_client_socket)
if recv_data == b"":
self._close_connected_socket(up_client_socket)
return

logger.info(f"received data: {recv_data}")
self._flood_to_sockets(up_client_socket, recv_data)
self._flood_to_sockets(recv_data)
except:
logger.error("Received error while reading data from up-client")
self._close_socket(up_client_socket)
self._close_connected_socket(up_client_socket)

def _flood_to_sockets(self, sender_socket, data):
def _flood_to_sockets(self, data: bytes):
"""
Flood data from a sender socket to all other connected sockets.
:param sender_socket: The socket from which the data is being sent.
:param data: The data to be sent.
"""
with self.lock:
for up_client_socket in self.connected_sockets.copy(): # copy() to avoid RuntimeError
# if up_client_socket != sender_socket:
try:
up_client_socket.sendall(data)
except ConnectionAbortedError as e:
logger.error(f"Error sending data to {up_client_socket.getpeername()}: {e}")
self._close_socket(up_client_socket)
# for up_client_socket in self.connected_sockets.copy(): # copy() to avoid RuntimeError
for up_client_socket in self.connected_sockets:
try:
up_client_socket.sendall(data)
except ConnectionAbortedError as e:
logger.error(f"Error sending data to {up_client_socket.getpeername()}: {e}")
self._close_connected_socket(up_client_socket)

def listen_for_client_connections(self):
"""
Start listening for client connections and handle events.
"""
while True:
events = self.selector.select()
while not self.dispatcher_exit:
events = self.selector.select(timeout=0)
for key, _ in events:
callback = key.data
callback(key.fileobj)

def _close_socket(self, up_client_socket):
def _close_connected_socket(self, up_client_socket: socket.socket):
"""
Close a client socket and unregister it from the selector.
:param up_client_socket: The client socket to be closed.
"""
logger.info(f"closing socket {up_client_socket.getpeername()}")
with self.lock:
self.connected_sockets.remove(up_client_socket)

self.selector.unregister(up_client_socket)
up_client_socket.close()

def close(self):
self.dispatcher_exit = True
for utransport_socket in self.connected_sockets.copy():
self._close_connected_socket(utransport_socket)
# Close server socket
try:
with self.lock:
self.connected_sockets.remove(up_client_socket)
self.selector.unregister(up_client_socket)
up_client_socket.close()
except:
pass


if __name__ == '__main__':
dispatcher = Dispatcher()
thread = Thread(target=dispatcher.listen_for_client_connections)
thread.start()
self.selector.unregister(self.server)
self.server.close()
logger.info("Server socket closed!")
except Exception as e:
logger.error(f"Error closing server socket: {e}")

# Close selector
self.selector.close()
logger.info("Dispatcher closed!")
121 changes: 104 additions & 17 deletions test_agent/java/src/main/java/org/eclipse/uprotocol/ProtoConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,24 @@

package org.eclipse.uprotocol;

import com.google.gson.Gson;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;

import com.google.protobuf.Descriptors.EnumValueDescriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

public class ProtoConverter {
import org.json.JSONArray;
import org.json.JSONObject;

import java.nio.charset.StandardCharsets;

public class ProtoConverter {

public static Message dictToProto(Map<String, Object> parentJsonObj, Message.Builder parentProtoObj) {
populateFields(parentJsonObj, parentProtoObj);
return parentProtoObj.build();
Expand All @@ -49,7 +55,7 @@ private static void populateFields(Map<String, Object> jsonObj, Message.Builder

if (fieldDescriptor != null) {
if (value instanceof String && ((String) value).startsWith("BYTES:")) {
String byteString = ((String) value).substring(7); // Remove 'BYTES:' prefix
String byteString = ((String) value).substring(6); // Remove 'BYTES:' prefix
ByteString byteValue = ByteString.copyFromUtf8(byteString);
protoObj.setField(fieldDescriptor, byteValue);
} else {
Expand Down Expand Up @@ -95,23 +101,104 @@ private static void setFieldValue(Message.Builder protoObj, Descriptors.FieldDes
}
break;
default:
// Handle other types as needed
break;
}
}

public static JSONObject convertMessageToJSON(Message message) {
JSONObject result = new JSONObject();

List<FieldDescriptor> allFields = message.getDescriptorForType().getFields();
for (FieldDescriptor field : allFields) {
String fieldName = field.getName();
Object defaultOrSetValue = message.getField(field);
Object value = getattr(message, field, defaultOrSetValue);

if (value instanceof byte[]) {
value = new String((byte[]) value, StandardCharsets.UTF_8);
}

if (value instanceof Message) {
result.put(fieldName, convertMessageToJSON((Message) value));
}
else if (field.isRepeated()) {
JSONArray repeated = new JSONArray();
for(Object subMsg: (List<Object>) value) {
if (subMsg instanceof Message) {
repeated.put( convertMessageToJSON((Message) subMsg) );
}
else{
repeated.put(subMsg);
}
}
result.put(fieldName, repeated);

}
else if (field.isRequired() || field.isOptional()) {
result.put(fieldName, value);
}
}

return result;
}

public static Map<String, Object> convertMessageToMap(Message message) {
Map<String, Object> map;
JsonFormat.Printer printer = JsonFormat.printer().includingDefaultValueFields().preservingProtoFieldNames();
Gson gson = new Gson();

try {
String jsonString = printer.print(message);
map = gson.fromJson(jsonString, Map.class);
} catch (InvalidProtocolBufferException ex) {
map = new HashMap<>();
Map<String, Object> result = new HashMap<>();

List<FieldDescriptor> allFields = message.getDescriptorForType().getFields();
for (FieldDescriptor field : allFields) {
String fieldName = field.getName();
Object defaultOrSetValue = message.getField(field);
Object value = getattr(message, field, defaultOrSetValue);
if (value instanceof EnumValueDescriptor) {
value = ((EnumValueDescriptor) value).getNumber();
}

if (value instanceof ByteString) {
value = ((ByteString) value).toStringUtf8();
}


if (value instanceof Message) {
result.put(fieldName, convertMessageToMap((Message) value));
}
else if (field.isRepeated()) {
List<Object> repeated = new ArrayList<>();
for(Object subMsg: (List<Object>) value) {
if (subMsg instanceof Message) {
repeated.add( convertMessageToMap((Message) subMsg) );
}
else{
repeated.add(subMsg);
}
}
result.put(fieldName, repeated);

}
else if (field.isRequired() || field.isOptional()) {
result.put(fieldName, value);
}
}

return result;
}

public static Object getattr(Message message, FieldDescriptor field, Object defaultValue) {
try {
Map<FieldDescriptor, Object> fields2Values = message.getAllFields();
Object value = fields2Values.get(field);

if (value == null) {
return defaultValue;
}
else {
return value;
}

}catch (Exception e) {
System.out.println(e);
return defaultValue;
}
return map;
}


Expand Down
Loading

0 comments on commit 72e4b66

Please sign in to comment.