Skip to content

Commit

Permalink
Make updates based on comments
Browse files Browse the repository at this point in the history
* Make updates from Divya Vuppala's comments.
  • Loading branch information
matthewd0123 committed Jul 10, 2024
1 parent 89608bd commit 5ddcc56
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class TestAgent {
private static final Logger logger = Logger.getLogger("JavaTestAgent");
private static final UListener listener = TestAgent::handleOnReceive;
private static final Gson gson = new Gson();
private static final UUri RESPONSE_URI;

static {
actionHandlers.put(ActionCommands.SEND_COMMAND, TestAgent::handleSendCommand);
Expand All @@ -79,9 +80,13 @@ public class TestAgent {
actionHandlers.put(ActionCommands.VALIDATE_UATTRIBUTES, TestAgent::handleUAttributesValidateCommand);
}

static {
RESPONSE_URI = UUri.newBuilder().setUeId(1).setUeVersionMajor(1).setResourceId(0).build();
}

static {
try {
transport = new SocketUTransport();
transport = new SocketUTransport(RESPONSE_URI);
clientSocket = new Socket(Constant.TEST_MANAGER_IP, Constant.TEST_MANAGER_PORT);
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
8 changes: 5 additions & 3 deletions test_agent/python/testagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,22 @@
logger = logging.getLogger("File:Line# Debugger")
logger.setLevel(logging.DEBUG)

RESPONSE_URI = UUri(ue_id=1, ue_version_major=1, resource_id=0)


class SocketUListener(UListener):
def on_receive(self, umsg: UMessage) -> None:
logger.info("Listener received")
if umsg.attributes.type == UMessageType.UMESSAGE_TYPE_REQUEST:
attributes = UMessageBuilder.response(
message = UMessageBuilder.response(
umsg.attributes.sink,
umsg.attributes.source,
umsg.attributes.id,
).build()
any_obj = any_pb2.Any()
any_obj.Pack(StringValue(value="SuccessRPCResponse"))
res_msg = UMessage(
attributes=attributes,
attributes=message.attributes,
payload=UPayload(
value=any_obj.SerializeToString(),
format=UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
Expand Down Expand Up @@ -479,7 +481,7 @@ def receive_from_tm():

if __name__ == "__main__":
listener = SocketUListener()
transport = SocketUTransport()
transport = SocketUTransport(RESPONSE_URI)
ta_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ta_socket.connect(constants.TEST_MANAGER_ADDR)
thread = Thread(target=receive_from_tm)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.eclipse.uprotocol.uri.validator.UriValidator;
import org.eclipse.uprotocol.communication.CallOptions;
import org.eclipse.uprotocol.v1.*;
import org.eclipse.uprotocol.validation.ValidationResult;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -46,19 +45,16 @@ public class SocketUTransport implements UTransport, RpcClient {
private static final String DISPATCHER_IP = "127.0.0.1";
private static final Integer DISPATCHER_PORT = 44444;
private static final int BYTES_MSG_LENGTH = 32767;
private static final UUri RESPONSE_URI;

static {
RESPONSE_URI = UUri.newBuilder().setUeId(1).setUeVersionMajor(1).setResourceId(0).build();
}

private final Socket socket;
private final ConcurrentHashMap<UUID, CompletionStage<UMessage>> reqid_to_future;
private final ConcurrentHashMap<UUri, ArrayList<UListener>> uri_to_listener;
private final Object lock = new Object();

private UUri source;

public SocketUTransport() throws IOException {
public SocketUTransport(UUri newSource) throws IOException {
source = newSource;
reqid_to_future = new ConcurrentHashMap<>();
uri_to_listener = new ConcurrentHashMap<>();
socket = new Socket(DISPATCHER_IP, DISPATCHER_PORT);
Expand Down Expand Up @@ -242,7 +238,7 @@ public CompletionStage<UStatus> unregisterListener(UUri sourceFilter, UUri sinkF
* @return A CompletableFuture that will hold the response message for the request.
*/
public CompletionStage<UPayload> invokeMethod(UUri methodUri, UPayload requestPayload, CallOptions options) {
UMessage umsg = UMessageBuilder.request(RESPONSE_URI, methodUri, options.timeout()).build(requestPayload);
UMessage umsg = UMessageBuilder.request(source, methodUri, options.timeout()).build(requestPayload);
UUID requestId = umsg.getAttributes().getId();
CompletionStage<UMessage> responseFuture = new CompletableFuture<>();
reqid_to_future.put(requestId, responseFuture);
Expand Down Expand Up @@ -292,6 +288,6 @@ public void close() {
* @return The source.
*/
public UUri getSource() {
return RESPONSE_URI;
return source;
}
}
16 changes: 12 additions & 4 deletions up_client_socket/python/socket_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
logger = logging.getLogger(__name__)
DISPATCHER_ADDR: tuple = ("127.0.0.1", 44444)
BYTES_MSG_LENGTH: int = 32767
RESPONSE_URI = UUri(ue_id=1, ue_version_major=1, resource_id=0)


def timeout_counter(response, req_id, timeout):
Expand All @@ -60,11 +59,13 @@ def timeout_counter(response, req_id, timeout):


class SocketUTransport(UTransport, RpcClient):
def __init__(self):
def __init__(self, source: UUri):
"""
Creates a uEntity with Socket Connection, as well as a map of registered topics.
param source: The URI associated with the UTransport.
"""

self.source = source
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect(DISPATCHER_ADDR)

Expand Down Expand Up @@ -196,7 +197,7 @@ def invoke_method(self, method_uri: UUri, request_payload: UPayload, options: Ca
"""
Invokes a method with the provided URI, request payload, and options.
"""
umsg = UMessageBuilder.request(RESPONSE_URI, method_uri, options.timeout).build_from_upayload(request_payload)
umsg = UMessageBuilder.request(self.source, method_uri, options.timeout).build_from_upayload(request_payload)
# Get uAttributes's request id
request_id = umsg.attributes.id

Expand All @@ -214,4 +215,11 @@ def get_source(self) -> UUri:
"""
Returns the source URI of the UTransport.
"""
return RESPONSE_URI
return self.source

def close(self):
"""
Closes the socket connection.
"""
self.socket.close()
logger.info(f"{self.__class__.__name__} Socket Connection Closed")

0 comments on commit 5ddcc56

Please sign in to comment.