diff --git a/test_agent/java/src/main/java/org/eclipse/uprotocol/TestAgent.java b/test_agent/java/src/main/java/org/eclipse/uprotocol/TestAgent.java index 9b0682b5..15213a83 100644 --- a/test_agent/java/src/main/java/org/eclipse/uprotocol/TestAgent.java +++ b/test_agent/java/src/main/java/org/eclipse/uprotocol/TestAgent.java @@ -64,6 +64,7 @@ public class TestAgent { private static final Logger logger = Logger.getLogger("JavaTestAgent"); private static final UListener listener = TestAgent::handleOnReceive; private static final Gson gson = new Gson(); + private static final UUri RESPONSE_URI; static { actionHandlers.put(ActionCommands.SEND_COMMAND, TestAgent::handleSendCommand); @@ -79,9 +80,13 @@ public class TestAgent { actionHandlers.put(ActionCommands.VALIDATE_UATTRIBUTES, TestAgent::handleUAttributesValidateCommand); } + static { + RESPONSE_URI = UUri.newBuilder().setUeId(1).setUeVersionMajor(1).setResourceId(0).build(); + } + static { try { - transport = new SocketUTransport(); + transport = new SocketUTransport(RESPONSE_URI); clientSocket = new Socket(Constant.TEST_MANAGER_IP, Constant.TEST_MANAGER_PORT); } catch (IOException e) { throw new RuntimeException(e); diff --git a/test_agent/python/testagent.py b/test_agent/python/testagent.py index 2e98986c..8c7e6f45 100644 --- a/test_agent/python/testagent.py +++ b/test_agent/python/testagent.py @@ -61,12 +61,14 @@ logger = logging.getLogger("File:Line# Debugger") logger.setLevel(logging.DEBUG) +RESPONSE_URI = UUri(ue_id=1, ue_version_major=1, resource_id=0) + class SocketUListener(UListener): def on_receive(self, umsg: UMessage) -> None: logger.info("Listener received") if umsg.attributes.type == UMessageType.UMESSAGE_TYPE_REQUEST: - attributes = UMessageBuilder.response( + message = UMessageBuilder.response( umsg.attributes.sink, umsg.attributes.source, umsg.attributes.id, @@ -74,7 +76,7 @@ def on_receive(self, umsg: UMessage) -> None: any_obj = any_pb2.Any() any_obj.Pack(StringValue(value="SuccessRPCResponse")) res_msg = UMessage( - attributes=attributes, + attributes=message.attributes, payload=UPayload( value=any_obj.SerializeToString(), format=UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY, @@ -479,7 +481,7 @@ def receive_from_tm(): if __name__ == "__main__": listener = SocketUListener() - transport = SocketUTransport() + transport = SocketUTransport(RESPONSE_URI) ta_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ta_socket.connect(constants.TEST_MANAGER_ADDR) thread = Thread(target=receive_from_tm) diff --git a/up_client_socket/java/src/main/java/org/eclipse/uprotocol/SocketUTransport.java b/up_client_socket/java/src/main/java/org/eclipse/uprotocol/SocketUTransport.java index 2784e435..21008749 100644 --- a/up_client_socket/java/src/main/java/org/eclipse/uprotocol/SocketUTransport.java +++ b/up_client_socket/java/src/main/java/org/eclipse/uprotocol/SocketUTransport.java @@ -29,7 +29,6 @@ import org.eclipse.uprotocol.uri.validator.UriValidator; import org.eclipse.uprotocol.communication.CallOptions; import org.eclipse.uprotocol.v1.*; -import org.eclipse.uprotocol.validation.ValidationResult; import java.io.IOException; import java.io.InputStream; @@ -46,19 +45,16 @@ public class SocketUTransport implements UTransport, RpcClient { private static final String DISPATCHER_IP = "127.0.0.1"; private static final Integer DISPATCHER_PORT = 44444; private static final int BYTES_MSG_LENGTH = 32767; - private static final UUri RESPONSE_URI; - - static { - RESPONSE_URI = UUri.newBuilder().setUeId(1).setUeVersionMajor(1).setResourceId(0).build(); - } private final Socket socket; private final ConcurrentHashMap> reqid_to_future; private final ConcurrentHashMap> uri_to_listener; private final Object lock = new Object(); + private UUri source; - public SocketUTransport() throws IOException { + public SocketUTransport(UUri newSource) throws IOException { + source = newSource; reqid_to_future = new ConcurrentHashMap<>(); uri_to_listener = new ConcurrentHashMap<>(); socket = new Socket(DISPATCHER_IP, DISPATCHER_PORT); @@ -242,7 +238,7 @@ public CompletionStage unregisterListener(UUri sourceFilter, UUri sinkF * @return A CompletableFuture that will hold the response message for the request. */ public CompletionStage invokeMethod(UUri methodUri, UPayload requestPayload, CallOptions options) { - UMessage umsg = UMessageBuilder.request(RESPONSE_URI, methodUri, options.timeout()).build(requestPayload); + UMessage umsg = UMessageBuilder.request(source, methodUri, options.timeout()).build(requestPayload); UUID requestId = umsg.getAttributes().getId(); CompletionStage responseFuture = new CompletableFuture<>(); reqid_to_future.put(requestId, responseFuture); @@ -292,6 +288,6 @@ public void close() { * @return The source. */ public UUri getSource() { - return RESPONSE_URI; + return source; } } diff --git a/up_client_socket/python/socket_transport.py b/up_client_socket/python/socket_transport.py index 2c8f24cf..d27dbec4 100644 --- a/up_client_socket/python/socket_transport.py +++ b/up_client_socket/python/socket_transport.py @@ -42,7 +42,6 @@ logger = logging.getLogger(__name__) DISPATCHER_ADDR: tuple = ("127.0.0.1", 44444) BYTES_MSG_LENGTH: int = 32767 -RESPONSE_URI = UUri(ue_id=1, ue_version_major=1, resource_id=0) def timeout_counter(response, req_id, timeout): @@ -60,11 +59,13 @@ def timeout_counter(response, req_id, timeout): class SocketUTransport(UTransport, RpcClient): - def __init__(self): + def __init__(self, source: UUri): """ Creates a uEntity with Socket Connection, as well as a map of registered topics. + param source: The URI associated with the UTransport. """ + self.source = source self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.connect(DISPATCHER_ADDR) @@ -196,7 +197,7 @@ def invoke_method(self, method_uri: UUri, request_payload: UPayload, options: Ca """ Invokes a method with the provided URI, request payload, and options. """ - umsg = UMessageBuilder.request(RESPONSE_URI, method_uri, options.timeout).build_from_upayload(request_payload) + umsg = UMessageBuilder.request(self.source, method_uri, options.timeout).build_from_upayload(request_payload) # Get uAttributes's request id request_id = umsg.attributes.id @@ -214,4 +215,11 @@ def get_source(self) -> UUri: """ Returns the source URI of the UTransport. """ - return RESPONSE_URI + return self.source + + def close(self): + """ + Closes the socket connection. + """ + self.socket.close() + logger.info(f"{self.__class__.__name__} Socket Connection Closed")