Skip to content

Commit

Permalink
iox-#33 Use unnamed semaphores for named pipes
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Eltzschig <me@elchris.org>
  • Loading branch information
elfenpiff committed Jun 17, 2021
1 parent 04dd9ae commit 8c6e0b9
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 57 deletions.
16 changes: 12 additions & 4 deletions iceoryx_hoofs/include/iceoryx_hoofs/posix_wrapper/named_pipe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "iceoryx_hoofs/concurrent/lockfree_queue.hpp"
#include "iceoryx_hoofs/cxx/string.hpp"
#include "iceoryx_hoofs/cxx/vector.hpp"
#include "iceoryx_hoofs/design_pattern/creation.hpp"
#include "iceoryx_hoofs/internal/posix_wrapper/ipc_channel.hpp"
#include "iceoryx_hoofs/internal/posix_wrapper/shared_memory_object.hpp"
Expand Down Expand Up @@ -120,16 +121,23 @@ class NamedPipe : public DesignPattern::Creation<NamedPipe, IpcChannelError>

private:
cxx::optional<SharedMemoryObject> m_sharedMemory;
mutable cxx::optional<Semaphore> m_sendSemaphore;
mutable cxx::optional<Semaphore> m_receiveSemaphore;

struct NamedPipeData
{
NamedPipeData(bool& isInitialized, IpcChannelError& error, const uint64_t maxMsgNumber) noexcept;

Semaphore& sendSemaphore() noexcept;
Semaphore& receiveSemaphore() noexcept;

static constexpr uint64_t SEND_SEMAPHORE = 0U;
static constexpr uint64_t RECEIVE_SEMAPHORE = 1U;

MessageQueue_t messages;
Semaphore sendSemaphore;
Semaphore receiveSemaphore;
using semaphoreMemory_t = uint8_t[sizeof(Semaphore)];
alignas(alignof(Semaphore)) semaphoreMemory_t semaphores[2U];
};


NamedPipeData* m_data = nullptr;
};
} // namespace posix
Expand Down
93 changes: 40 additions & 53 deletions iceoryx_hoofs/source/posix_wrapper/named_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,50 +106,41 @@ NamedPipe::NamedPipe(const IpcChannelName_t& name,

m_sharedMemory.emplace(std::move(*sharedMemory));

m_isInitialized = true;
auto signalError = [&](const char* name, const char* openMode) {
std::cerr << "Unable to " << openMode << " the named pipe semaphore \"" << name << "\" for named pipe \""
<< name << "\"" << std::endl;
m_isInitialized = false;
m_errorValue = IpcChannelError::INTERNAL_LOGIC_ERROR;
};
m_data = static_cast<NamedPipeData*>(m_sharedMemory->allocate(sizeof(NamedPipeData), alignof(NamedPipeData)));

m_isInitialized = true;
if (m_sharedMemory->hasOwnership())
{
Semaphore::create(CreateNamedSemaphore,
convertName(SEND_SEMAPHORE_PREFIX, name).c_str(),
static_cast<mode_t>(S_IRUSR | S_IWUSR),
static_cast<unsigned int>(maxMsgNumber))
.and_then([&](auto& r) { m_sendSemaphore.emplace(std::move(r)); })
.or_else([&](auto) { signalError(convertName(SEND_SEMAPHORE_PREFIX, name).c_str(), "create"); });

Semaphore::create(CreateNamedSemaphore,
convertName(RECEIVE_SEMAPHORE_PREFIX, name).c_str(),
static_cast<mode_t>(S_IRUSR | S_IWUSR),
0U)
.and_then([&](auto& r) { m_receiveSemaphore.emplace(std::move(r)); })
.or_else([&](auto) { signalError(convertName(RECEIVE_SEMAPHORE_PREFIX, name).c_str(), "create"); });
new (m_data) NamedPipeData(m_isInitialized, m_errorValue, maxMsgNumber);
}
else
{
Semaphore::create(OpenNamedSemaphore, convertName(SEND_SEMAPHORE_PREFIX, name).c_str(), 0)
.and_then([&](auto& r) { m_sendSemaphore.emplace(std::move(r)); })
.or_else([&](auto) { signalError(convertName(SEND_SEMAPHORE_PREFIX, name).c_str(), "open"); });
}

Semaphore::create(OpenNamedSemaphore, convertName(RECEIVE_SEMAPHORE_PREFIX, name).c_str(), 0)
.and_then([&](auto& r) { m_receiveSemaphore.emplace(std::move(r)); })
.or_else([&](auto) { signalError(convertName(RECEIVE_SEMAPHORE_PREFIX, name).c_str(), "open"); });
}
NamedPipe::NamedPipeData::NamedPipeData(bool& isInitialized,
IpcChannelError& error,
const uint64_t maxMsgNumber) noexcept
{
auto signalError = [&](const char* name) {
std::cerr << "Unable to create " << name << " semaphore for named pipe \"" << 'x' << "\"";
isInitialized = false;
error = IpcChannelError::INTERNAL_LOGIC_ERROR;
};

if (m_isInitialized)
{
m_data = static_cast<NamedPipeData*>(m_sharedMemory->allocate(sizeof(NamedPipeData), alignof(NamedPipeData)));
Semaphore::placementCreate(
&semaphores[SEND_SEMAPHORE], CreateUnnamedSharedMemorySemaphore, static_cast<unsigned int>(maxMsgNumber))
.or_else([&](auto) { signalError("send"); });

if (channelSide == IpcChannelSide::SERVER)
{
new (m_data) NamedPipeData();
}
}
Semaphore::placementCreate(&semaphores[RECEIVE_SEMAPHORE], CreateUnnamedSharedMemorySemaphore, 0U)
.or_else([&](auto) { signalError("receive"); });
}

Semaphore& NamedPipe::NamedPipeData::sendSemaphore() noexcept
{
return reinterpret_cast<Semaphore&>(semaphores[SEND_SEMAPHORE]);
}

Semaphore& NamedPipe::NamedPipeData::receiveSemaphore() noexcept
{
return reinterpret_cast<Semaphore&>(semaphores[RECEIVE_SEMAPHORE]);
}

NamedPipe::NamedPipe(NamedPipe&& rhs) noexcept
Expand All @@ -165,8 +156,6 @@ NamedPipe& NamedPipe::operator=(NamedPipe&& rhs) noexcept
CreationPattern_t::operator=(std::move(rhs));

m_sharedMemory = std::move(rhs.m_sharedMemory);
m_sendSemaphore = std::move(rhs.m_sendSemaphore);
m_receiveSemaphore = std::move(rhs.m_receiveSemaphore);
m_data = std::move(rhs.m_data);
rhs.m_data = nullptr;
}
Expand All @@ -193,8 +182,6 @@ cxx::expected<IpcChannelError> NamedPipe::destroy() noexcept
m_isInitialized = false;
m_errorValue = IpcChannelError::NOT_INITIALIZED;
m_data->~NamedPipeData();
m_sendSemaphore.reset();
m_receiveSemaphore.reset();
m_sharedMemory.reset();
m_data = nullptr;
}
Expand Down Expand Up @@ -233,13 +220,13 @@ cxx::expected<IpcChannelError> NamedPipe::trySend(const std::string& message) co
return cxx::error<IpcChannelError>(IpcChannelError::MESSAGE_TOO_LONG);
}

auto result = m_sendSemaphore->tryWait();
auto result = m_data->sendSemaphore().tryWait();
cxx::Expects(!result.has_error());

if (*result)
{
IOX_DISCARD_RESULT(m_data->messages.push(Message_t(cxx::TruncateToCapacity, message)));
cxx::Expects(!m_receiveSemaphore->post().has_error());
cxx::Expects(!m_data->receiveSemaphore().post().has_error());
return cxx::success<>();
}
return cxx::error<IpcChannelError>(IpcChannelError::TIMEOUT);
Expand All @@ -257,9 +244,9 @@ cxx::expected<IpcChannelError> NamedPipe::send(const std::string& message) const
return cxx::error<IpcChannelError>(IpcChannelError::MESSAGE_TOO_LONG);
}

cxx::Expects(!m_sendSemaphore->wait().has_error());
cxx::Expects(!m_data->sendSemaphore().wait().has_error());
IOX_DISCARD_RESULT(m_data->messages.push(Message_t(cxx::TruncateToCapacity, message)));
cxx::Expects(!m_receiveSemaphore->post().has_error());
cxx::Expects(!m_data->receiveSemaphore().post().has_error());

return cxx::success<>();
}
Expand All @@ -277,13 +264,13 @@ cxx::expected<IpcChannelError> NamedPipe::timedSend(const std::string& message,
return cxx::error<IpcChannelError>(IpcChannelError::MESSAGE_TOO_LONG);
}

auto result = m_sendSemaphore->timedWait(timeout);
auto result = m_data->sendSemaphore().timedWait(timeout);
cxx::Expects(!result.has_error());

if (*result == SemaphoreWaitState::NO_TIMEOUT)
{
IOX_DISCARD_RESULT(m_data->messages.push(Message_t(cxx::TruncateToCapacity, message)));
cxx::Expects(!m_receiveSemaphore->post().has_error());
cxx::Expects(!m_data->receiveSemaphore().post().has_error());
return cxx::success<>();
}
return cxx::error<IpcChannelError>(IpcChannelError::TIMEOUT);
Expand All @@ -296,11 +283,11 @@ cxx::expected<std::string, IpcChannelError> NamedPipe::receive() const noexcept
return cxx::error<IpcChannelError>(IpcChannelError::NOT_INITIALIZED);
}

cxx::Expects(!m_receiveSemaphore->wait().has_error());
cxx::Expects(!m_data->receiveSemaphore().wait().has_error());
auto message = m_data->messages.pop();
if (message.has_value())
{
cxx::Expects(!m_sendSemaphore->post().has_error());
cxx::Expects(!m_data->sendSemaphore().post().has_error());
return cxx::success<std::string>(message->c_str());
}
return cxx::error<IpcChannelError>(IpcChannelError::INTERNAL_LOGIC_ERROR);
Expand All @@ -313,15 +300,15 @@ cxx::expected<std::string, IpcChannelError> NamedPipe::tryReceive() const noexce
return cxx::error<IpcChannelError>(IpcChannelError::NOT_INITIALIZED);
}

auto result = m_receiveSemaphore->tryWait();
auto result = m_data->receiveSemaphore().tryWait();
cxx::Expects(!result.has_error());

if (*result)
{
auto message = m_data->messages.pop();
if (message.has_value())
{
cxx::Expects(!m_sendSemaphore->post().has_error());
cxx::Expects(!m_data->sendSemaphore().post().has_error());
return cxx::success<std::string>(message->c_str());
}
return cxx::error<IpcChannelError>(IpcChannelError::INTERNAL_LOGIC_ERROR);
Expand All @@ -337,15 +324,15 @@ cxx::expected<std::string, IpcChannelError> NamedPipe::timedReceive(const units:
return cxx::error<IpcChannelError>(IpcChannelError::NOT_INITIALIZED);
}

auto result = m_receiveSemaphore->timedWait(timeout);
auto result = m_data->receiveSemaphore().timedWait(timeout);
cxx::Expects(!result.has_error());

if (*result == SemaphoreWaitState::NO_TIMEOUT)
{
auto message = m_data->messages.pop();
if (message.has_value())
{
cxx::Expects(!m_sendSemaphore->post().has_error());
cxx::Expects(!m_data->sendSemaphore().post().has_error());
return cxx::success<std::string>(message->c_str());
}
return cxx::error<IpcChannelError>(IpcChannelError::INTERNAL_LOGIC_ERROR);
Expand Down
2 changes: 2 additions & 0 deletions iceoryx_hoofs/test/moduletests/test_ipc_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class IpcChannel_test : public Test

void SetUp()
{
IOX_DISCARD_RESULT(IpcChannelType::unlinkIfExists(goodName));

auto serverResult = IpcChannelType::create(goodName, IpcChannelSide::SERVER, MaxMsgSize, MaxMsgNumber);
ASSERT_THAT(serverResult.has_error(), Eq(false));
server = std::move(serverResult.value());
Expand Down

0 comments on commit 8c6e0b9

Please sign in to comment.