Skip to content

Commit

Permalink
SILKIT-1691: Prevent creation of multiple RPCServerInternal for same …
Browse files Browse the repository at this point in the history
…ClientUUID (#178)

* dev: Prevent creation of multiple RPCServerInternal for same ClientUUID
  • Loading branch information
KonradBkd authored Feb 4, 2025
1 parent bf76816 commit fcdf7bc
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 7 deletions.
4 changes: 4 additions & 0 deletions SilKit/IntegrationTests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ add_silkit_test_to_executable(SilKitInternalIntegrationTests
SOURCES ITest_Internals_DataPubSub.cpp
)

add_silkit_test_to_executable(SilKitIntegrationTests
SOURCES ITest_LabelsMatching.cpp
)

add_silkit_test_to_executable(SilKitInternalIntegrationTests
SOURCES ITest_Internals_TargetedMessaging.cpp
)
Expand Down
200 changes: 200 additions & 0 deletions SilKit/IntegrationTests/ITest_LabelsMatching.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// SPDX-FileCopyrightText: 2025 Vector Informatik GmbH
//
// SPDX-License-Identifier: MIT

#include "ITestFixture.hpp"

#include "silkit/services/pubsub/all.hpp"
#include "silkit/services/rpc/all.hpp"

namespace {
using namespace SilKit::Tests;
using namespace SilKit::Config;
using namespace SilKit::Services;
using namespace SilKit::Services::PubSub;
using namespace SilKit::Services::Rpc;
using namespace std::chrono_literals;

struct ITest_LabelMatching: ITest_SimTestHarness
{
using ITest_SimTestHarness::ITest_SimTestHarness;
};

TEST_F(ITest_LabelMatching, pubsub_multiple_controllers_same_topic_different_labels)
{
SetupFromParticipantList({"Pub1", "Sub1"});

auto topic = "T";
auto mediaType = "M";

size_t numReceivedSubCtrl1{0};
size_t numReceivedSubCtrl2{0};
size_t numReceivedSubCtrl3{0};

{
/////////////////////////////////////////////////////////////////////////
// Sub1
/////////////////////////////////////////////////////////////////////////
const auto participantName = "Sub1";
auto&& simParticipant = _simTestHarness->GetParticipant(participantName);
auto&& participant = simParticipant->Participant();

PubSubSpec spec1{topic, mediaType};
spec1.AddLabel("K", "L1", MatchingLabel::Kind::Mandatory);
participant->CreateDataSubscriber(
"SubCtrl1", spec1,
[&numReceivedSubCtrl1](IDataSubscriber* /*subscriber*/, const DataMessageEvent& /*dataMessageEvent*/) {
numReceivedSubCtrl1++;
});

PubSubSpec spec2{topic, mediaType};
spec2.AddLabel("K", "L2", MatchingLabel::Kind::Mandatory);
participant->CreateDataSubscriber(
"SubCtrl2", spec2,
[&numReceivedSubCtrl2](IDataSubscriber* /*subscriber*/, const DataMessageEvent& /*dataMessageEvent*/) {
numReceivedSubCtrl2++;
});

PubSubSpec spec3{topic, mediaType};
spec3.AddLabel("K", "L3", MatchingLabel::Kind::Mandatory);
participant->CreateDataSubscriber(
"SubCtrl3", spec3,
[&numReceivedSubCtrl3](IDataSubscriber* /*subscriber*/, const DataMessageEvent& /*dataMessageEvent*/) {
numReceivedSubCtrl3++;
});
}

{
/////////////////////////////////////////////////////////////////////////
// Pub1
/////////////////////////////////////////////////////////////////////////
const auto participantName = "Pub1";
auto&& simParticipant = _simTestHarness->GetParticipant(participantName);
auto&& participant = simParticipant->Participant();
auto&& lifecycleService = simParticipant->GetOrCreateLifecycleService();
auto&& timeSyncService = simParticipant->GetOrCreateTimeSyncService();

PubSubSpec spec1{topic, mediaType};
spec1.AddLabel("K", "L1", MatchingLabel::Kind::Optional);
auto&& dataPublisher1 = participant->CreateDataPublisher("PubCtrl1", spec1, 0);

PubSubSpec spec2{topic, mediaType};
spec2.AddLabel("K", "L2", MatchingLabel::Kind::Optional);
auto&& dataPublisher2 = participant->CreateDataPublisher("PubCtrl2", spec2, 0);

PubSubSpec spec3{topic, mediaType};
spec3.AddLabel("K", "L3", MatchingLabel::Kind::Optional);
auto&& dataPublisher3 = participant->CreateDataPublisher("PubCtrl3", spec3, 0);


timeSyncService->SetSimulationStepHandler(
[dataPublisher1, dataPublisher2, dataPublisher3, lifecycleService](auto now, auto) {
if (now == 0ms)
{
dataPublisher1->Publish(std::vector<uint8_t>{1});
dataPublisher2->Publish(std::vector<uint8_t>{2});
dataPublisher3->Publish(std::vector<uint8_t>{3});
}
else if (now == 10ms)
{
lifecycleService->Stop("Stopping test");
}
},
1ms);
}


_simTestHarness->Run(1s);

EXPECT_EQ(numReceivedSubCtrl1, 1);
EXPECT_EQ(numReceivedSubCtrl2, 1);
EXPECT_EQ(numReceivedSubCtrl3, 1);
}

TEST_F(ITest_LabelMatching, rpc_multiple_controllers_same_topic_different_labels)
{
// This test ensures that we do not regress on SILKIT-1691

SetupFromParticipantList({"Client1", "Server1"});

auto functionName = "F";
auto mediaType = "M";

size_t numReceivedCallResults1{0};

size_t numReceivedCalls1{0};
size_t numReceivedCalls2{0};
size_t numReceivedCalls3{0};

{
/////////////////////////////////////////////////////////////////////////
// Server1
/////////////////////////////////////////////////////////////////////////
const auto participantName = "Server1";
auto&& simParticipant = _simTestHarness->GetParticipant(participantName);
auto&& participant = simParticipant->Participant();

RpcSpec spec1{functionName, mediaType};
spec1.AddLabel("K", "L1", MatchingLabel::Kind::Mandatory);
participant->CreateRpcServer(
"ServerCtrl1", spec1,
[&numReceivedCalls1](IRpcServer* server, RpcCallEvent event) {
numReceivedCalls1++;
server->SubmitResult(event.callHandle, std::vector<uint8_t>{1});
});

RpcSpec spec2{functionName, mediaType};
spec2.AddLabel("K", "L2", MatchingLabel::Kind::Mandatory);
participant->CreateRpcServer("ServerCtrl2", spec2,
[&numReceivedCalls2](IRpcServer* server, RpcCallEvent event) {
numReceivedCalls2++;
server->SubmitResult(event.callHandle, std::vector<uint8_t>{2});
});

RpcSpec spec3{functionName, mediaType};
spec3.AddLabel("K", "L3", MatchingLabel::Kind::Mandatory);
participant->CreateRpcServer("ServerCtrl3", spec3,
[&numReceivedCalls3](IRpcServer* server, RpcCallEvent event) {
numReceivedCalls3++;
server->SubmitResult(event.callHandle, std::vector<uint8_t>{3});
});
}

{
/////////////////////////////////////////////////////////////////////////
// Client1
/////////////////////////////////////////////////////////////////////////
const auto participantName = "Client1";
auto&& simParticipant = _simTestHarness->GetParticipant(participantName);
auto&& participant = simParticipant->Participant();
auto&& lifecycleService = simParticipant->GetOrCreateLifecycleService();
auto&& timeSyncService = simParticipant->GetOrCreateTimeSyncService();

RpcSpec spec1{functionName, mediaType};
spec1.AddLabel("K", "L1", MatchingLabel::Kind::Optional);
auto&& client1 = participant->CreateRpcClient(
"ClientCtrl1", spec1, [&numReceivedCallResults1](auto*, const auto&) { numReceivedCallResults1++; });

timeSyncService->SetSimulationStepHandler(
[client1, lifecycleService](auto now, auto) {
if (now == 0ms)
{
client1->Call(std::vector<uint8_t>{1});
}
else if (now == 10ms)
{
lifecycleService->Stop("Stopping test");
}
}, 1ms);
}

_simTestHarness->Run(1s);

EXPECT_EQ(numReceivedCallResults1, 1);

EXPECT_EQ(numReceivedCalls1, 1);
EXPECT_EQ(numReceivedCalls2, 0);
EXPECT_EQ(numReceivedCalls3, 0);
}

} //end namespace
19 changes: 13 additions & 6 deletions SilKit/source/services/rpc/RpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,16 @@ void RpcServer::RegisterServiceDiscovery()
return tmp;
};

auto clientUUID = getVal(Core::Discovery::supplKeyRpcClientUUID);

// Early abort creation if Client is already connected
if (_internalRpcServers.count(clientUUID) > 0)
{
return;
}

auto functionName = getVal(Core::Discovery::supplKeyRpcClientFunctionName);
auto clientMediaType = getVal(Core::Discovery::supplKeyRpcClientMediaType);
auto clientUUID = getVal(Core::Discovery::supplKeyRpcClientUUID);
std::string labelsStr = getVal(Core::Discovery::supplKeyRpcClientLabels);
auto clientLabels = SilKit::Config::Deserialize<std::vector<SilKit::Services::MatchingLabel>>(labelsStr);

Expand Down Expand Up @@ -96,9 +103,9 @@ void RpcServer::SubmitResult(IRpcCallHandle* callHandle, Util::Span<const uint8_

{
std::unique_lock<decltype(_internalRpcServersMx)> lock{_internalRpcServersMx};
for (auto* internalRpcServer : _internalRpcServers)
for (auto internalRpcServer : _internalRpcServers)
{
submitResultCounter += (internalRpcServer->SubmitResult(callHandle, resultData) ? 1 : 0);
submitResultCounter += (internalRpcServer.second->SubmitResult(callHandle, resultData) ? 1 : 0);
}
}

Expand All @@ -118,17 +125,17 @@ void RpcServer::AddInternalRpcServer(const std::string& clientUUID, std::string
_dataSpec.FunctionName(), clientUUID, joinedMediaType, clientLabels, _handler, this));

std::unique_lock<decltype(_internalRpcServersMx)> lock{_internalRpcServersMx};
_internalRpcServers.push_back(internalRpcServer);
_internalRpcServers.emplace(clientUUID, internalRpcServer);
}

void RpcServer::SetCallHandler(RpcCallHandler handler)
{
_handler = handler;

std::unique_lock<decltype(_internalRpcServersMx)> lock{_internalRpcServersMx};
for (auto* internalRpcServer : _internalRpcServers)
for (auto internalRpcServer : _internalRpcServers)
{
internalRpcServer->SetRpcHandler(handler);
internalRpcServer.second->SetRpcHandler(handler);
}
}

Expand Down
3 changes: 2 additions & 1 deletion SilKit/source/services/rpc/RpcServer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */

#include <vector>
#include <future>
#include <unordered_map>

#include "silkit/services/rpc/IRpcServer.hpp"
#include "silkit/services/rpc/IRpcCallHandle.hpp"
Expand Down Expand Up @@ -73,7 +74,7 @@ class RpcServer
Core::IParticipantInternal* _participant{nullptr};

std::mutex _internalRpcServersMx;
std::vector<RpcServerInternal*> _internalRpcServers;
std::unordered_map<std::string, RpcServerInternal*> _internalRpcServers;
};

// ================================================================================
Expand Down

0 comments on commit fcdf7bc

Please sign in to comment.