From fcdf7bca11cea0a7eb67983fc09e2fc516779577 Mon Sep 17 00:00:00 2001 From: KonradBkd <117755498+KonradBkd@users.noreply.github.com> Date: Tue, 4 Feb 2025 13:41:44 +0100 Subject: [PATCH] SILKIT-1691: Prevent creation of multiple RPCServerInternal for same ClientUUID (#178) * dev: Prevent creation of multiple RPCServerInternal for same ClientUUID --- SilKit/IntegrationTests/CMakeLists.txt | 4 + .../IntegrationTests/ITest_LabelsMatching.cpp | 200 ++++++++++++++++++ SilKit/source/services/rpc/RpcServer.cpp | 19 +- SilKit/source/services/rpc/RpcServer.hpp | 3 +- 4 files changed, 219 insertions(+), 7 deletions(-) create mode 100644 SilKit/IntegrationTests/ITest_LabelsMatching.cpp diff --git a/SilKit/IntegrationTests/CMakeLists.txt b/SilKit/IntegrationTests/CMakeLists.txt index d0170b6d3..dc57b9fe3 100644 --- a/SilKit/IntegrationTests/CMakeLists.txt +++ b/SilKit/IntegrationTests/CMakeLists.txt @@ -75,6 +75,10 @@ add_silkit_test_to_executable(SilKitInternalIntegrationTests SOURCES ITest_Internals_DataPubSub.cpp ) +add_silkit_test_to_executable(SilKitIntegrationTests + SOURCES ITest_LabelsMatching.cpp +) + add_silkit_test_to_executable(SilKitInternalIntegrationTests SOURCES ITest_Internals_TargetedMessaging.cpp ) diff --git a/SilKit/IntegrationTests/ITest_LabelsMatching.cpp b/SilKit/IntegrationTests/ITest_LabelsMatching.cpp new file mode 100644 index 000000000..d37bd2bdf --- /dev/null +++ b/SilKit/IntegrationTests/ITest_LabelsMatching.cpp @@ -0,0 +1,200 @@ +// SPDX-FileCopyrightText: 2025 Vector Informatik GmbH +// +// SPDX-License-Identifier: MIT + +#include "ITestFixture.hpp" + +#include "silkit/services/pubsub/all.hpp" +#include "silkit/services/rpc/all.hpp" + +namespace { +using namespace SilKit::Tests; +using namespace SilKit::Config; +using namespace SilKit::Services; +using namespace SilKit::Services::PubSub; +using namespace SilKit::Services::Rpc; +using namespace std::chrono_literals; + +struct ITest_LabelMatching: ITest_SimTestHarness +{ + using ITest_SimTestHarness::ITest_SimTestHarness; +}; + +TEST_F(ITest_LabelMatching, pubsub_multiple_controllers_same_topic_different_labels) +{ + SetupFromParticipantList({"Pub1", "Sub1"}); + + auto topic = "T"; + auto mediaType = "M"; + + size_t numReceivedSubCtrl1{0}; + size_t numReceivedSubCtrl2{0}; + size_t numReceivedSubCtrl3{0}; + + { + ///////////////////////////////////////////////////////////////////////// + // Sub1 + ///////////////////////////////////////////////////////////////////////// + const auto participantName = "Sub1"; + auto&& simParticipant = _simTestHarness->GetParticipant(participantName); + auto&& participant = simParticipant->Participant(); + + PubSubSpec spec1{topic, mediaType}; + spec1.AddLabel("K", "L1", MatchingLabel::Kind::Mandatory); + participant->CreateDataSubscriber( + "SubCtrl1", spec1, + [&numReceivedSubCtrl1](IDataSubscriber* /*subscriber*/, const DataMessageEvent& /*dataMessageEvent*/) { + numReceivedSubCtrl1++; + }); + + PubSubSpec spec2{topic, mediaType}; + spec2.AddLabel("K", "L2", MatchingLabel::Kind::Mandatory); + participant->CreateDataSubscriber( + "SubCtrl2", spec2, + [&numReceivedSubCtrl2](IDataSubscriber* /*subscriber*/, const DataMessageEvent& /*dataMessageEvent*/) { + numReceivedSubCtrl2++; + }); + + PubSubSpec spec3{topic, mediaType}; + spec3.AddLabel("K", "L3", MatchingLabel::Kind::Mandatory); + participant->CreateDataSubscriber( + "SubCtrl3", spec3, + [&numReceivedSubCtrl3](IDataSubscriber* /*subscriber*/, const DataMessageEvent& /*dataMessageEvent*/) { + numReceivedSubCtrl3++; + }); + } + + { + ///////////////////////////////////////////////////////////////////////// + // Pub1 + ///////////////////////////////////////////////////////////////////////// + const auto participantName = "Pub1"; + auto&& simParticipant = _simTestHarness->GetParticipant(participantName); + auto&& participant = simParticipant->Participant(); + auto&& lifecycleService = simParticipant->GetOrCreateLifecycleService(); + auto&& timeSyncService = simParticipant->GetOrCreateTimeSyncService(); + + PubSubSpec spec1{topic, mediaType}; + spec1.AddLabel("K", "L1", MatchingLabel::Kind::Optional); + auto&& dataPublisher1 = participant->CreateDataPublisher("PubCtrl1", spec1, 0); + + PubSubSpec spec2{topic, mediaType}; + spec2.AddLabel("K", "L2", MatchingLabel::Kind::Optional); + auto&& dataPublisher2 = participant->CreateDataPublisher("PubCtrl2", spec2, 0); + + PubSubSpec spec3{topic, mediaType}; + spec3.AddLabel("K", "L3", MatchingLabel::Kind::Optional); + auto&& dataPublisher3 = participant->CreateDataPublisher("PubCtrl3", spec3, 0); + + + timeSyncService->SetSimulationStepHandler( + [dataPublisher1, dataPublisher2, dataPublisher3, lifecycleService](auto now, auto) { + if (now == 0ms) + { + dataPublisher1->Publish(std::vector{1}); + dataPublisher2->Publish(std::vector{2}); + dataPublisher3->Publish(std::vector{3}); + } + else if (now == 10ms) + { + lifecycleService->Stop("Stopping test"); + } + }, + 1ms); + } + + + _simTestHarness->Run(1s); + + EXPECT_EQ(numReceivedSubCtrl1, 1); + EXPECT_EQ(numReceivedSubCtrl2, 1); + EXPECT_EQ(numReceivedSubCtrl3, 1); +} + +TEST_F(ITest_LabelMatching, rpc_multiple_controllers_same_topic_different_labels) +{ + // This test ensures that we do not regress on SILKIT-1691 + + SetupFromParticipantList({"Client1", "Server1"}); + + auto functionName = "F"; + auto mediaType = "M"; + + size_t numReceivedCallResults1{0}; + + size_t numReceivedCalls1{0}; + size_t numReceivedCalls2{0}; + size_t numReceivedCalls3{0}; + + { + ///////////////////////////////////////////////////////////////////////// + // Server1 + ///////////////////////////////////////////////////////////////////////// + const auto participantName = "Server1"; + auto&& simParticipant = _simTestHarness->GetParticipant(participantName); + auto&& participant = simParticipant->Participant(); + + RpcSpec spec1{functionName, mediaType}; + spec1.AddLabel("K", "L1", MatchingLabel::Kind::Mandatory); + participant->CreateRpcServer( + "ServerCtrl1", spec1, + [&numReceivedCalls1](IRpcServer* server, RpcCallEvent event) { + numReceivedCalls1++; + server->SubmitResult(event.callHandle, std::vector{1}); + }); + + RpcSpec spec2{functionName, mediaType}; + spec2.AddLabel("K", "L2", MatchingLabel::Kind::Mandatory); + participant->CreateRpcServer("ServerCtrl2", spec2, + [&numReceivedCalls2](IRpcServer* server, RpcCallEvent event) { + numReceivedCalls2++; + server->SubmitResult(event.callHandle, std::vector{2}); + }); + + RpcSpec spec3{functionName, mediaType}; + spec3.AddLabel("K", "L3", MatchingLabel::Kind::Mandatory); + participant->CreateRpcServer("ServerCtrl3", spec3, + [&numReceivedCalls3](IRpcServer* server, RpcCallEvent event) { + numReceivedCalls3++; + server->SubmitResult(event.callHandle, std::vector{3}); + }); + } + + { + ///////////////////////////////////////////////////////////////////////// + // Client1 + ///////////////////////////////////////////////////////////////////////// + const auto participantName = "Client1"; + auto&& simParticipant = _simTestHarness->GetParticipant(participantName); + auto&& participant = simParticipant->Participant(); + auto&& lifecycleService = simParticipant->GetOrCreateLifecycleService(); + auto&& timeSyncService = simParticipant->GetOrCreateTimeSyncService(); + + RpcSpec spec1{functionName, mediaType}; + spec1.AddLabel("K", "L1", MatchingLabel::Kind::Optional); + auto&& client1 = participant->CreateRpcClient( + "ClientCtrl1", spec1, [&numReceivedCallResults1](auto*, const auto&) { numReceivedCallResults1++; }); + + timeSyncService->SetSimulationStepHandler( + [client1, lifecycleService](auto now, auto) { + if (now == 0ms) + { + client1->Call(std::vector{1}); + } + else if (now == 10ms) + { + lifecycleService->Stop("Stopping test"); + } + }, 1ms); + } + + _simTestHarness->Run(1s); + + EXPECT_EQ(numReceivedCallResults1, 1); + + EXPECT_EQ(numReceivedCalls1, 1); + EXPECT_EQ(numReceivedCalls2, 0); + EXPECT_EQ(numReceivedCalls3, 0); +} + +} //end namespace diff --git a/SilKit/source/services/rpc/RpcServer.cpp b/SilKit/source/services/rpc/RpcServer.cpp index 3a0d0b716..48052c076 100644 --- a/SilKit/source/services/rpc/RpcServer.cpp +++ b/SilKit/source/services/rpc/RpcServer.cpp @@ -58,9 +58,16 @@ void RpcServer::RegisterServiceDiscovery() return tmp; }; + auto clientUUID = getVal(Core::Discovery::supplKeyRpcClientUUID); + + // Early abort creation if Client is already connected + if (_internalRpcServers.count(clientUUID) > 0) + { + return; + } + auto functionName = getVal(Core::Discovery::supplKeyRpcClientFunctionName); auto clientMediaType = getVal(Core::Discovery::supplKeyRpcClientMediaType); - auto clientUUID = getVal(Core::Discovery::supplKeyRpcClientUUID); std::string labelsStr = getVal(Core::Discovery::supplKeyRpcClientLabels); auto clientLabels = SilKit::Config::Deserialize>(labelsStr); @@ -96,9 +103,9 @@ void RpcServer::SubmitResult(IRpcCallHandle* callHandle, Util::Span lock{_internalRpcServersMx}; - for (auto* internalRpcServer : _internalRpcServers) + for (auto internalRpcServer : _internalRpcServers) { - submitResultCounter += (internalRpcServer->SubmitResult(callHandle, resultData) ? 1 : 0); + submitResultCounter += (internalRpcServer.second->SubmitResult(callHandle, resultData) ? 1 : 0); } } @@ -118,7 +125,7 @@ void RpcServer::AddInternalRpcServer(const std::string& clientUUID, std::string _dataSpec.FunctionName(), clientUUID, joinedMediaType, clientLabels, _handler, this)); std::unique_lock lock{_internalRpcServersMx}; - _internalRpcServers.push_back(internalRpcServer); + _internalRpcServers.emplace(clientUUID, internalRpcServer); } void RpcServer::SetCallHandler(RpcCallHandler handler) @@ -126,9 +133,9 @@ void RpcServer::SetCallHandler(RpcCallHandler handler) _handler = handler; std::unique_lock lock{_internalRpcServersMx}; - for (auto* internalRpcServer : _internalRpcServers) + for (auto internalRpcServer : _internalRpcServers) { - internalRpcServer->SetRpcHandler(handler); + internalRpcServer.second->SetRpcHandler(handler); } } diff --git a/SilKit/source/services/rpc/RpcServer.hpp b/SilKit/source/services/rpc/RpcServer.hpp index 0b9757198..9a0e42ef3 100644 --- a/SilKit/source/services/rpc/RpcServer.hpp +++ b/SilKit/source/services/rpc/RpcServer.hpp @@ -23,6 +23,7 @@ WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #include #include +#include #include "silkit/services/rpc/IRpcServer.hpp" #include "silkit/services/rpc/IRpcCallHandle.hpp" @@ -73,7 +74,7 @@ class RpcServer Core::IParticipantInternal* _participant{nullptr}; std::mutex _internalRpcServersMx; - std::vector _internalRpcServers; + std::unordered_map _internalRpcServers; }; // ================================================================================