Skip to content

Commit

Permalink
Changes to support streamer
Browse files Browse the repository at this point in the history
* Allow for arbitrary number of Test Agents in a given test
* Update rust to 0.1.1 release
* Remove invoke_method, as it is L2 API
* Add few tests for streamer
  • Loading branch information
matthewd0123 committed Jul 19, 2024
1 parent 7f30d18 commit f72166d
Show file tree
Hide file tree
Showing 30 changed files with 440 additions and 703 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/tck-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ jobs:
java-version: '17'
distribution: 'temurin'
cache: maven
- name: Install dependencies
run: |
cd scripts
python build_up_java_latest.py
#- name: Install dependencies
# run: |
# cd scripts
# python build_up_java_latest.py
- name: Build up_client_socket_java with Maven
working-directory: up_client_socket/java
run: |
Expand Down Expand Up @@ -88,11 +88,11 @@ jobs:
if ("ue2" in feature){
for (var language_two in feature["ue2"]){
var second_ue = feature["ue2"][language_two]
var command_str = "behave --define uE1=" + port_language + " --define uE2=" + second_ue + " --define transport=" + port_transport + " --format json --outfile './reports/" + feature["feature_name"] + "_" + port_language + "_" + second_ue + ".json' --format html --outfile './reports/" + feature["feature_name"] + "_" + port_language + "_" + second_ue + ".html' './features/tests/" + feature["path"] + "/" + feature["feature_name"] + ".feature'"
var command_str = "behave --define uE1=" + port_language + " --define uE2=" + second_ue + " --define transport1=" + port_transport + " --define transport2=" + port_transport + " --format json --outfile './reports/" + feature["feature_name"] + "_" + port_language + "_" + second_ue + ".json' --format html --outfile './reports/" + feature["feature_name"] + "_" + port_language + "_" + second_ue + ".html' './features/tests/" + feature["path"] + "/" + feature["feature_name"] + ".feature'"
command_list.push(command_str);
}
} else {
var command_str = "behave --define uE1=" + port_language + " --define transport=" + port_transport + " --format json --outfile './reports/" + feature["feature_name"] + "_" + port_language + ".json' --format html --outfile './reports/" + feature["feature_name"] + "_" + port_language + ".html' './features/tests/" + feature["path"] + "/" + feature["feature_name"] + ".feature'"
var command_str = "behave --define uE1=" + port_language + " --define transport1=" + port_transport + " --format json --outfile './reports/" + feature["feature_name"] + "_" + port_language + ".json' --format html --outfile './reports/" + feature["feature_name"] + "_" + port_language + ".html' './features/tests/" + feature["path"] + "/" + feature["feature_name"] + ".feature'"
command_list.push(command_str);
}
}
Expand Down
1 change: 1 addition & 0 deletions dispatcher/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def _flood_to_sockets(self, data: bytes):
"""
# for up_client_socket in self.connected_sockets.copy(): # copy() to avoid RuntimeError
for up_client_socket in self.connected_sockets:
logger.info(f"sending data to {up_client_socket.getpeername()}")
try:
up_client_socket.sendall(data)
except ConnectionAbortedError as e:
Expand Down
5 changes: 5 additions & 0 deletions test_agent/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
<artifactId>json</artifactId>
<version>20231013</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.8.0</version>
</dependency>
</dependencies>
<build>
<finalName>tck-test-agent-java</finalName>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,6 @@ public class ActionCommands {
public static final String SERIALIZE_UUID = "uuid_serialize";
public static final String DESERIALIZE_UUID = "uuid_deserialize";
public static final String VALIDATE_UATTRIBUTES = "uattributes_validate";
public static final String INITIALIZE_TRANSPORT = "initialize_transport";

}
14 changes: 14 additions & 0 deletions test_agent/java/src/main/java/org/eclipse/uprotocol/Test.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.eclipse.uprotocol;

import java.util.*;

public class Test {
public static void main(String[] args)
{
UUID uuid = UUID.fromString("0190c6fe-db8b-70c2-a28f-c98eb4b0d91c");


System.out.println("The least significant 64 bit: " + uuid.getLeastSignificantBits());
System.out.println("The most significant 64 bit: " + uuid.getMostSignificantBits());
}
}
78 changes: 52 additions & 26 deletions test_agent/java/src/main/java/org/eclipse/uprotocol/TestAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

package org.eclipse.uprotocol;

import org.apache.commons.cli.*;


import com.google.gson.Gson;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
Expand All @@ -31,6 +34,7 @@
import org.eclipse.uprotocol.Constants.ActionCommands;
import org.eclipse.uprotocol.Constants.Constant;
import org.eclipse.uprotocol.transport.UListener;
import org.eclipse.uprotocol.transport.UTransport;
import org.eclipse.uprotocol.transport.builder.UMessageBuilder;
import org.eclipse.uprotocol.transport.validate.UAttributesValidator;
import org.eclipse.uprotocol.uri.serializer.UriSerializer;
Expand All @@ -46,7 +50,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
Expand All @@ -60,26 +63,30 @@
import java.util.logging.Logger;

public class TestAgent {


private static final Socket clientSocket;
private static final SocketUTransport transport;
private static UTransport transport;
private static final Map<String, ActionHandler> actionHandlers = new HashMap<>();
private static final Logger logger = Logger.getLogger("JavaTestAgent");
private static final UListener listener = TestAgent::handleOnReceive;
private static final Gson gson = new Gson();
private static final UUri RESPONSE_URI;
private static String transportName;
private static String sdkName;

static {
actionHandlers.put(ActionCommands.SEND_COMMAND, TestAgent::handleSendCommand);
actionHandlers.put(ActionCommands.REGISTER_LISTENER_COMMAND, TestAgent::handleRegisterListenerCommand);
actionHandlers.put(ActionCommands.UNREGISTER_LISTENER_COMMAND, TestAgent::handleUnregisterListenerCommand);
actionHandlers.put(ActionCommands.INVOKE_METHOD_COMMAND, TestAgent::handleInvokeMethodCommand);
actionHandlers.put(ActionCommands.SERIALIZE_URI, TestAgent::handleSerializeUriCommand);
actionHandlers.put(ActionCommands.DESERIALIZE_URI, TestAgent::handleDeserializeUriCommand);
actionHandlers.put(ActionCommands.VALIDATE_URI, TestAgent::handleValidateUriCommand);
actionHandlers.put(ActionCommands.VALIDATE_UUID, TestAgent::handleValidateUuidCommand);
actionHandlers.put(ActionCommands.SERIALIZE_UUID, TestAgent::handleSerializeUuidCommand);
actionHandlers.put(ActionCommands.DESERIALIZE_UUID, TestAgent::handleDeserializeUuidCommand);
actionHandlers.put(ActionCommands.VALIDATE_UATTRIBUTES, TestAgent::handleUAttributesValidateCommand);
actionHandlers.put(ActionCommands.INITIALIZE_TRANSPORT, TestAgent::handleInitializeTransportCommand);
}

static {
Expand Down Expand Up @@ -134,7 +141,7 @@ private static void sendToTestManager(Message proto, String action, String recei

private static void writeDataToTMSocket(JSONObject responseDict, String action) {
responseDict.put("action", action);
responseDict.put("ue", "java");
responseDict.put("ue", sdkName);
try {
OutputStream outputStream = clientSocket.getOutputStream();
outputStream.write(responseDict.toString().getBytes(StandardCharsets.UTF_8));
Expand All @@ -146,6 +153,18 @@ private static void writeDataToTMSocket(JSONObject responseDict, String action)
}
}

private static Object handleInitializeTransportCommand(Map<String, Object> jsonData) {
UUri uri = (UUri) ProtoConverter.dictToProto((Map<String, Object>) jsonData.get("data"), UUri.newBuilder());
try {
transport = new SocketUTransport(uri);
} catch (IOException e) {
e.printStackTrace();
sendToTestManager(Map.of("result", "INVALID_ARGUMENT", "message", ""), ActionCommands.INITIALIZE_TRANSPORT);
}
sendToTestManager(Map.of("result", "OK", "message", ""), ActionCommands.INITIALIZE_TRANSPORT);
return null;
}

private static CompletionStage<UStatus> handleSendCommand(Map<String, Object> jsonData) {
UMessage uMessage = (UMessage) ProtoConverter.dictToProto((Map<String, Object>) jsonData.get("data"),
UMessage.newBuilder());
Expand All @@ -165,27 +184,6 @@ private static CompletionStage<UStatus> handleUnregisterListenerCommand(Map<Stri
return transport.unregisterListener(uri, listener);
}

private static Object handleInvokeMethodCommand(Map<String, Object> jsonData) {
Map<String, Object> data = (Map<String, Object>) jsonData.get("data");
// Convert data and payload to protocol buffers
UUri uri = (UUri) ProtoConverter.dictToProto(data, UUri.newBuilder());
String payload = (String) data.get("payload");
ByteString value = null;
if (payload instanceof String && (payload).startsWith("BYTES:")) {
String byteString = (payload).substring(6); // Remove 'BYTES:' prefix
value = ByteString.copyFromUtf8(byteString);
} else if (payload instanceof String) {
value = ByteString.copyFromUtf8(payload);
}

UPayload setPayload = new UPayload(value, UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY);
CompletionStage<UPayload> responseFuture = transport.invokeMethod(uri, setPayload, CallOptions.DEFAULT);
responseFuture.whenComplete((responseMessage, exception) -> {
sendToTestManager(Map.of("payload", responseMessage.data().toStringUtf8()), ActionCommands.INVOKE_METHOD_COMMAND, (String) jsonData.get("test_id"));
});
return null;
}

private static Object handleSerializeUriCommand(Map<String, Object> jsonData) {
Map<String, Object> data = (Map<String, Object>) jsonData.get("data");
UUri uri = (UUri) ProtoConverter.dictToProto(data, UUri.newBuilder());
Expand Down Expand Up @@ -497,7 +495,35 @@ private static void handleOnReceive(UMessage uMessage) {

}

@SuppressWarnings("null")
public static void main(String[] args) {

Options options = new Options();

Option input = new Option("t", "transport", true, "Select Transport");
input.setRequired(true);
options.addOption(input);

Option output = new Option("s", "sdkname", true, "Select SDK Name");
output.setRequired(true);
options.addOption(output);

CommandLineParser parser = new DefaultParser();
HelpFormatter formatter = new HelpFormatter();
CommandLine cmd = null;//not a good practice, it serves it purpose

try {
cmd = parser.parse(options, args);
} catch (ParseException e) {
System.out.println(e.getMessage());
formatter.printHelp("utility-name", options);

System.exit(1);
}

transportName = cmd.getOptionValue("transport");
sdkName = cmd.getOptionValue("sdkname");

Thread receiveThread = new Thread(() -> {
try {
receiveFromTM();
Expand All @@ -509,7 +535,7 @@ public static void main(String[] args) {
});
receiveThread.start();
JSONObject obj = new JSONObject();
obj.put("SDK_name", "java");
obj.put("SDK_name", sdkName);
sendToTestManager(obj, "initialize");
}

Expand Down
1 change: 1 addition & 0 deletions test_agent/python/constants/actioncommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@
VALIDATE_UATTRIBUTES = "uattributes_validate"
MICRO_SERIALIZE_URI = "micro_serialize_uri"
MICRO_DESERIALIZE_URI = "micro_deserialize_uri"
INITIALIZE_TRANSPORT = "initialize_transport"
59 changes: 31 additions & 28 deletions test_agent/python/testagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import socket
import sys
import time
from concurrent.futures import Future
from argparse import ArgumentParser
from datetime import datetime, timezone
from threading import Thread
from typing import Any, Dict, List, Union
Expand All @@ -31,7 +31,6 @@
from google.protobuf.descriptor import FieldDescriptor
from google.protobuf.message import Message
from google.protobuf.wrappers_pb2 import StringValue
from uprotocol.communication.calloptions import CallOptions
from uprotocol.communication.upayload import UPayload
from uprotocol.transport.builder.umessagebuilder import UMessageBuilder
from uprotocol.transport.ulistener import UListener
Expand Down Expand Up @@ -63,6 +62,8 @@

RESPONSE_URI = UUri(ue_id=1, ue_version_major=1, resource_id=0)

sdkname = "python"
transport_name = "socket"

class SocketUListener(UListener):
def on_receive(self, umsg: UMessage) -> None:
Expand All @@ -83,6 +84,8 @@ def on_receive(self, umsg: UMessage) -> None:
else:
send_to_test_manager(umsg, actioncommands.RESPONSE_ON_RECEIVE)

transport = SocketUTransport(RESPONSE_URI)
listener = SocketUListener()

def message_to_dict(message: Message) -> Dict[str, Any]:
"""Converts protobuf Message to Dict and keeping respective data types
Expand Down Expand Up @@ -135,7 +138,7 @@ def send_to_test_manager(
response_dict = {
"data": response,
"action": action,
"ue": "python",
"ue": sdkname,
"test_id": received_test_id,
}
response_dict = json.dumps(response_dict).encode("utf-8")
Expand Down Expand Up @@ -198,27 +201,6 @@ def handle_unregister_listener_command(json_msg):
return transport.unregister_listener(uri, listener)


def handle_invoke_method_command(json_msg):
uri = dict_to_proto(json_msg["data"], UUri())
logger.info(json_msg["data"]["payload"])
value = json_msg["data"]["payload"]
if isinstance(value, str) and "BYTES:" in value:
value = value.replace("BYTES:", "")
value = value.encode("utf-8")
payload = UPayload(data=value)
res_future: Future = transport.invoke_method(uri, payload, CallOptions(timeout=10000))

def handle_response(message):
message: Message = message.result()
send_to_test_manager(
message,
actioncommands.INVOKE_METHOD_COMMAND,
received_test_id=json_msg["test_id"],
)

res_future.add_done_callback(handle_response)


def handle_serialize_uuri(json_msg: Dict[str, Any]):
uri: UUri = dict_to_proto(json_msg["data"], UUri())
serialized_uuri: str = UriSerializer.serialize(uri).lower()
Expand Down Expand Up @@ -420,19 +402,28 @@ def handle_uattributes_validate_command(json_msg: Dict[str, Any]):
received_test_id=json_msg["test_id"],
)

def handle_initialize_transport_command(json_msg: Dict[str, Any]):
global transport, listener
uri = dict_to_proto(json_msg["data"], UUri())
if transport_name == "socket":
transport = SocketUTransport(uri)
listener = SocketUListener()
else:
raise ValueError("Invalid transport name")
send_to_test_manager({"result": "OK", "message": ""}, actioncommands.INITIALIZE_TRANSPORT)

action_handlers = {
actioncommands.SEND_COMMAND: handle_send_command,
actioncommands.REGISTER_LISTENER_COMMAND: handle_register_listener_command,
actioncommands.UNREGISTER_LISTENER_COMMAND: handle_unregister_listener_command,
actioncommands.INVOKE_METHOD_COMMAND: handle_invoke_method_command,
actioncommands.SERIALIZE_URI: handle_serialize_uuri,
actioncommands.DESERIALIZE_URI: handle_deserialize_uri,
actioncommands.SERIALIZE_UUID: handle_serialize_uuid,
actioncommands.DESERIALIZE_UUID: handle_deserialize_uuid,
actioncommands.VALIDATE_URI: handle_uri_validate_command,
actioncommands.VALIDATE_UATTRIBUTES: handle_uattributes_validate_command,
actioncommands.VALIDATE_UUID: handle_uuid_validate_command,
actioncommands.INITIALIZE_TRANSPORT: handle_initialize_transport_command
}


Expand All @@ -459,10 +450,22 @@ def receive_from_tm():


if __name__ == "__main__":
listener = SocketUListener()
transport = SocketUTransport(RESPONSE_URI)
parser = ArgumentParser()

parser.add_argument("-t", "--transport", dest="transport", help="Select Transport", metavar="TRANSPORT")

parser.add_argument("-s", "--sdkname", dest="sdkname", help="Write SDK Name", metavar="SDKNAME")

args = parser.parse_args()

if args.sdkname is not None:
sdkname = args.sdkname

if args.transport is not None:
transport_name = args.transport

ta_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ta_socket.connect(constants.TEST_MANAGER_ADDR)
thread = Thread(target=receive_from_tm)
thread.start()
send_to_test_manager({"SDK_name": "python"}, "initialize")
send_to_test_manager({"SDK_name": sdkname}, "initialize")
Loading

0 comments on commit f72166d

Please sign in to comment.