Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iox #989 handle deserialization errors for publisher and subscriber options #1008

1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
- Add support for Multi-Arch install destinations [\#961](https://github.com/eclipse-iceoryx/iceoryx/issues/961) thanks to @roehling
- Fix a few misspellings in log messages [\#962](https://github.com/eclipse-iceoryx/iceoryx/issues/962) thanks to @roehling
- Fix typos in goals/non-goals document [\#968](https://github.com/eclipse-iceoryx/iceoryx/issues/968)
- Catch deserialization errors for enums in publisher and subscriber options [\#989](https://github.com/eclipse-iceoryx/iceoryx/issues/989)

**Refactoring:**

Expand Down
4 changes: 3 additions & 1 deletion iceoryx_posh/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
# Copyright (c) 2020 - 2021 by Apex.AI Inc. All rights reserved.
# Copyright (c) 2020 - 2022 by Apex.AI Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -125,6 +125,8 @@ add_library(iceoryx_posh
source/popo/listener.cpp
source/popo/notification_info.cpp
source/popo/rpc_header.cpp
source/popo/publisher_options.cpp
source/popo/subscriber_options.cpp
source/popo/trigger.cpp
source/popo/trigger_handle.cpp
source/popo/user_trigger.cpp
Expand Down
11 changes: 10 additions & 1 deletion iceoryx_posh/include/iceoryx_posh/popo/publisher_options.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020 - 2021 by Apex.AI Inc. All rights reserved.
// Copyright (c) 2020 - 2022 by Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,9 @@

#include "iceoryx_posh/iceoryx_posh_types.hpp"
#include "port_queue_policies.hpp"

#include "iceoryx_hoofs/cxx/serialization.hpp"

#include <cstdint>

namespace iox
Expand All @@ -39,6 +42,12 @@ struct PublisherOptions

/// @brief The option whether the publisher should block when the subscriber queue is full
SubscriberTooSlowPolicy subscriberTooSlowPolicy{SubscriberTooSlowPolicy::DISCARD_OLDEST_DATA};

/// @brief serialization of the PublisherOptions
elBoberido marked this conversation as resolved.
Show resolved Hide resolved
cxx::Serialization serialize() const noexcept;
/// @brief deserialization of the PublisherOptions
static cxx::expected<PublisherOptions, cxx::Serialization::Error>
deserialize(const cxx::Serialization& serialized) noexcept;
};

} // namespace popo
Expand Down
10 changes: 9 additions & 1 deletion iceoryx_posh/include/iceoryx_posh/popo/subscriber_options.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020 - 2021 by Apex.AI Inc. All rights reserved.
// Copyright (c) 2020 - 2022 by Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,8 @@
#include "iceoryx_posh/internal/popo/ports/subscriber_port_data.hpp"
#include "port_queue_policies.hpp"

#include "iceoryx_hoofs/cxx/serialization.hpp"

#include <cstdint>

namespace iox
Expand All @@ -44,6 +46,12 @@ struct SubscriberOptions

/// @brief The option whether the publisher should block when the subscriber queue is full
QueueFullPolicy queueFullPolicy{QueueFullPolicy::DISCARD_OLDEST_DATA};

/// @brief serialization of the SubscriberOptions
cxx::Serialization serialize() const noexcept;
/// @brief deserialization of the SubscriberOptions
static cxx::expected<SubscriberOptions, cxx::Serialization::Error>
deserialize(const cxx::Serialization& serialized) noexcept;
};

} // namespace popo
Expand Down
57 changes: 57 additions & 0 deletions iceoryx_posh/source/popo/publisher_options.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) 2022 by Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

#include "iceoryx_posh/popo/publisher_options.hpp"
#include "iceoryx_posh/internal/log/posh_logging.hpp"

namespace iox
{
namespace popo
{
cxx::Serialization PublisherOptions::serialize() const noexcept
{
return cxx::Serialization::create(
historyCapacity,
nodeName,
offerOnCreate,
static_cast<std::underlying_type_t<SubscriberTooSlowPolicy>>(subscriberTooSlowPolicy));
}

cxx::expected<PublisherOptions, cxx::Serialization::Error>
PublisherOptions::deserialize(const cxx::Serialization& serialized) noexcept
{
using SubscriberTooSlowPolicyUT = std::underlying_type_t<SubscriberTooSlowPolicy>;

PublisherOptions publisherOptions;
SubscriberTooSlowPolicyUT subscriberTooSlowPolicy;

auto deserializationSuccessful = serialized.extract(publisherOptions.historyCapacity,
publisherOptions.nodeName,
publisherOptions.offerOnCreate,
subscriberTooSlowPolicy);

if (!deserializationSuccessful
|| subscriberTooSlowPolicy
> static_cast<SubscriberTooSlowPolicyUT>(SubscriberTooSlowPolicy::DISCARD_OLDEST_DATA))
{
return cxx::error<cxx::Serialization::Error>(cxx::Serialization::Error::DESERIALIZATION_FAILED);
}

publisherOptions.subscriberTooSlowPolicy = static_cast<SubscriberTooSlowPolicy>(subscriberTooSlowPolicy);
return cxx::success<PublisherOptions>(publisherOptions);
}
} // namespace popo
} // namespace iox
57 changes: 57 additions & 0 deletions iceoryx_posh/source/popo/subscriber_options.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) 2022 by Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

#include "iceoryx_posh/popo/subscriber_options.hpp"
#include "iceoryx_posh/internal/log/posh_logging.hpp"

namespace iox
{
namespace popo
{
cxx::Serialization SubscriberOptions::serialize() const noexcept
{
return cxx::Serialization::create(queueCapacity,
historyRequest,
nodeName,
subscribeOnCreate,
static_cast<std::underlying_type_t<QueueFullPolicy>>(queueFullPolicy));
}

cxx::expected<SubscriberOptions, cxx::Serialization::Error>
SubscriberOptions::deserialize(const cxx::Serialization& serialized) noexcept
{
using QueueFullPolicyUT = std::underlying_type_t<QueueFullPolicy>;

SubscriberOptions subscriberOptions;
QueueFullPolicyUT queueFullPolicy;

auto deserializationSuccessful = serialized.extract(subscriberOptions.queueCapacity,
subscriberOptions.historyRequest,
subscriberOptions.nodeName,
subscriberOptions.subscribeOnCreate,
queueFullPolicy);

if (!deserializationSuccessful
|| queueFullPolicy > static_cast<QueueFullPolicyUT>(QueueFullPolicy::DISCARD_OLDEST_DATA))
{
return cxx::error<cxx::Serialization::Error>(cxx::Serialization::Error::DESERIALIZATION_FAILED);
}

subscriberOptions.queueFullPolicy = static_cast<QueueFullPolicy>(queueFullPolicy);
return cxx::success<SubscriberOptions>(subscriberOptions);
}
} // namespace popo
} // namespace iox
83 changes: 19 additions & 64 deletions iceoryx_posh/source/roudi/roudi.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (c) 2019, 2021 by Robert Bosch GmbH. All rights reserved.
// Copyright (c) 2021 by Apex.AI Inc. All rights reserved.
// Copyright (c) 2021 - 2022 by Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -229,7 +229,7 @@ void RouDi::processMessage(const runtime::IpcMessage& message,
}
case runtime::IpcMessageType::CREATE_PUBLISHER:
{
if (message.getNumberOfElements() != 8)
if (message.getNumberOfElements() != 5)
{
LogError() << "Wrong number of parameters for \"IpcMessageType::CREATE_PUBLISHER\" from \"" << runtimeName
<< "\"received!";
Expand All @@ -246,51 +246,32 @@ void RouDi::processMessage(const runtime::IpcMessage& message,
}
const auto& service = deserializationResult.value();

cxx::Serialization portConfigInfoSerialization(message.getElementAtIndex(7));

if (!service.isValid())
{
LogError() << "Invalid service description '" << message.getElementAtIndex(2).c_str() << "' provided\n";
break;
}

popo::PublisherOptions options;
uint64_t historyCapacity{};
if (!cxx::convert::fromString(message.getElementAtIndex(3).c_str(), historyCapacity))
auto publisherOptionsDeserializationResult =
popo::PublisherOptions::deserialize(cxx::Serialization(message.getElementAtIndex(3)));
if (publisherOptionsDeserializationResult.has_error())
{
LogError() << "Invalid parameter for \"IpcMessageType::CREATE_PUBLISHER\"! '"
<< message.getElementAtIndex(3).c_str() << "' cannot be extracted from string\n";
LogError() << "Deserialization of 'PublisherOptions' failed when '"
<< message.getElementAtIndex(3).c_str() << "' was provided\n";
break;
}
options.historyCapacity = historyCapacity;
options.nodeName = NodeName_t(cxx::TruncateToCapacity, message.getElementAtIndex(4));
const auto& publisherOptions = publisherOptionsDeserializationResult.value();

uint64_t offerOnCreate{};
if (!cxx::convert::fromString(message.getElementAtIndex(5).c_str(), offerOnCreate))
{
LogError() << "Invalid parameter for \"IpcMessageType::CREATE_PUBLISHER\"! '"
<< message.getElementAtIndex(5).c_str() << "' cannot be extracted from string\n";
break;
}
options.offerOnCreate = (0U == offerOnCreate) ? false : true;

uint8_t subscriberTooSlowPolicy{};
if (!cxx::convert::fromString(message.getElementAtIndex(6).c_str(), subscriberTooSlowPolicy))
{
LogError() << "Invalid parameter for \"IpcMessageType::CREATE_PUBLISHER\"! '"
<< message.getElementAtIndex(6).c_str() << "' cannot be extracted from string\n";
break;
}
options.subscriberTooSlowPolicy = static_cast<popo::SubscriberTooSlowPolicy>(subscriberTooSlowPolicy);
cxx::Serialization portConfigInfoSerialization(message.getElementAtIndex(4));

m_prcMgr->addPublisherForProcess(
runtimeName, service, options, iox::runtime::PortConfigInfo(portConfigInfoSerialization));
runtimeName, service, publisherOptions, iox::runtime::PortConfigInfo(portConfigInfoSerialization));
}
break;
}
case runtime::IpcMessageType::CREATE_SUBSCRIBER:
{
if (message.getNumberOfElements() != 9)
if (message.getNumberOfElements() != 5)
{
LogError() << "Wrong number of parameters for \"IpcMessageType::CREATE_SUBSCRIBER\" from \"" << runtimeName
<< "\"received!";
Expand All @@ -307,53 +288,27 @@ void RouDi::processMessage(const runtime::IpcMessage& message,
}

const auto& service = deserializationResult.value();
cxx::Serialization portConfigInfoSerialization(message.getElementAtIndex(8));

if (!service.isValid())
{
LogError() << "Invalid service description '" << message.getElementAtIndex(2).c_str() << "' provided\n";
break;
}

popo::SubscriberOptions options;
uint64_t historyRequest;
if (!cxx::convert::fromString(message.getElementAtIndex(3).c_str(), historyRequest))
{
LogError() << "Invalid parameter for \"IpcMessageType::CREATE_SUBSCRIBER\"! '"
<< message.getElementAtIndex(3).c_str() << "' cannot be extracted from string\n";
break;
}
options.historyRequest = historyRequest;
uint64_t queueCapacity;
if (!cxx::convert::fromString(message.getElementAtIndex(4).c_str(), queueCapacity))
{
LogError() << "Invalid parameter for \"IpcMessageType::CREATE_SUBSCRIBER\"! '"
<< message.getElementAtIndex(4).c_str() << "' cannot be extracted from string\n";
break;
}
options.queueCapacity = queueCapacity;
options.nodeName = NodeName_t(cxx::TruncateToCapacity, message.getElementAtIndex(5));

uint32_t subscribeOnCreate;
if (!cxx::convert::fromString(message.getElementAtIndex(6).c_str(), subscribeOnCreate))
auto subscriberOptionsDeserializationResult =
popo::SubscriberOptions::deserialize(cxx::Serialization(message.getElementAtIndex(3)));
if (subscriberOptionsDeserializationResult.has_error())
{
LogError() << "Invalid parameter for \"IpcMessageType::CREATE_SUBSCRIBER\"! '"
<< message.getElementAtIndex(6).c_str() << "' cannot be extracted from string\n";
LogError() << "Deserialization of 'SubscriberOptions' failed when '"
<< message.getElementAtIndex(3).c_str() << "' was provided\n";
break;
}
options.subscribeOnCreate = (0U == subscribeOnCreate ? false : true);
const auto& subscriberOptions = subscriberOptionsDeserializationResult.value();

uint8_t queueFullPolicy{};
if (!cxx::convert::fromString(message.getElementAtIndex(7).c_str(), queueFullPolicy))
{
LogError() << "Invalid parameter for \"IpcMessageType::CREATE_SUBSCRIBER\"! '"
<< message.getElementAtIndex(7).c_str() << "' cannot be extracted from string\n";
break;
}
options.queueFullPolicy = static_cast<popo::QueueFullPolicy>(queueFullPolicy);
cxx::Serialization portConfigInfoSerialization(message.getElementAtIndex(4));

m_prcMgr->addSubscriberForProcess(
runtimeName, service, options, iox::runtime::PortConfigInfo(portConfigInfoSerialization));
runtimeName, service, subscriberOptions, iox::runtime::PortConfigInfo(portConfigInfoSerialization));
}
break;
}
Expand Down
11 changes: 3 additions & 8 deletions iceoryx_posh/source/runtime/posh_runtime_impl.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (c) 2019 - 2021 by Robert Bosch GmbH. All rights reserved.
// Copyright (c) 2021 by Apex.AI Inc. All rights reserved.
// Copyright (c) 2021 - 2022 by Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -128,9 +128,7 @@ PoshRuntimeImpl::getMiddlewarePublisher(const capro::ServiceDescription& service

IpcMessage sendBuffer;
sendBuffer << IpcMessageTypeToString(IpcMessageType::CREATE_PUBLISHER) << m_appName
<< static_cast<cxx::Serialization>(service).toString() << cxx::convert::toString(options.historyCapacity)
<< options.nodeName << cxx::convert::toString(options.offerOnCreate)
<< cxx::convert::toString(static_cast<uint8_t>(options.subscriberTooSlowPolicy))
<< static_cast<cxx::Serialization>(service).toString() << publisherOptions.serialize().toString()
<< static_cast<cxx::Serialization>(portConfigInfo).toString();

auto maybePublisher = requestPublisherFromRoudi(sendBuffer);
Expand Down Expand Up @@ -241,10 +239,7 @@ PoshRuntimeImpl::getMiddlewareSubscriber(const capro::ServiceDescription& servic

IpcMessage sendBuffer;
sendBuffer << IpcMessageTypeToString(IpcMessageType::CREATE_SUBSCRIBER) << m_appName
<< static_cast<cxx::Serialization>(service).toString() << cxx::convert::toString(options.historyRequest)
<< cxx::convert::toString(options.queueCapacity) << options.nodeName
<< cxx::convert::toString(options.subscribeOnCreate)
<< cxx::convert::toString(static_cast<uint8_t>(options.queueFullPolicy))
<< static_cast<cxx::Serialization>(service).toString() << options.serialize().toString()
<< static_cast<cxx::Serialization>(portConfigInfo).toString();

auto maybeSubscriber = requestSubscriberFromRoudi(sendBuffer);
Expand Down
Loading