diff --git a/include/fastdds/rtps/builtin/liveliness/WLPListener.h b/include/fastdds/rtps/builtin/liveliness/WLPListener.h index a1859a202b7..825fe431933 100644 --- a/include/fastdds/rtps/builtin/liveliness/WLPListener.h +++ b/include/fastdds/rtps/builtin/liveliness/WLPListener.h @@ -21,14 +21,14 @@ #define _FASTDDS_RTPS_WLPLISTENER_H_ #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC -#include -#include +#include #include - +#include +#include #include namespace eprosima { -namespace fastrtps{ +namespace fastrtps { namespace rtps { class WLP; @@ -39,14 +39,16 @@ struct CacheChange_t; * Class WLPListener that receives the liveliness messages asserting the liveliness of remote endpoints. * @ingroup LIVELINESS_MODULE */ -class WLPListener: public ReaderListener { +class WLPListener : public ReaderListener +{ public: /** * @brief Constructor * @param pwlp Pointer to the writer liveliness protocol */ - WLPListener(WLP* pwlp); + WLPListener( + WLP* pwlp); /** * @brief Destructor @@ -60,27 +62,41 @@ class WLPListener: public ReaderListener { */ void onNewCacheChangeAdded( RTPSReader* reader, - const CacheChange_t* const change) override; + const CacheChange_t* const change) override; private: /** - * Separate the Key between the GuidPrefix_t and the liveliness Kind - * @param key InstanceHandle_t to separate. - * @param guidP GuidPrefix_t pointer to store the info. - * @param liveliness Liveliness Kind Pointer. - * @return True if correctly separated. - */ + * Separate the Key between the GuidPrefix_t and the liveliness Kind + * @param key InstanceHandle_t to separate. + * @param guidP GuidPrefix_t pointer to store the info. + * @param liveliness Liveliness Kind Pointer. + * @return True if correctly separated. + */ bool separateKey( InstanceHandle_t& key, GuidPrefix_t* guidP, LivelinessQosPolicyKind* liveliness); /** - * Compute the key from a CacheChange_t - * @param change - */ - bool computeKey(CacheChange_t* change); + * Compute the key from a CacheChange_t + * @param change + */ + bool computeKey( + CacheChange_t* change); + + /** + * @brief Check that the ParticipantMessageData kind is a valid one for WLP and extract the liveliness kind. + * + * @param[in] serialized_kind A pointer to the first octet of the kind array. The function assumes 4 elements + * in the array. + * @param[out] liveliness_kind A reference to the LivelinessQosPolicyKind. + * + * @return True if the kind corresponds with one for WLP, false otherwise. + */ + bool get_wlp_kind( + const octet* serialized_kind, + LivelinessQosPolicyKind& liveliness_kind); //! A pointer to the writer liveliness protocol WLP* mp_WLP; @@ -89,6 +105,6 @@ class WLPListener: public ReaderListener { } /* namespace rtps */ } /* namespace eprosima */ -} -#endif +} // namespace eprosima +#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC #endif /* _FASTDDS_RTPS_WLPLISTENER_H_ */ diff --git a/include/fastdds/rtps/common/CDRMessage_t.h b/include/fastdds/rtps/common/CDRMessage_t.h index 3c93a564274..567f4e314bd 100644 --- a/include/fastdds/rtps/common/CDRMessage_t.h +++ b/include/fastdds/rtps/common/CDRMessage_t.h @@ -96,7 +96,11 @@ struct RTPS_DllAPI CDRMessage_t final const SerializedPayload_t& payload) : wraps(true) { - msg_endian = payload.encapsulation == PL_CDR_BE ? BIGEND : LITTLEEND; + msg_endian = LITTLEEND; + if (payload.encapsulation == PL_CDR_BE || payload.encapsulation == CDR_BE) + { + msg_endian = BIGEND; + } pos = payload.pos; length = payload.length; buffer = payload.data; diff --git a/include/fastdds/rtps/common/SerializedPayload.h b/include/fastdds/rtps/common/SerializedPayload.h index 17e2aeea6c2..4422c43efe5 100644 --- a/include/fastdds/rtps/common/SerializedPayload.h +++ b/include/fastdds/rtps/common/SerializedPayload.h @@ -42,7 +42,7 @@ namespace rtps { #define PL_CDR_LE 0x0003 #if FASTDDS_IS_BIG_ENDIAN_TARGET -#define DEFAULT_ENCAPSULATION CDR_LE +#define DEFAULT_ENCAPSULATION CDR_BE #define PL_DEFAULT_ENCAPSULATION PL_CDR_BE #else #define DEFAULT_ENCAPSULATION CDR_LE diff --git a/include/fastdds/rtps/messages/CDRMessage.h b/include/fastdds/rtps/messages/CDRMessage.h index 7856906f361..20e55b9550f 100644 --- a/include/fastdds/rtps/messages/CDRMessage.h +++ b/include/fastdds/rtps/messages/CDRMessage.h @@ -312,6 +312,17 @@ inline bool addParticipantGenericMessage( ///@} +/** + * @brief Skip bytes in serialized buffer + * + * @param msg The CDR message + * @param length The number of bytes to skip + * @return true if skipped, false otherwise + */ +inline bool skip( + CDRMessage_t* msg, + uint32_t length); + } /* namespace CDRMessage */ } /* namespace rtps */ diff --git a/include/fastdds/rtps/messages/CDRMessage.hpp b/include/fastdds/rtps/messages/CDRMessage.hpp index a39156855d4..d3161570767 100644 --- a/include/fastdds/rtps/messages/CDRMessage.hpp +++ b/include/fastdds/rtps/messages/CDRMessage.hpp @@ -1301,6 +1301,20 @@ inline bool CDRMessage::readParticipantGenericMessage( return true; } +inline bool CDRMessage::skip( + CDRMessage_t* msg, + uint32_t length) +{ + // Validate input + bool ret = (msg != nullptr) && (msg->pos + length <= msg->length); + if (ret) + { + // Advance index the number of specified bytes + msg->pos += length; + } + return ret; +} + } // namespace rtps } // namespace fastrtps } // namespace eprosima diff --git a/src/cpp/rtps/builtin/liveliness/WLP.cpp b/src/cpp/rtps/builtin/liveliness/WLP.cpp index 3366158ce82..3ed6c67ce7c 100644 --- a/src/cpp/rtps/builtin/liveliness/WLP.cpp +++ b/src/cpp/rtps/builtin/liveliness/WLP.cpp @@ -890,9 +890,9 @@ bool WLP::send_liveliness_message( if (change != nullptr) { - change->serializedPayload.encapsulation = (uint16_t)PL_DEFAULT_ENCAPSULATION; + change->serializedPayload.encapsulation = (uint16_t)DEFAULT_ENCAPSULATION; change->serializedPayload.data[0] = 0; - change->serializedPayload.data[1] = PL_DEFAULT_ENCAPSULATION; + change->serializedPayload.data[1] = DEFAULT_ENCAPSULATION; change->serializedPayload.data[2] = 0; change->serializedPayload.data[3] = 0; diff --git a/src/cpp/rtps/builtin/liveliness/WLPListener.cpp b/src/cpp/rtps/builtin/liveliness/WLPListener.cpp index 0002c47104a..339a6fc1877 100644 --- a/src/cpp/rtps/builtin/liveliness/WLPListener.cpp +++ b/src/cpp/rtps/builtin/liveliness/WLPListener.cpp @@ -16,29 +16,35 @@ * @file WLPListener.cpp * */ - #include -#include -#include +#include +#include +#include +#include -#include +#include #include - -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include -#include - -#include - - +#include namespace eprosima { -namespace fastrtps{ +namespace fastrtps { namespace rtps { - -WLPListener::WLPListener(WLP* plwp) +WLPListener::WLPListener( + WLP* plwp) : mp_WLP(plwp) { } @@ -47,8 +53,6 @@ WLPListener::~WLPListener() { } -typedef std::vector::iterator WPIT; - void WLPListener::onNewCacheChangeAdded( RTPSReader* reader, const CacheChange_t* const changeIN) @@ -56,55 +60,84 @@ void WLPListener::onNewCacheChangeAdded( std::lock_guard guard2(*mp_WLP->mp_builtinProtocols->mp_PDP->getMutex()); GuidPrefix_t guidP; - LivelinessQosPolicyKind livelinessKind; + LivelinessQosPolicyKind livelinessKind = AUTOMATIC_LIVELINESS_QOS; CacheChange_t* change = (CacheChange_t*)changeIN; - if(!computeKey(change)) + if (!computeKey(change)) { - logWarning(RTPS_LIVELINESS,"Problem obtaining the Key"); + logWarning(RTPS_LIVELINESS, "Problem obtaining the Key"); return; } //Check the serializedPayload: auto history = reader->getHistory(); - for(auto ch = history->changesBegin(); ch!=history->changesEnd(); ++ch) + for (auto ch = history->changesBegin(); ch != history->changesEnd(); ++ch) { - if((*ch)->instanceHandle == change->instanceHandle && (*ch)->sequenceNumber < change->sequenceNumber) + if ((*ch)->instanceHandle == change->instanceHandle && (*ch)->sequenceNumber < change->sequenceNumber) { history->remove_change(*ch); break; } } - if (change->serializedPayload.length > 0) - { - if (PL_CDR_BE == change->serializedPayload.data[1]) - { - change->serializedPayload.encapsulation = (uint16_t)PL_CDR_BE; - } - else - { - change->serializedPayload.encapsulation = (uint16_t)PL_CDR_LE; - } - for(size_t i = 0; i<12; ++i) + // Serialized payload should have at least 4 bytes of representation header, 12 of GuidPrefix, + // 4 of kind, and 4 of length. + constexpr uint32_t participant_msg_data_kind_size = 4; + constexpr uint32_t participant_msg_data_length_size = 4; + constexpr uint32_t min_serialized_length = SerializedPayload_t::representation_header_size + + GuidPrefix_t::size + + participant_msg_data_kind_size + + participant_msg_data_length_size; + + if (change->serializedPayload.length >= min_serialized_length) + { + constexpr uint32_t participant_msg_data_kind_pos = 16; + constexpr uint32_t encapsulation_pos = 1; + uint32_t data_length = 0; + + // Extract encapsulation from the second byte of the representation header. Done prior to + // creating the CDRMessage_t, as the CDRMessage_t ctor uses it for its own state. + change->serializedPayload.encapsulation = + static_cast(change->serializedPayload.data[encapsulation_pos]); + + // Create CDR message from buffer to deserialize contents for further validation + CDRMessage_t cdr_message(change->serializedPayload); + + bool message_ok = ( + // Skip representation header + CDRMessage::skip(&cdr_message, SerializedPayload_t::representation_header_size) + // Extract GuidPrefix + && CDRMessage::readData(&cdr_message, guidP.value, GuidPrefix_t::size) + // Skip kind, it will be validated later + && CDRMessage::skip(&cdr_message, participant_msg_data_kind_size) + // Extract and validate liveliness kind + && get_wlp_kind(&change->serializedPayload.data[participant_msg_data_kind_pos], livelinessKind) + // Extract data length + && CDRMessage::readUInt32(&cdr_message, &data_length) + // Check that serialized length is correctly set + && (change->serializedPayload.length >= min_serialized_length + data_length)); + + if (!message_ok) { - guidP.value[i] = change->serializedPayload.data[i + 4]; + logInfo(RTPS_LIVELINESS, "Ignoring incorrect WLP ParticipantDataMessage"); + history->remove_change(change); + return; } - livelinessKind = (LivelinessQosPolicyKind)(change->serializedPayload.data[19]-0x01); - } else { - if(!separateKey( + if (!separateKey( change->instanceHandle, &guidP, &livelinessKind)) { + logInfo(RTPS_LIVELINESS, "Ignoring not WLP ParticipantDataMessage"); + history->remove_change(change); return; } } - if(guidP == reader->getGuid().guidPrefix) + if (guidP == reader->getGuid().guidPrefix) { - logInfo(RTPS_LIVELINESS,"Message from own RTPSParticipant, ignoring"); + logInfo(RTPS_LIVELINESS, "Message from own RTPSParticipant, ignoring"); history->remove_change(change); return; } @@ -129,20 +162,22 @@ bool WLPListener::separateKey( GuidPrefix_t* guidP, LivelinessQosPolicyKind* liveliness) { - for(uint8_t i=0;i<12;++i) + bool ret = get_wlp_kind(&key.value[12], *liveliness); + if (ret) { - guidP->value[i] = key.value[i]; + // Extract GuidPrefix + memcpy(guidP->value, key.value, 12); } - *liveliness = (LivelinessQosPolicyKind)key.value[15]; - return true; + return ret; } -bool WLPListener::computeKey(CacheChange_t* change) +bool WLPListener::computeKey( + CacheChange_t* change) { - if(change->instanceHandle == c_InstanceHandle_Unknown) + if (change->instanceHandle == c_InstanceHandle_Unknown) { SerializedPayload_t* pl = &change->serializedPayload; - if(pl->length >= 20) + if (pl->length >= 20) { memcpy(change->instanceHandle.value, pl->data + 4, 16); return true; @@ -152,7 +187,30 @@ bool WLPListener::computeKey(CacheChange_t* change) return true; } +bool WLPListener::get_wlp_kind( + const octet* serialized_kind, + LivelinessQosPolicyKind& liveliness_kind) +{ + /* + * From RTPS 2.5 9.6.3.1, the ParticipantMessageData kinds for WLP are: + * - PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE {0x00, 0x00, 0x00, 0x01} + * - PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE {0x00, 0x00, 0x00, 0x02} + */ + bool is_wlp = ( + serialized_kind[0] == 0 + && serialized_kind[1] == 0 + && serialized_kind[2] == 0 + && (serialized_kind[3] == 0x01 || serialized_kind[3] == 0x02)); + + if (is_wlp) + { + // Adjust and cast to LivelinessQosPolicyKind enum, where AUTOMATIC_LIVELINESS_QOS == 0 + liveliness_kind = static_cast(serialized_kind[3] - 0x01); + } + + return is_wlp; +} } /* namespace rtps */ } /* namespace eprosima */ -} +} // namespace eprosima