Skip to content

Commit

Permalink
Make updates based on comments
Browse files Browse the repository at this point in the history
* Make updates from Divya Vuppala's comments.
  • Loading branch information
matthewd0123 committed Jul 15, 2024
1 parent 3bac7dd commit c72d538
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class TestAgent {
private static final Logger logger = Logger.getLogger("JavaTestAgent");
private static final UListener listener = TestAgent::handleOnReceive;
private static final Gson gson = new Gson();
private static final UUri RESPONSE_URI;

static {
actionHandlers.put(ActionCommands.SEND_COMMAND, TestAgent::handleSendCommand);
Expand All @@ -81,9 +82,13 @@ public class TestAgent {
actionHandlers.put(ActionCommands.VALIDATE_UATTRIBUTES, TestAgent::handleUAttributesValidateCommand);
}

static {
RESPONSE_URI = UUri.newBuilder().setUeId(1).setUeVersionMajor(1).setResourceId(0).build();
}

static {
try {
transport = new SocketUTransport();
transport = new SocketUTransport(RESPONSE_URI);
clientSocket = new Socket(Constant.TEST_MANAGER_IP, Constant.TEST_MANAGER_PORT);
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
4 changes: 3 additions & 1 deletion test_agent/python/testagent.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
logger = logging.getLogger("File:Line# Debugger")
logger.setLevel(logging.DEBUG)

RESPONSE_URI = UUri(ue_id=1, ue_version_major=1, resource_id=0)


class SocketUListener(UListener):
def on_receive(self, umsg: UMessage) -> None:
Expand Down Expand Up @@ -458,7 +460,7 @@ def receive_from_tm():

if __name__ == "__main__":
listener = SocketUListener()
transport = SocketUTransport()
transport = SocketUTransport(RESPONSE_URI)
ta_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ta_socket.connect(constants.TEST_MANAGER_ADDR)
thread = Thread(target=receive_from_tm)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,16 @@ public class SocketUTransport implements UTransport, RpcClient {
private static final String DISPATCHER_IP = "127.0.0.1";
private static final Integer DISPATCHER_PORT = 44444;
private static final int BYTES_MSG_LENGTH = 32767;
private static final UUri RESPONSE_URI;

static {
RESPONSE_URI = UUri.newBuilder().setUeId(1).setUeVersionMajor(1).setResourceId(0).build();
}

private final Socket socket;
private final ConcurrentHashMap<UUID, CompletionStage<UPayload>> reqid_to_future;
private final ConcurrentHashMap<UUri, ArrayList<UListener>> uri_to_listener;
private final Object lock = new Object();

private UUri source;

public SocketUTransport() throws IOException {
public SocketUTransport(UUri newSource) throws IOException {
source = newSource;
reqid_to_future = new ConcurrentHashMap<>();
uri_to_listener = new ConcurrentHashMap<>();
socket = new Socket(DISPATCHER_IP, DISPATCHER_PORT);
Expand Down Expand Up @@ -246,7 +243,7 @@ public CompletionStage<UStatus> unregisterListener(UUri sourceFilter, UUri sinkF
* @return A CompletableFuture that will hold the response message for the request.
*/
public CompletionStage<UPayload> invokeMethod(UUri methodUri, UPayload requestPayload, CallOptions options) {
UMessage umsg = UMessageBuilder.request(RESPONSE_URI, methodUri, options.timeout()).build(requestPayload);
UMessage umsg = UMessageBuilder.request(source, methodUri, options.timeout()).build(requestPayload);
UUID requestId = umsg.getAttributes().getId();
CompletionStage<UPayload> responseFuture = new CompletableFuture<>();
reqid_to_future.put(requestId, responseFuture);
Expand Down Expand Up @@ -298,6 +295,6 @@ public void close() {
* @return The source.
*/
public UUri getSource() {
return RESPONSE_URI;
return source;
}
}
16 changes: 12 additions & 4 deletions up_client_socket/python/socket_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
logger = logging.getLogger(__name__)
DISPATCHER_ADDR: tuple = ("127.0.0.1", 44444)
BYTES_MSG_LENGTH: int = 32767
RESPONSE_URI = UUri(ue_id=1, ue_version_major=1, resource_id=0)


def timeout_counter(response, req_id, timeout):
Expand All @@ -59,11 +58,13 @@ def timeout_counter(response, req_id, timeout):


class SocketUTransport(UTransport, RpcClient):
def __init__(self):
def __init__(self, source: UUri):
"""
Creates a uEntity with Socket Connection, as well as a map of registered topics.
param source: The URI associated with the UTransport.
"""

self.source = source
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect(DISPATCHER_ADDR)

Expand Down Expand Up @@ -189,7 +190,7 @@ def invoke_method(self, method_uri: UUri, request_payload: UPayload, options: Ca
"""
Invokes a method with the provided URI, request payload, and options.
"""
umsg = UMessageBuilder.request(RESPONSE_URI, method_uri, options.timeout).build_from_upayload(request_payload)
umsg = UMessageBuilder.request(self.source, method_uri, options.timeout).build_from_upayload(request_payload)
# Get uAttributes's request id
request_id = umsg.attributes.id

Expand All @@ -207,4 +208,11 @@ def get_source(self) -> UUri:
"""
Returns the source URI of the UTransport.
"""
return RESPONSE_URI
return self.source

def close(self):
"""
Closes the socket connection.
"""
self.socket.close()
logger.info(f"{self.__class__.__name__} Socket Connection Closed")

0 comments on commit c72d538

Please sign in to comment.