Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iox #27 1 client server port interfaces #275

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions doc/installation-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,13 @@ The `CMakeLists.txt` from `iceoryx_meta` can be used to easily develop iceoryx w

| switch | description |
|:---------|:-------------|
| `IOX_MAX_PORT_NUMBER` | the maximum number of publisher and subscriber ports `RouDi` can distribute to the clients |
| `IOX_MAX_PUBLISHERS` | the maximum number of publishers one `RouDi` instance can manage |
budrus marked this conversation as resolved.
Show resolved Hide resolved
| `IOX_MAX_SUBSCRIBERS_PER_PUBLISHER` | the maximum number of subscriber a publisher can deliver chunks to|
| `IOX_MAX_PUBLISHER_HISTORY` | the maximum number chunks available for the publisher history |
| `IOX_MAX_CHUNKS_ALLOCATED_PER_PUBLISHER_SIMULTANEOUSLY` | the maximum number of chunks a sender can allocate at a given time |
| `IOX_MAX_SUBSCRIBERS` | the maximum number of subscribers one `RouDi` instance can manage |
| `IOX_MAX_CHUNKS_HELD_PER_SUBSCRIBER_SIMULTANEOUSLY` | the maximum number of chunks a subscriber can hold at a given time |
| `IOX_MAX_INTERFACE_NUMBER` | the maximum number for interface ports, which are used for e.g. gateways |
| `IOX_MAX_SUBSCRIBERS_PER_PUBLISHER` | the maximum number of subscriber a publisher can deliver chunks |
| `IOX_MAX_CHUNKS_ALLOCATE_PER_SENDER` | the maximum number of chunks a sender can hold at a given time |
| `IOX_MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR` | the maximum number chunks available for the chunk history |
| `IOX_MAX_CHUNKS_HELD_PER_RECEIVER` | the maximum number of chunks a receiver can hold at a given time |

Have a look at `iceoryx_posh/cmake/iceoryx_posh_deployment.cmake` for the default values of this constants.

Expand Down
4 changes: 2 additions & 2 deletions iceoryx_binding_c/source/c_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ iox_popo_AllocationResult
iox_pub_allocate_chunk(PublisherPortData* const self, void** const chunk, const uint32_t payloadSize)
{
auto result =
PublisherPortUser(self).allocateChunk(payloadSize).and_then([&](ChunkHeader* h) { *chunk = h->payload(); });
PublisherPortUser(self).tryAllocateChunk(payloadSize).and_then([&](ChunkHeader* h) { *chunk = h->payload(); });
if (result.has_error())
{
switch (result.get_error())
Expand Down Expand Up @@ -77,7 +77,7 @@ void iox_pub_send_chunk(PublisherPortData* const self, void* const chunk)
const void* iox_pub_try_get_previous_chunk(PublisherPortData* const self)
{
const void* returnValue = nullptr;
PublisherPortUser(self).getLastChunk().and_then([&](const ChunkHeader* h) { returnValue = h->payload(); });
PublisherPortUser(self).tryGetPreviousChunk().and_then([&](const ChunkHeader* h) { returnValue = h->payload(); });
return returnValue;
}

Expand Down
10 changes: 5 additions & 5 deletions iceoryx_binding_c/source/c_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ iox_SubscribeState iox_sub_get_subscription_state(SubscriberPortData* const self

iox_popo_ChunkReceiveResult iox_sub_get_chunk(SubscriberPortData* const self, const void** const payload)
{
auto result = SubscriberPortUser(self).getChunk();
auto result = SubscriberPortUser(self).tryGetChunk();
if (result.has_error())
{
return (result.get_error() == ChunkReceiveError::TOO_MANY_CHUNKS_HELD_IN_PARALLEL)
Expand Down Expand Up @@ -110,21 +110,21 @@ bool iox_sub_has_new_chunks(SubscriberPortData* const self)

bool iox_sub_has_lost_chunks(SubscriberPortData* const self)
{
return SubscriberPortUser(self).hasLostChunks();
return SubscriberPortUser(self).hasLostChunksSinceLastCall();
}

bool iox_sub_attach_condition_variable(SubscriberPortData* const self, ConditionVariableData* const cvHandle)
{
return SubscriberPortUser(self).attachConditionVariable(cvHandle);
return SubscriberPortUser(self).setConditionVariable(cvHandle);
}

bool iox_sub_detach_condition_variable(SubscriberPortData* const self)
{
return SubscriberPortUser(self).detachConditionVariable();
return SubscriberPortUser(self).unsetConditionVariable();
}

bool iox_sub_is_condition_variable_attached(SubscriberPortData* const self)
{
return SubscriberPortUser(self).isConditionVariableAttached();
return SubscriberPortUser(self).isConditionVariableSet();
}

10 changes: 5 additions & 5 deletions iceoryx_binding_c/test/integrationtests/test_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ class c_iox_pub_test : public Test
PublisherPortUser userPort(ptr);
PublisherPortRouDi roudiPort(ptr);

roudiPort.getCaProMessage();
roudiPort.tryGetCaProMessage();
iox::capro::CaproMessage caproMessage(iox::capro::CaproMessageType::SUB,
iox::capro::ServiceDescription("a", "b", "c"));
caproMessage.m_chunkQueueData = &m_chunkQueueData;
auto maybeCaProMessage = roudiPort.dispatchCaProMessage(caproMessage);
auto maybeCaProMessage = roudiPort.dispatchCaProMessageAndGetPossibleResponse(caproMessage);
}

void Unsubscribe(popo::PublisherPortData* ptr)
Expand All @@ -83,7 +83,7 @@ class c_iox_pub_test : public Test
iox::capro::CaproMessage caproMessage(iox::capro::CaproMessageType::UNSUB,
iox::capro::ServiceDescription("a", "b", "c"));
caproMessage.m_chunkQueueData = &m_chunkQueueData;
auto maybeCaProMessage = roudiPort.dispatchCaProMessage(caproMessage);
auto maybeCaProMessage = roudiPort.dispatchCaProMessageAndGetPossibleResponse(caproMessage);
}

static constexpr size_t MEMORY_SIZE = 1024 * 1024;
Expand All @@ -106,7 +106,7 @@ class c_iox_pub_test : public Test

// publisher port w/ history
PublisherPortData m_publisherPortDataHistory{
capro::ServiceDescription("x", "y", "z"), "myApp", &m_memoryManager, MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR};
capro::ServiceDescription("x", "y", "z"), "myApp", &m_memoryManager, MAX_PUBLISHER_HISTORY};
};

TEST_F(c_iox_pub_test, initialStateIsNotOffered)
Expand Down Expand Up @@ -222,7 +222,7 @@ TEST_F(c_iox_pub_test, sendDeliversChunk)
iox_pub_send_chunk(&m_publisherPortData, chunk);

iox::popo::ChunkQueuePopper<ChunkQueueData_t> m_chunkQueuePopper(&m_chunkQueueData);
auto maybeSharedChunk = m_chunkQueuePopper.pop();
auto maybeSharedChunk = m_chunkQueuePopper.tryPop();

ASSERT_TRUE(maybeSharedChunk.has_value());
EXPECT_TRUE(*maybeSharedChunk == chunk);
Expand Down
46 changes: 23 additions & 23 deletions iceoryx_binding_c/test/integrationtests/test_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,17 @@ class c_iox_sub_test : public Test

void Subscribe(SubscriberPortData* ptr)
{
uint64_t queueCapacity = MAX_CHUNKS_HELD_PER_RECEIVER;
uint64_t queueCapacity = MAX_CHUNKS_HELD_PER_SUBSCRIBER_SIMULTANEOUSLY;
iox_sub_subscribe(ptr, queueCapacity);

SubscriberPortSingleProducer(ptr).getCaProMessage();
SubscriberPortSingleProducer(ptr).tryGetCaProMessage();
iox::capro::CaproMessage caproMessage(iox::capro::CaproMessageType::ACK, TEST_SERVICE_DESCRIPTION);
SubscriberPortSingleProducer(ptr).dispatchCaProMessage(caproMessage);
SubscriberPortSingleProducer(ptr).dispatchCaProMessageAndGetPossibleResponse(caproMessage);
}

static constexpr size_t MEMORY_SIZE = 1024 * 1024 * 100;
uint8_t m_memory[MEMORY_SIZE];
static constexpr uint32_t NUM_CHUNKS_IN_POOL = MAX_CHUNKS_HELD_PER_RECEIVER + 2;
static constexpr uint32_t NUM_CHUNKS_IN_POOL = MAX_CHUNKS_HELD_PER_SUBSCRIBER_SIMULTANEOUSLY + 2;
static constexpr uint32_t CHUNK_SIZE = 128;

Allocator m_memoryAllocator{m_memory, MEMORY_SIZE};
Expand All @@ -94,7 +94,7 @@ TEST_F(c_iox_sub_test, offerLeadsToSubscibeRequestedState)
uint64_t queueCapacity = 1u;
iox_sub_subscribe(&m_portPtr, queueCapacity);

SubscriberPortSingleProducer(&m_portPtr).getCaProMessage();
SubscriberPortSingleProducer(&m_portPtr).tryGetCaProMessage();

EXPECT_EQ(iox_sub_get_subscription_state(&m_portPtr), SubscribeState_SUBSCRIBE_REQUESTED);
}
Expand All @@ -104,9 +104,9 @@ TEST_F(c_iox_sub_test, NACKResponseLeadsToSubscribeWaitForOfferState)
uint64_t queueCapacity = 1u;
iox_sub_subscribe(&m_portPtr, queueCapacity);

SubscriberPortSingleProducer(&m_portPtr).getCaProMessage();
SubscriberPortSingleProducer(&m_portPtr).tryGetCaProMessage();
iox::capro::CaproMessage caproMessage(iox::capro::CaproMessageType::NACK, TEST_SERVICE_DESCRIPTION);
SubscriberPortSingleProducer(&m_portPtr).dispatchCaProMessage(caproMessage);
SubscriberPortSingleProducer(&m_portPtr).dispatchCaProMessageAndGetPossibleResponse(caproMessage);

EXPECT_EQ(iox_sub_get_subscription_state(&m_portPtr), SubscribeState_WAIT_FOR_OFFER);
}
Expand All @@ -116,9 +116,9 @@ TEST_F(c_iox_sub_test, ACKResponseLeadsToSubscribedState)
uint64_t queueCapacity = 1u;
iox_sub_subscribe(&m_portPtr, queueCapacity);

SubscriberPortSingleProducer(&m_portPtr).getCaProMessage();
SubscriberPortSingleProducer(&m_portPtr).tryGetCaProMessage();
iox::capro::CaproMessage caproMessage(iox::capro::CaproMessageType::ACK, TEST_SERVICE_DESCRIPTION);
SubscriberPortSingleProducer(&m_portPtr).dispatchCaProMessage(caproMessage);
SubscriberPortSingleProducer(&m_portPtr).dispatchCaProMessageAndGetPossibleResponse(caproMessage);

EXPECT_EQ(iox_sub_get_subscription_state(&m_portPtr), SubscribeState_SUBSCRIBED);
}
Expand All @@ -128,13 +128,13 @@ TEST_F(c_iox_sub_test, UnsubscribeLeadsToUnscribeRequestedState)
uint64_t queueCapacity = 1u;
iox_sub_subscribe(&m_portPtr, queueCapacity);

SubscriberPortSingleProducer(&m_portPtr).getCaProMessage();
SubscriberPortSingleProducer(&m_portPtr).tryGetCaProMessage();
iox::capro::CaproMessage caproMessage(iox::capro::CaproMessageType::ACK, TEST_SERVICE_DESCRIPTION);
SubscriberPortSingleProducer(&m_portPtr).dispatchCaProMessage(caproMessage);
SubscriberPortSingleProducer(&m_portPtr).dispatchCaProMessageAndGetPossibleResponse(caproMessage);

iox_sub_unsubscribe(&m_portPtr);

SubscriberPortSingleProducer(&m_portPtr).getCaProMessage();
SubscriberPortSingleProducer(&m_portPtr).tryGetCaProMessage();

EXPECT_EQ(iox_sub_get_subscription_state(&m_portPtr), SubscribeState_UNSUBSCRIBE_REQUESTED);
}
Expand All @@ -148,7 +148,7 @@ TEST_F(c_iox_sub_test, initialStateNoChunksAvailable)
TEST_F(c_iox_sub_test, receiveChunkWhenThereIsOne)
{
this->Subscribe(&m_portPtr);
m_chunkPusher.push(m_memoryManager.getChunk(100));
m_chunkPusher.tryPush(m_memoryManager.getChunk(100));

const void* chunk = nullptr;
EXPECT_EQ(iox_sub_get_chunk(&m_portPtr, &chunk), ChunkReceiveResult_SUCCESS);
Expand All @@ -164,7 +164,7 @@ TEST_F(c_iox_sub_test, DISABLED_receiveChunkWithContent)

auto sharedChunk = m_memoryManager.getChunk(100);
static_cast<data_t*>(sharedChunk.getPayload())->value = 1234;
m_chunkPusher.push(m_memoryManager.getChunk(100));
m_chunkPusher.tryPush(m_memoryManager.getChunk(100));

const void* chunk = nullptr;

Expand All @@ -176,20 +176,20 @@ TEST_F(c_iox_sub_test, receiveChunkWhenToManyChunksAreHold)
{
this->Subscribe(&m_portPtr);
const void* chunk = nullptr;
for (uint64_t i = 0; i < MAX_CHUNKS_HELD_PER_RECEIVER + 1; ++i)
for (uint64_t i = 0; i < MAX_CHUNKS_HELD_PER_SUBSCRIBER_SIMULTANEOUSLY + 1; ++i)
{
m_chunkPusher.push(m_memoryManager.getChunk(100));
m_chunkPusher.tryPush(m_memoryManager.getChunk(100));
iox_sub_get_chunk(&m_portPtr, &chunk);
}

m_chunkPusher.push(m_memoryManager.getChunk(100));
m_chunkPusher.tryPush(m_memoryManager.getChunk(100));
EXPECT_EQ(iox_sub_get_chunk(&m_portPtr, &chunk), ChunkReceiveResult_TOO_MANY_CHUNKS_HELD_IN_PARALLEL);
}

TEST_F(c_iox_sub_test, releaseChunkWorks)
{
this->Subscribe(&m_portPtr);
m_chunkPusher.push(m_memoryManager.getChunk(100));
m_chunkPusher.tryPush(m_memoryManager.getChunk(100));

const void* chunk = nullptr;
iox_sub_get_chunk(&m_portPtr, &chunk);
Expand All @@ -202,12 +202,12 @@ TEST_F(c_iox_sub_test, releaseChunkWorks)
TEST_F(c_iox_sub_test, releaseChunkQueuedChunksWorks)
{
this->Subscribe(&m_portPtr);
for (uint64_t i = 0; i < MAX_CHUNKS_HELD_PER_RECEIVER; ++i)
for (uint64_t i = 0; i < MAX_CHUNKS_HELD_PER_SUBSCRIBER_SIMULTANEOUSLY; ++i)
{
m_chunkPusher.push(m_memoryManager.getChunk(100));
m_chunkPusher.tryPush(m_memoryManager.getChunk(100));
}

EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(MAX_CHUNKS_HELD_PER_RECEIVER));
EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(MAX_CHUNKS_HELD_PER_SUBSCRIBER_SIMULTANEOUSLY));
iox_sub_release_queued_chunks(&m_portPtr);
EXPECT_THAT(m_memoryManager.getMemPoolInfo(0).m_usedChunks, Eq(0u));
}
Expand All @@ -220,7 +220,7 @@ TEST_F(c_iox_sub_test, initialStateHasNewChunksFalse)
TEST_F(c_iox_sub_test, receivingChunkLeadsToHasNewChunksTrue)
{
this->Subscribe(&m_portPtr);
m_chunkPusher.push(m_memoryManager.getChunk(100));
m_chunkPusher.tryPush(m_memoryManager.getChunk(100));

EXPECT_TRUE(iox_sub_has_new_chunks(&m_portPtr));
}
Expand All @@ -235,7 +235,7 @@ TEST_F(c_iox_sub_test, sendingTooMuchLeadsToLostChunks)
this->Subscribe(&m_portPtr);
for (uint64_t i = 0; i < DefaultChunkQueueConfig::MAX_QUEUE_CAPACITY + 1; ++i)
{
m_chunkPusher.push(m_memoryManager.getChunk(100));
m_chunkPusher.tryPush(m_memoryManager.getChunk(100));
}

EXPECT_TRUE(iox_sub_has_lost_chunks(&m_portPtr));
Expand Down
6 changes: 6 additions & 0 deletions iceoryx_posh/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ add_library(iceoryx_posh
source/popo/ports/subscriber_port_single_producer.cpp
source/popo/ports/subscriber_port_multi_producer.cpp
source/popo/ports/subscriber_port_data.cpp
source/popo/ports/client_port_data.cpp
source/popo/ports/client_port_roudi.cpp
source/popo/ports/client_port_user.cpp
source/popo/ports/server_port_data.cpp
source/popo/ports/server_port_roudi.cpp
source/popo/ports/server_port_user.cpp
source/popo/building_blocks/chunk_queue_types.cpp
source/popo/building_blocks/condition_variable_signaler.cpp
source/popo/building_blocks/condition_variable_waiter.cpp
Expand Down
20 changes: 12 additions & 8 deletions iceoryx_posh/cmake/iceoryx_posh_deployment.cmake
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# configure deployment
if(NOT IOX_MAX_PORT_NUMBER)
set(IOX_MAX_PORT_NUMBER 1024)
if(NOT IOX_MAX_PUBLISHERS)
set(IOX_MAX_PUBLISHERS 512)
endif()

if(NOT IOX_MAX_SUBSCRIBERS)
set(IOX_MAX_SUBSCRIBERS 1024)
endif()

if(NOT IOX_MAX_INTERFACE_NUMBER)
Expand All @@ -11,16 +15,16 @@ if(NOT IOX_MAX_SUBSCRIBERS_PER_PUBLISHER)
set(IOX_MAX_SUBSCRIBERS_PER_PUBLISHER 256)
endif()

if(NOT IOX_MAX_CHUNKS_ALLOCATE_PER_SENDER)
set(IOX_MAX_CHUNKS_ALLOCATE_PER_SENDER 8)
if(NOT IOX_MAX_CHUNKS_ALLOCATED_PER_PUBLISHER_SIMULTANEOUSLY)
set(IOX_MAX_CHUNKS_ALLOCATED_PER_PUBLISHER_SIMULTANEOUSLY 8)
endif()

if(NOT IOX_MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR)
set(IOX_MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR 16)
if(NOT IOX_MAX_PUBLISHER_HISTORY)
set(IOX_MAX_PUBLISHER_HISTORY 16)
endif()

if(NOT IOX_MAX_CHUNKS_HELD_PER_RECEIVER)
set(IOX_MAX_CHUNKS_HELD_PER_RECEIVER 256)
if(NOT IOX_MAX_CHUNKS_HELD_PER_SUBSCRIBER_SIMULTANEOUSLY)
set(IOX_MAX_CHUNKS_HELD_PER_SUBSCRIBER_SIMULTANEOUSLY 256)
endif()

configure_file("${CMAKE_CURRENT_SOURCE_DIR}/cmake/iceoryx_posh_deployment.hpp.in"
Expand Down
13 changes: 7 additions & 6 deletions iceoryx_posh/cmake/iceoryx_posh_deployment.hpp.in
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@ namespace build
{
/// @note Do not change the constants here, the file is autogenerated!
/// Edit the iceoryx_posh_deployment.cmake to change the default values,
/// or configure via cmake, e.g. with "-DIOX_MAX_PORT_NUMBER=42".
/// or configure via cmake, e.g. with "-DIOX_MAX_PUBLISHERS=42".
/// If you include iceoryx by source into your cmake project, just add e.g.
/// set(IOX_MAX_PORT_NUMBER 42) before add_subdirectory(iceoryx_posh).
constexpr uint32_t IOX_MAX_PORT_NUMBER = static_cast<uint32_t>(@IOX_MAX_PORT_NUMBER@);
/// set(IOX_MAX_PUBLISHERS 42) before add_subdirectory(iceoryx_posh).
constexpr uint32_t IOX_MAX_PUBLISHERS = static_cast<uint32_t>(@IOX_MAX_PUBLISHERS@);
constexpr uint32_t IOX_MAX_SUBSCRIBERS = static_cast<uint32_t>(@IOX_MAX_SUBSCRIBERS@);
constexpr uint32_t IOX_MAX_INTERFACE_NUMBER = static_cast<uint32_t>(@IOX_MAX_INTERFACE_NUMBER@);
constexpr uint32_t IOX_MAX_SUBSCRIBERS_PER_PUBLISHER = static_cast<uint32_t>(@IOX_MAX_SUBSCRIBERS_PER_PUBLISHER@);
constexpr uint32_t IOX_MAX_CHUNKS_ALLOCATE_PER_SENDER = static_cast<uint32_t>(@IOX_MAX_CHUNKS_ALLOCATE_PER_SENDER@);
constexpr uint64_t IOX_MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR = static_cast<uint32_t>(@IOX_MAX_HISTORY_CAPACITY_OF_CHUNK_DISTRIBUTOR@);
constexpr uint32_t IOX_MAX_CHUNKS_HELD_PER_RECEIVER = static_cast<uint32_t>(@IOX_MAX_CHUNKS_HELD_PER_RECEIVER@);
constexpr uint32_t IOX_MAX_CHUNKS_ALLOCATED_PER_PUBLISHER_SIMULTANEOUSLY = static_cast<uint32_t>(@IOX_MAX_CHUNKS_ALLOCATED_PER_PUBLISHER_SIMULTANEOUSLY@);
constexpr uint64_t IOX_MAX_PUBLISHER_HISTORY = static_cast<uint32_t>(@IOX_MAX_PUBLISHER_HISTORY@);
constexpr uint32_t IOX_MAX_CHUNKS_HELD_PER_SUBSCRIBER_SIMULTANEOUSLY = static_cast<uint32_t>(@IOX_MAX_CHUNKS_HELD_PER_SUBSCRIBER_SIMULTANEOUSLY@);
} // namespace build
} // namespace iox

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ struct GatewayConfig
{
capro::ServiceDescription m_serviceDescription;
};
cxx::vector<ServiceEntry, MAX_PORT_NUMBER> m_configuredServices;
iox::cxx::vector<ServiceEntry, MAX_GATEWAY_SERVICES> m_configuredServices;

void setDefaults() noexcept;
};
Expand Down
Loading