diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.cpp b/src/cpp/fastdds/publisher/DataWriterImpl.cpp index 5b95809ce15..de657ea543d 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.cpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.cpp @@ -1187,6 +1187,7 @@ const DataWriterQos& DataWriterImpl::get_qos() const ReturnCode_t DataWriterImpl::set_listener( DataWriterListener* listener) { + std::lock_guard scoped_lock(listener_mutex_); listener_ = listener; return ReturnCode_t::RETCODE_OK; } @@ -1934,6 +1935,7 @@ bool DataWriterImpl::can_qos_be_updated( DataWriterListener* DataWriterImpl::get_listener_for( const StatusMask& status) { + std::lock_guard scoped_lock(listener_mutex_); if (listener_ != nullptr && user_datawriter_->get_status_mask().is_active(status)) { diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.hpp b/src/cpp/fastdds/publisher/DataWriterImpl.hpp index 3498b51740d..6b421ec7b2f 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.hpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.hpp @@ -410,6 +410,9 @@ class DataWriterImpl : protected rtps::IReaderDataFilter //! DataWriterListener DataWriterListener* listener_ = nullptr; + //! Mutex to protect listener_ + std::mutex listener_mutex_; + //!History DataWriterHistory history_; diff --git a/src/cpp/rtps/attributes/RTPSParticipantAttributes.cpp b/src/cpp/rtps/attributes/RTPSParticipantAttributes.cpp index 23c68767733..e2f9df11abb 100644 --- a/src/cpp/rtps/attributes/RTPSParticipantAttributes.cpp +++ b/src/cpp/rtps/attributes/RTPSParticipantAttributes.cpp @@ -253,6 +253,7 @@ static void setup_transports_large_datav6( { Locator_t pdp_locator; pdp_locator.kind = LOCATOR_KIND_UDPv6; + IPLocator::setIPv6(pdp_locator, "ff1e::ffff:efff:1"); att.builtin.metatrafficMulticastLocatorList.push_back(pdp_locator); } } diff --git a/src/cpp/rtps/transport/TCPChannelResource.cpp b/src/cpp/rtps/transport/TCPChannelResource.cpp index 7cd4c018be9..ab95d4eaf2b 100644 --- a/src/cpp/rtps/transport/TCPChannelResource.cpp +++ b/src/cpp/rtps/transport/TCPChannelResource.cpp @@ -84,7 +84,7 @@ ResponseCode TCPChannelResource::process_bind_request( if (connection_status_.compare_exchange_strong(expected, eConnectionStatus::eEstablished)) { locator_ = IPLocator::toPhysicalLocator(locator); - EPROSIMA_LOG_INFO(RTCP_MSG, "Connection Stablished"); + EPROSIMA_LOG_INFO(RTCP_MSG, "Connection Established"); return RETCODE_OK; } else if (expected == eConnectionStatus::eEstablished) @@ -139,9 +139,7 @@ void TCPChannelResource::add_logical_port( pending_logical_output_ports_.emplace_back(port); if (connection_established()) { - scopedLock.unlock(); TCPTransactionId id = rtcp_manager->sendOpenLogicalPortRequest(this, port); - scopedLock.lock(); negotiating_logical_ports_[id] = port; } } diff --git a/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp b/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp index bb4aac5702f..b9b4c1336ed 100644 --- a/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp +++ b/src/cpp/rtps/transport/TCPChannelResourceBasic.cpp @@ -105,6 +105,7 @@ void TCPChannelResourceBasic::disconnect() { if (eConnecting < change_status(eConnectionStatus::eDisconnected) && alive()) { + std::lock_guard read_lock(read_mutex_); auto socket = socket_; std::error_code ec; diff --git a/src/cpp/rtps/transport/TCPSenderResource.hpp b/src/cpp/rtps/transport/TCPSenderResource.hpp index c5d136938e4..416ea94adfd 100644 --- a/src/cpp/rtps/transport/TCPSenderResource.hpp +++ b/src/cpp/rtps/transport/TCPSenderResource.hpp @@ -32,14 +32,14 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource TCPSenderResource( TCPTransportInterface& transport, - std::shared_ptr& channel) + eprosima::fastrtps::rtps::Locator_t& locator) : fastrtps::rtps::SenderResource(transport.kind()) - , channel_(channel) + , locator_(locator) { // Implementation functions are bound to the right transport parameters clean_up = [this, &transport]() { - transport.CloseOutputChannel(channel_); + transport.CloseOutputChannel(locator_); }; send_lambda_ = [this, &transport]( @@ -49,7 +49,7 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource fastrtps::rtps::LocatorsIterator* destination_locators_end, const std::chrono::steady_clock::time_point&) -> bool { - return transport.send(data, dataSize, channel_, destination_locators_begin, + return transport.send(data, dataSize, locator_, destination_locators_begin, destination_locators_end); }; } @@ -62,9 +62,9 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource } } - std::shared_ptr& channel() + fastrtps::rtps::Locator_t& locator() { - return channel_; + return locator_; } static TCPSenderResource* cast( @@ -102,7 +102,7 @@ class TCPSenderResource : public fastrtps::rtps::SenderResource TCPSenderResource& operator =( const SenderResource&) = delete; - std::shared_ptr channel_; + fastrtps::rtps::Locator_t locator_; }; } // namespace rtps diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 9d34ad8c964..5c6cd4d4611 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -234,7 +234,6 @@ void TCPTransportInterface::bind_socket( assert(it_remove != unbound_channel_resources_.end()); unbound_channel_resources_.erase(it_remove); channel_resources_[channel->locator()] = channel; - } bool TCPTransportInterface::check_crc( @@ -571,14 +570,10 @@ bool TCPTransportInterface::transform_remote_locator( } void TCPTransportInterface::CloseOutputChannel( - std::shared_ptr& channel) + fastrtps::rtps::Locator_t& locator) { - Locator physical_locator = channel->locator(); - channel.reset(); - std::unique_lock scopedLock(sockets_map_mutex_); - auto channel_resource = channel_resources_.find(physical_locator); - assert(channel_resource != channel_resources_.end()); - (void)channel_resource; + locator.set_Invalid_Address(); + locator.port = 0; } bool TCPTransportInterface::CloseInputChannel( @@ -607,7 +602,7 @@ bool TCPTransportInterface::CloseInputChannel( receiver_in_use->cv.wait(scopedLock, [&]() { - return receiver_in_use->in_use == false; + return receiver_in_use->in_use == 0; }); delete receiver_in_use; } @@ -632,87 +627,87 @@ bool TCPTransportInterface::OpenOutputChannel( return false; } - bool success = false; uint16_t logical_port = IPLocator::getLogicalPort(locator); - - if (logical_port != 0) + if (0 == logical_port) { - Locator physical_locator = IPLocator::toPhysicalLocator(locator); + return false; + } - // We try to find a SenderResource that can be reuse to this locator. - // Note: This is done in this level because if we do in NetworkFactory level, we have to mantain what transport - // already reuses a SenderResource. - for (auto& sender_resource : send_resource_list) - { - TCPSenderResource* tcp_sender_resource = TCPSenderResource::cast(*this, sender_resource.get()); + Locator physical_locator = IPLocator::toPhysicalLocator(locator); - if (tcp_sender_resource && (physical_locator == tcp_sender_resource->channel()->locator() || - (IPLocator::hasWan(locator) && - IPLocator::WanToLanLocator(physical_locator) == - tcp_sender_resource->channel()->locator()))) - { - // Look for an existing channel that matches this physical locator - auto existing_channel = channel_resources_.find(physical_locator); - // If the channel exists, check if the channel reference in the sender resource needs to be updated with - // the found channel - if (existing_channel != channel_resources_.end() && - existing_channel->second != tcp_sender_resource->channel()) - { - // Disconnect the old channel - tcp_sender_resource->channel()->disconnect(); - // Update sender resource with new channel - tcp_sender_resource->channel() = existing_channel->second; - } - // Add logical port to channel if it's not there yet - if (!tcp_sender_resource->channel()->is_logical_port_added(logical_port)) - { - tcp_sender_resource->channel()->add_logical_port(logical_port, rtcp_message_manager_.get()); - } + // We try to find a SenderResource that has this locator. + // Note: This is done in this level because if we do in NetworkFactory level, we have to mantain what transport + // already reuses a SenderResource. + for (auto& sender_resource : send_resource_list) + { + TCPSenderResource* tcp_sender_resource = TCPSenderResource::cast(*this, sender_resource.get()); - statistics_info_.add_entry(locator); - return true; - } + if (tcp_sender_resource && (physical_locator == tcp_sender_resource->locator() || + (IPLocator::hasWan(locator) && + IPLocator::WanToLanLocator(physical_locator) == + tcp_sender_resource->locator()))) + { + // If missing, logical port will be added in first send() + statistics_info_.add_entry(locator); + return true; } + } - // At this point, if there no SenderResource to reuse, the channel was created for reception (through an - // acceptor) or we have to create a new one. + // At this point, if there is no SenderResource to reuse, this is the first call to OpenOutputChannel for this locator. + // Need to check if a channel already exists for this locator. - std::unique_lock scopedLock(sockets_map_mutex_); - EPROSIMA_LOG_INFO(RTCP, "Called to OpenOutputChannel (physical: " << IPLocator::getPhysicalPort( - locator) << "; logical: " - << IPLocator::getLogicalPort( - locator) << ") @ " << IPLocator::to_string(locator)); + EPROSIMA_LOG_INFO(RTCP, "Called to OpenOutputChannel (physical: " << IPLocator::getPhysicalPort( + locator) << "; logical: " + << IPLocator::getLogicalPort( + locator) << ") @ " << IPLocator::to_string(locator)); - auto channel_resource = channel_resources_.find(physical_locator); + std::lock_guard socketsLock(sockets_map_mutex_); + auto channel_resource = channel_resources_.find(physical_locator); - // Maybe as WAN? - if (channel_resource == channel_resources_.end() && IPLocator::hasWan(locator)) + // Maybe as WAN? + if (channel_resource == channel_resources_.end() && IPLocator::hasWan(locator)) + { + Locator wan_locator = IPLocator::WanToLanLocator(locator); + channel_resource = channel_resources_.find(IPLocator::toPhysicalLocator(wan_locator)); + if (channel_resource != channel_resources_.end()) { - Locator wan_locator; - wan_locator.kind = locator.kind; - wan_locator.port = locator.port; // Copy full port - IPLocator::setIPv4(wan_locator, IPLocator::toWanstring(locator)); // WAN to IP - channel_resource = channel_resources_.find(IPLocator::toPhysicalLocator(wan_locator)); - if (channel_resource != channel_resources_.end()) - { - channel_resources_[IPLocator::toPhysicalLocator(locator)] = channel_resource->second; // Add alias! - } + channel_resources_[physical_locator] = channel_resource->second; // Add alias! } + } + // (Server-Client Topology OR LARGE DATA with PDP discovery after TCP connection) - Server side + if (channel_resource != channel_resources_.end()) + { std::shared_ptr channel; - - if (channel_resource != channel_resources_.end()) + // There is an existing channel in channel_resources_ created for reception with the remote locator as key. Use it. + channel = channel_resource->second; + // Add logical port to channel if it's not there yet + channel->add_logical_port(logical_port, rtcp_message_manager_.get()); + } + // (Server-Client Topology - Client Side) OR LARGE DATA Topology with PDP discovery before TCP connection + else + { + // Get listening port (0 if client) + uint16_t listening_port = 0; + const TCPTransportDescriptor* config = configuration(); + assert (config != nullptr); + if (!config->listening_ports.empty()) { - channel = channel_resource->second; + listening_port = config->listening_ports.front(); } - else + + // If the remote physical port is higher than our listening port, a new CONNECT channel needs to be created and connected + // and the locator added to the send_resource_list. + // If the remote physical port is lower than our listening port, only the locator needs to be added to the send_resource_list. + if (IPLocator::getPhysicalPort(physical_locator) > listening_port) { - // Create output channel - EPROSIMA_LOG_INFO(OpenOutputChannel, "OpenOutputChannel (physical: " + // Client side (either Server-Client or LARGE_DATA) + EPROSIMA_LOG_INFO(OpenOutputChannel, "OpenOutputChannel: [CONNECT] (physical: " << IPLocator::getPhysicalPort(locator) << "; logical: " << IPLocator::getLogicalPort(locator) << ") @ " << IPLocator::to_string(locator)); - channel.reset( + // Create a TCP_CONNECT_TYPE channel + std::shared_ptr channel( #if TLS_FOUND (configuration()->apply_security) ? static_cast( @@ -726,16 +721,23 @@ bool TCPTransportInterface::OpenOutputChannel( channel_resources_[physical_locator] = channel; channel->connect(channel_resources_[physical_locator]); + channel->add_logical_port(logical_port, rtcp_message_manager_.get()); + } + else + { + // Server side LARGE_DATA + // Act as server and wait to the other endpoint to connect. Add locator to sender_resource_list + EPROSIMA_LOG_INFO(OpenOutputChannel, "OpenOutputChannel: [WAIT_CONNECTION] (physical: " + << IPLocator::getPhysicalPort(locator) << "; logical: " + << IPLocator::getLogicalPort(locator) << ") @ " << IPLocator::to_string(locator)); } - - statistics_info_.add_entry(locator); - success = true; - channel->add_logical_port(logical_port, rtcp_message_manager_.get()); - send_resource_list.emplace_back( - static_cast(new TCPSenderResource(*this, channel))); } - return success; + statistics_info_.add_entry(locator); + send_resource_list.emplace_back( + static_cast(new TCPSenderResource(*this, physical_locator))); + + return true; } bool TCPTransportInterface::OpenInputChannel( @@ -875,16 +877,15 @@ void TCPTransportInterface::perform_listen_operation( logicalPort = IPLocator::getLogicalPort(remote_locator); std::unique_lock scopedLock(sockets_map_mutex_); auto it = receiver_resources_.find(logicalPort); - //TransportReceiverInterface* receiver = channel->GetMessageReceiver(logicalPort); if (it != receiver_resources_.end()) { TransportReceiverInterface* receiver = it->second.first; ReceiverInUseCV* receiver_in_use = it->second.second; - receiver_in_use->in_use = true; + receiver_in_use->in_use++; scopedLock.unlock(); receiver->OnDataReceived(msg.buffer, msg.length, channel->locator(), remote_locator); scopedLock.lock(); - receiver_in_use->in_use = false; + receiver_in_use->in_use--; receiver_in_use->cv.notify_one(); } else @@ -1148,7 +1149,7 @@ bool TCPTransportInterface::Receive( bool TCPTransportInterface::send( const octet* send_buffer, uint32_t send_buffer_size, - std::shared_ptr& channel, + const fastrtps::rtps::Locator_t& locator, fastrtps::rtps::LocatorsIterator* destination_locators_begin, fastrtps::rtps::LocatorsIterator* destination_locators_end) { @@ -1160,7 +1161,7 @@ bool TCPTransportInterface::send( { if (IsLocatorSupported(*it)) { - ret &= send(send_buffer, send_buffer_size, channel, *it); + ret &= send(send_buffer, send_buffer_size, locator, *it); } ++it; @@ -1172,27 +1173,25 @@ bool TCPTransportInterface::send( bool TCPTransportInterface::send( const octet* send_buffer, uint32_t send_buffer_size, - std::shared_ptr& channel, + const fastrtps::rtps::Locator_t& locator, const Locator& remote_locator) { using namespace eprosima::fastdds::statistics::rtps; bool locator_mismatch = false; - if (channel->locator() != IPLocator::toPhysicalLocator(remote_locator)) + if (locator != IPLocator::toPhysicalLocator(remote_locator)) { + // Locator of sender resource does not match remote locator (from LocatorSelector) locator_mismatch = true; } // Maybe is WAN? if (locator_mismatch && IPLocator::hasWan(remote_locator)) { - Locator wan_locator; - wan_locator.kind = remote_locator.kind; - wan_locator.port = IPLocator::toPhysicalLocator(remote_locator).port; - IPLocator::setIPv4(wan_locator, IPLocator::toWanstring(remote_locator)); // WAN to IP - //std::cout << "WANLocator: " << IPLocator::to_string(wan_locator) << std::endl; - if (channel->locator() == wan_locator) + Locator wan_locator = IPLocator::WanToLanLocator(remote_locator); + wan_locator = IPLocator::toPhysicalLocator(wan_locator); + if (locator == wan_locator) { locator_mismatch = false; } @@ -1200,14 +1199,21 @@ bool TCPTransportInterface::send( if (locator_mismatch || send_buffer_size > configuration()->sendBufferSize) { - //std::cout << "ChannelLocator: " << IPLocator::to_string(channel->locator()) << std::endl; - //std::cout << "RemoteLocator: " << IPLocator::to_string(remote_locator) << std::endl; - return false; } bool success = false; + std::lock_guard scoped_lock(sockets_map_mutex_); + auto channel_resource = channel_resources_.find(locator); + if (channel_resource == channel_resources_.end()) + { + // There is no channel for this locator. Skip send + return false; + } + + auto channel = channel_resource->second; + /* TODO Verify when cable is removed if(TCPChannelResource::TCPConnectionStatus::TCP_DISCONNECTED == channel->tcp_connection_status() && TCPChannelResource::TCPConnectionType::TCP_ACCEPT_TYPE == channel->tcp_connection_type()) @@ -1263,7 +1269,6 @@ bool TCPTransportInterface::send( TCPChannelResource::eConnectionStatus::eDisconnected == channel->connection_status()) { channel->set_all_ports_pending(); - std::unique_lock lock(sockets_map_mutex_); channel->connect(channel_resources_[channel->locator()]); } @@ -1307,7 +1312,7 @@ void TCPTransportInterface::SocketAccepted( { if (!error.value()) { - // Store the new connection. + // Always create a new channel, it might be replaced later in bind_socket() std::shared_ptr channel(new TCPChannelResourceBasic(this, io_service_, socket, configuration()->maxMessageSize)); @@ -1355,7 +1360,7 @@ void TCPTransportInterface::SecureSocketAccepted( { if (!error.value()) { - // Store the new connection. + // Always create a new secure_channel, it might be replaced later in bind_socket() std::shared_ptr secure_channel(new TCPChannelResourceSecure(this, io_service_, ssl_context_, socket, configuration()->maxMessageSize)); diff --git a/src/cpp/rtps/transport/TCPTransportInterface.h b/src/cpp/rtps/transport/TCPTransportInterface.h index f454e829425..c56fe0245d3 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.h +++ b/src/cpp/rtps/transport/TCPTransportInterface.h @@ -64,7 +64,7 @@ class TCPTransportInterface : public TransportInterface { public: - bool in_use = false; + uint16_t in_use = 0; std::condition_variable cv; }; @@ -100,7 +100,9 @@ class TCPTransportInterface : public TransportInterface std::shared_ptr rtcp_message_manager_; std::mutex rtcp_message_manager_mutex_; std::condition_variable rtcp_message_manager_cv_; + // Mutex to control access to channel_resources_ mutable std::mutex sockets_map_mutex_; + // Mutex to control access to unbound_channel_resources_ mutable std::mutex unbound_map_mutex_; std::map> channel_resources_; // The key is the "Physical locator" @@ -203,12 +205,13 @@ class TCPTransportInterface : public TransportInterface std::string get_password() const; /** - * Send a buffer to a destination + * Send a buffer to a destination indicated by the locator. + * There must exist a channel bound to the locator, otherwise the send will be skipped. */ bool send( const fastrtps::rtps::octet* send_buffer, uint32_t send_buffer_size, - std::shared_ptr& channel, + const eprosima::fastrtps::rtps::Locator_t& locator, const Locator& remote_locator); public: @@ -225,9 +228,9 @@ class TCPTransportInterface : public TransportInterface bool CloseInputChannel( const Locator&) override; - //! Removes all outbound sockets on the given port. + //! Resets the locator bound to the sender resource. void CloseOutputChannel( - std::shared_ptr& channel); + fastrtps::rtps::Locator_t& locator); //! Reports whether Locators correspond to the same port. bool DoInputLocatorsMatch( @@ -339,11 +342,11 @@ class TCPTransportInterface : public TransportInterface Locator& remote_locator); /** - * Blocking Send through the specified channel. + * Blocking Send through the channel inside channel_resources_ matching the locator provided. * @param send_buffer Slice into the raw data to send. * @param send_buffer_size Size of the raw data. It will be used as a bounds check for the previous argument. * It must not exceed the send_buffer_size fed to this class during construction. - * @param channel channel we're sending from. + * @param locator Physical locator we're sending to. * @param destination_locators_begin pointer to destination locators iterator begin, the iterator can be advanced inside this fuction * so should not be reuse. * @param destination_locators_end pointer to destination locators iterator end, the iterator can be advanced inside this fuction @@ -352,7 +355,7 @@ class TCPTransportInterface : public TransportInterface bool send( const fastrtps::rtps::octet* send_buffer, uint32_t send_buffer_size, - std::shared_ptr& channel, + const fastrtps::rtps::Locator_t& locator, fastrtps::rtps::LocatorsIterator* destination_locators_begin, fastrtps::rtps::LocatorsIterator* destination_locators_end); diff --git a/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp b/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp index 5cd7f4b8d88..63d56ee9198 100644 --- a/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp +++ b/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp @@ -463,7 +463,6 @@ ResponseCode RTCPMessageManager::processBindConnectionRequest( if (RETCODE_OK == code) { - // In server side, at this moment, the channel has to be included in the map. mTransport->bind_socket(channel); } @@ -477,8 +476,11 @@ ResponseCode RTCPMessageManager::processOpenLogicalPortRequest( const OpenLogicalPortRequest_t& request, const TCPTransactionId& transaction_id) { - if (!channel->connection_established()) + // A server can send an OpenLogicalPortRequest to a client before the BindConnectionResponse is processed. + if (!channel->connection_established() && + channel->connection_status_ != TCPChannelResource::eConnectionStatus::eWaitingForBindResponse) { + EPROSIMA_LOG_ERROR(RTCP, "Trying to send [OPEN_LOGICAL_PORT_RESPONSE] without connection established."); sendData(channel, CHECK_LOGICAL_PORT_RESPONSE, transaction_id, nullptr, RETCODE_SERVER_ERROR); } else if (request.logicalPort() == 0 || !mTransport->is_input_port_open(request.logicalPort())) diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index 4ef7e7dc04e..977714c09f3 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -49,6 +49,8 @@ #include #include #include +#include +#include #include #include #include @@ -952,6 +954,69 @@ class PubSubReader return *this; } + PubSubReader& setup_large_data_tcp( + bool v6 = false, + const uint16_t& port = 0) + { + participant_qos_.transport().use_builtin_transports = false; + + /* Transports configuration */ + // UDP transport for PDP over multicast + // TCP transport for EDP and application data (The listening port must to be unique for + // each participant in the same host) + uint16_t tcp_listening_port = port; + if (v6) + { + auto pdp_transport = std::make_shared(); + participant_qos_.transport().user_transports.push_back(pdp_transport); + + auto data_transport = std::make_shared(); + data_transport->add_listener_port(tcp_listening_port); + participant_qos_.transport().user_transports.push_back(data_transport); + } + else + { + auto pdp_transport = std::make_shared(); + participant_qos_.transport().user_transports.push_back(pdp_transport); + + auto data_transport = std::make_shared(); + data_transport->add_listener_port(tcp_listening_port); + participant_qos_.transport().user_transports.push_back(data_transport); + } + + /* Locators */ + eprosima::fastrtps::rtps::Locator_t pdp_locator; + eprosima::fastrtps::rtps::Locator_t tcp_locator; + if (v6) + { + // Define locator for PDP over multicast + pdp_locator.kind = LOCATOR_KIND_UDPv6; + eprosima::fastrtps::rtps::IPLocator::setIPv6(pdp_locator, "ff1e::ffff:efff:1"); + // Define locator for EDP and user data + tcp_locator.kind = LOCATOR_KIND_TCPv6; + eprosima::fastrtps::rtps::IPLocator::setIPv6(tcp_locator, "::"); + eprosima::fastrtps::rtps::IPLocator::setPhysicalPort(tcp_locator, tcp_listening_port); + eprosima::fastrtps::rtps::IPLocator::setLogicalPort(tcp_locator, 0); + } + else + { + // Define locator for PDP over multicast + pdp_locator.kind = LOCATOR_KIND_UDPv4; + eprosima::fastrtps::rtps::IPLocator::setIPv4(pdp_locator, "239.255.0.1"); + // Define locator for EDP and user data + tcp_locator.kind = LOCATOR_KIND_TCPv4; + eprosima::fastrtps::rtps::IPLocator::setIPv4(tcp_locator, "0.0.0.0"); + eprosima::fastrtps::rtps::IPLocator::setPhysicalPort(tcp_locator, tcp_listening_port); + eprosima::fastrtps::rtps::IPLocator::setLogicalPort(tcp_locator, 0); + } + + participant_qos_.wire_protocol().builtin.metatrafficMulticastLocatorList.push_back(pdp_locator); + participant_qos_.wire_protocol().builtin.metatrafficUnicastLocatorList.push_back(tcp_locator); + participant_qos_.wire_protocol().default_unicast_locator_list.push_back(tcp_locator); + + return *this; + } + PubSubReader& disable_builtin_transport() { participant_qos_.transport().use_builtin_transports = false; diff --git a/test/blackbox/api/dds-pim/PubSubWriter.hpp b/test/blackbox/api/dds-pim/PubSubWriter.hpp index e81cfababae..4f1eab7bdd9 100644 --- a/test/blackbox/api/dds-pim/PubSubWriter.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriter.hpp @@ -51,6 +51,8 @@ #include #include #include +#include +#include #include #include #include @@ -924,6 +926,69 @@ class PubSubWriter return *this; } + PubSubWriter& setup_large_data_tcp( + bool v6 = false, + const uint16_t& port = 0) + { + participant_qos_.transport().use_builtin_transports = false; + + /* Transports configuration */ + // UDP transport for PDP over multicast + // TCP transport for EDP and application data (The listening port must to be unique for + // each participant in the same host) + uint16_t tcp_listening_port = port; + if (v6) + { + auto pdp_transport = std::make_shared(); + participant_qos_.transport().user_transports.push_back(pdp_transport); + + auto data_transport = std::make_shared(); + data_transport->add_listener_port(tcp_listening_port); + participant_qos_.transport().user_transports.push_back(data_transport); + } + else + { + auto pdp_transport = std::make_shared(); + participant_qos_.transport().user_transports.push_back(pdp_transport); + + auto data_transport = std::make_shared(); + data_transport->add_listener_port(tcp_listening_port); + participant_qos_.transport().user_transports.push_back(data_transport); + } + + /* Locators */ + eprosima::fastrtps::rtps::Locator_t pdp_locator; + eprosima::fastrtps::rtps::Locator_t tcp_locator; + if (v6) + { + // Define locator for PDP over multicast + pdp_locator.kind = LOCATOR_KIND_UDPv6; + eprosima::fastrtps::rtps::IPLocator::setIPv6(pdp_locator, "ff1e::ffff:efff:1"); + // Define locator for EDP and user data + tcp_locator.kind = LOCATOR_KIND_TCPv6; + eprosima::fastrtps::rtps::IPLocator::setIPv6(tcp_locator, "::"); + eprosima::fastrtps::rtps::IPLocator::setPhysicalPort(tcp_locator, tcp_listening_port); + eprosima::fastrtps::rtps::IPLocator::setLogicalPort(tcp_locator, 0); + } + else + { + // Define locator for PDP over multicast + pdp_locator.kind = LOCATOR_KIND_UDPv4; + eprosima::fastrtps::rtps::IPLocator::setIPv4(pdp_locator, "239.255.0.1"); + // Define locator for EDP and user data + tcp_locator.kind = LOCATOR_KIND_TCPv4; + eprosima::fastrtps::rtps::IPLocator::setIPv4(tcp_locator, "0.0.0.0"); + eprosima::fastrtps::rtps::IPLocator::setPhysicalPort(tcp_locator, tcp_listening_port); + eprosima::fastrtps::rtps::IPLocator::setLogicalPort(tcp_locator, 0); + } + + participant_qos_.wire_protocol().builtin.metatrafficMulticastLocatorList.push_back(pdp_locator); + participant_qos_.wire_protocol().builtin.metatrafficUnicastLocatorList.push_back(tcp_locator); + participant_qos_.wire_protocol().default_unicast_locator_list.push_back(tcp_locator); + + return *this; + } + PubSubWriter& disable_builtin_transport() { participant_qos_.transport().use_builtin_transports = false; diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp index c89cac71c15..3ec3f834d4e 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp @@ -39,6 +39,8 @@ #include #include #include +#include +#include #include #include #include @@ -738,6 +740,69 @@ class PubSubReader return *this; } + PubSubReader& setup_large_data_tcp( + bool v6 = false, + const uint16_t& port = 0) + { + participant_attr_.rtps.useBuiltinTransports = false; + + /* Transports configuration */ + // UDP transport for PDP over multicast + // TCP transport for EDP and application data (The listening port must to be unique for + // each participant in the same host) + uint16_t tcp_listening_port = port; + if (v6) + { + auto pdp_transport = std::make_shared(); + participant_attr_.rtps.userTransports.push_back(pdp_transport); + + auto data_transport = std::make_shared(); + data_transport->add_listener_port(tcp_listening_port); + participant_attr_.rtps.userTransports.push_back(data_transport); + } + else + { + auto pdp_transport = std::make_shared(); + participant_attr_.rtps.userTransports.push_back(pdp_transport); + + auto data_transport = std::make_shared(); + data_transport->add_listener_port(tcp_listening_port); + participant_attr_.rtps.userTransports.push_back(data_transport); + } + + /* Locators */ + eprosima::fastrtps::rtps::Locator_t pdp_locator; + eprosima::fastrtps::rtps::Locator_t tcp_locator; + if (v6) + { + // Define locator for PDP over multicast + pdp_locator.kind = LOCATOR_KIND_UDPv6; + eprosima::fastrtps::rtps::IPLocator::setIPv6(pdp_locator, "ff1e::ffff:efff:1"); + // Define locator for EDP and user data + tcp_locator.kind = LOCATOR_KIND_TCPv6; + eprosima::fastrtps::rtps::IPLocator::setIPv6(tcp_locator, "::"); + eprosima::fastrtps::rtps::IPLocator::setPhysicalPort(tcp_locator, tcp_listening_port); + eprosima::fastrtps::rtps::IPLocator::setLogicalPort(tcp_locator, 0); + } + else + { + // Define locator for PDP over multicast + pdp_locator.kind = LOCATOR_KIND_UDPv4; + eprosima::fastrtps::rtps::IPLocator::setIPv4(pdp_locator, "239.255.0.1"); + // Define locator for EDP and user data + tcp_locator.kind = LOCATOR_KIND_TCPv4; + eprosima::fastrtps::rtps::IPLocator::setIPv4(tcp_locator, "0.0.0.0"); + eprosima::fastrtps::rtps::IPLocator::setPhysicalPort(tcp_locator, tcp_listening_port); + eprosima::fastrtps::rtps::IPLocator::setLogicalPort(tcp_locator, 0); + } + + participant_attr_.rtps.builtin.metatrafficMulticastLocatorList.push_back(pdp_locator); + participant_attr_.rtps.builtin.metatrafficUnicastLocatorList.push_back(tcp_locator); + participant_attr_.rtps.defaultUnicastLocatorList.push_back(tcp_locator); + + return *this; + } + PubSubReader& disable_builtin_transport() { participant_attr_.rtps.useBuiltinTransports = false; diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp index 1d3fac6b6f2..33f6e857175 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp @@ -42,6 +42,8 @@ #include #include #include +#include +#include #include #include #include @@ -750,6 +752,69 @@ class PubSubWriter return *this; } + PubSubWriter& setup_large_data_tcp( + bool v6 = false, + const uint16_t& port = 0) + { + participant_attr_.rtps.useBuiltinTransports = false; + + /* Transports configuration */ + // UDP transport for PDP over multicast + // TCP transport for EDP and application data (The listening port must to be unique for + // each participant in the same host) + uint16_t tcp_listening_port = port; + if (v6) + { + auto pdp_transport = std::make_shared(); + participant_attr_.rtps.userTransports.push_back(pdp_transport); + + auto data_transport = std::make_shared(); + data_transport->add_listener_port(tcp_listening_port); + participant_attr_.rtps.userTransports.push_back(data_transport); + } + else + { + auto pdp_transport = std::make_shared(); + participant_attr_.rtps.userTransports.push_back(pdp_transport); + + auto data_transport = std::make_shared(); + data_transport->add_listener_port(tcp_listening_port); + participant_attr_.rtps.userTransports.push_back(data_transport); + } + + /* Locators */ + eprosima::fastrtps::rtps::Locator_t pdp_locator; + eprosima::fastrtps::rtps::Locator_t tcp_locator; + if (v6) + { + // Define locator for PDP over multicast + pdp_locator.kind = LOCATOR_KIND_UDPv6; + eprosima::fastrtps::rtps::IPLocator::setIPv6(pdp_locator, "ff1e::ffff:efff:1"); + // Define locator for EDP and user data + tcp_locator.kind = LOCATOR_KIND_TCPv6; + eprosima::fastrtps::rtps::IPLocator::setIPv6(tcp_locator, "::"); + eprosima::fastrtps::rtps::IPLocator::setPhysicalPort(tcp_locator, tcp_listening_port); + eprosima::fastrtps::rtps::IPLocator::setLogicalPort(tcp_locator, 0); + } + else + { + // Define locator for PDP over multicast + pdp_locator.kind = LOCATOR_KIND_UDPv4; + eprosima::fastrtps::rtps::IPLocator::setIPv4(pdp_locator, "239.255.0.1"); + // Define locator for EDP and user data + tcp_locator.kind = LOCATOR_KIND_TCPv4; + eprosima::fastrtps::rtps::IPLocator::setIPv4(tcp_locator, "0.0.0.0"); + eprosima::fastrtps::rtps::IPLocator::setPhysicalPort(tcp_locator, tcp_listening_port); + eprosima::fastrtps::rtps::IPLocator::setLogicalPort(tcp_locator, 0); + } + + participant_attr_.rtps.builtin.metatrafficMulticastLocatorList.push_back(pdp_locator); + participant_attr_.rtps.builtin.metatrafficUnicastLocatorList.push_back(tcp_locator); + participant_attr_.rtps.defaultUnicastLocatorList.push_back(tcp_locator); + + return *this; + } + PubSubWriter& disable_builtin_transport() { participant_attr_.rtps.useBuiltinTransports = false; diff --git a/test/blackbox/common/BlackboxTests.hpp b/test/blackbox/common/BlackboxTests.hpp index d56fd742302..46cabd7d1ca 100644 --- a/test/blackbox/common/BlackboxTests.hpp +++ b/test/blackbox/common/BlackboxTests.hpp @@ -142,6 +142,10 @@ std::list default_keyedhelloworld_data_generator( size_t max = 0, bool unique_key = false); +std::list default_keyedhelloworld_per_participant_data_generator( + size_t participants, + size_t max = 0); + std::list default_large_string_data_generator( size_t max = 0); diff --git a/test/blackbox/common/BlackboxTestsTransportCustom.cpp b/test/blackbox/common/BlackboxTestsTransportCustom.cpp index 5944d53d702..3680b0041f9 100644 --- a/test/blackbox/common/BlackboxTestsTransportCustom.cpp +++ b/test/blackbox/common/BlackboxTestsTransportCustom.cpp @@ -243,9 +243,9 @@ class BuiltinTransportsTest case BuiltinTransportsTestCase::ENV: { #ifdef _WIN32 - _putenv_s(env_var_name_.c_str(), env_var_name_.c_str()); + _putenv_s(env_var_name_.c_str(), env_var_value.c_str()); #else - setenv(env_var_name_.c_str(), env_var_name_.c_str(), 1); + setenv(env_var_name_.c_str(), env_var_value.c_str(), 1); #endif // _WIN32 break; } diff --git a/test/blackbox/common/BlackboxTestsTransportTCP.cpp b/test/blackbox/common/BlackboxTestsTransportTCP.cpp index 11106db8205..b6f31643955 100644 --- a/test/blackbox/common/BlackboxTestsTransportTCP.cpp +++ b/test/blackbox/common/BlackboxTestsTransportTCP.cpp @@ -13,11 +13,16 @@ // limitations under the License. #include "BlackboxTests.hpp" + #include "TCPReqRepHelloWorldRequester.hpp" #include "TCPReqRepHelloWorldReplier.hpp" #include "PubSubReader.hpp" #include "PubSubWriter.hpp" +#include +#include +#include + #include #include @@ -724,6 +729,115 @@ TEST_P(TransportTCP, TCPv6_autofill_port) EXPECT_TRUE(IPLocator::getPhysicalPort(p2_locators.begin()[0]) == port); } +// Test TCP transport on LARGE_DATA topology +TEST_P(TransportTCP, large_data_topology) +{ + eprosima::fastdds::dds::Log::SetVerbosity(eprosima::fastdds::dds::Log::Warning); + + // Limited to 12 readers and 12 writers so as not to exceed the system's file descriptor limit. + uint16_t n_participants = 12; + constexpr uint32_t samples_per_participant = 10; + + /* Test configuration */ + std::vector>> readers; + std::vector>> writers; + + for (uint16_t i = 0; i < n_participants; i++) + { + readers.emplace_back(new PubSubReader(TEST_TOPIC_NAME)); + writers.emplace_back(new PubSubWriter(TEST_TOPIC_NAME)); + } + + // Create a vector of ports and shuffle it + std::vector ports; + for (uint16_t i = 0; i < 2 * n_participants; i++) + { + ports.push_back(7200 + i); + } + auto rng = std::default_random_engine{}; + std::shuffle(ports.begin(), ports.end(), rng); + + // Reliable Keep_all to wait for all acked as end condition + for (uint16_t i = 0; i < n_participants; i++) + { + writers[i]->reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) + .history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS) + .durability_kind(eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS) + .lease_duration(eprosima::fastrtps::c_TimeInfinite, eprosima::fastrtps::Duration_t(3, 0)) + .resource_limits_max_instances(1) + .resource_limits_max_samples_per_instance(samples_per_participant); + + readers[i]->reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) + .history_kind(eprosima::fastrtps::KEEP_ALL_HISTORY_QOS) + .durability_kind(eprosima::fastrtps::TRANSIENT_LOCAL_DURABILITY_QOS) + .lease_duration(eprosima::fastrtps::c_TimeInfinite, eprosima::fastrtps::Duration_t(3, 0)) + .resource_limits_max_instances(n_participants) + .resource_limits_max_samples_per_instance(samples_per_participant); + + // Force TCP EDP discovery & data communication and UDP PDP discovery (NO SHM) + writers[i]->setup_large_data_tcp(use_ipv6, ports[i]); + readers[i]->setup_large_data_tcp(use_ipv6, ports[n_participants + i]); + } + + // Init participants + for (uint16_t i = 0; i < n_participants; i++) + { + writers[i]->init(); + readers[i]->init(); + ASSERT_TRUE(writers[i]->isInitialized()); + ASSERT_TRUE(readers[i]->isInitialized()); + } + + // Wait for discovery + for (uint16_t i = 0; i < n_participants; i++) + { + writers[i]->wait_discovery(n_participants, std::chrono::seconds(0)); + ASSERT_EQ(writers[i]->get_matched(), n_participants); + readers[i]->wait_discovery(std::chrono::seconds(0), n_participants); + ASSERT_EQ(readers[i]->get_matched(), n_participants); + } + + // Send and receive data + std::list data; + data = default_keyedhelloworld_per_participant_data_generator(n_participants, samples_per_participant); + + for (auto& reader : readers) + { + reader->startReception(data); + } + + auto validate_key = [](const std::list& data, uint16_t participant_key) + { + for (const auto& sample : data) + { + ASSERT_EQ(sample.key(), participant_key); + } + }; + + for (uint16_t i = 0; i < n_participants; i++) + { + auto start = std::next(data.begin(), i * samples_per_participant ); + auto end = std::next(start, samples_per_participant); + auto writer_data(std::list(start, end)); + validate_key(writer_data, i); + writers[i]->send(writer_data); + EXPECT_TRUE(writer_data.empty()); + } + + for (auto& reader : readers) + { + reader->block_for_all(); + } + for (auto& writer : writers) + { + EXPECT_TRUE(writer->waitForAllAcked(std::chrono::seconds(5))); + } + + // Destroy participants + readers.clear(); + writers.clear(); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else @@ -744,4 +858,4 @@ GTEST_INSTANTIATE_TEST_MACRO(TransportTCP, return "Transport" + suffix; } - }); \ No newline at end of file + }); diff --git a/test/blackbox/utils/data_generators.cpp b/test/blackbox/utils/data_generators.cpp index 6d40057f60f..99f809a9e22 100644 --- a/test/blackbox/utils/data_generators.cpp +++ b/test/blackbox/utils/data_generators.cpp @@ -84,6 +84,35 @@ std::list default_keyedhelloworld_data_generator( return returnedValue; } +std::list default_keyedhelloworld_per_participant_data_generator( + size_t participants, + size_t max) +{ + uint16_t participant_key = 0; + uint16_t index = 0; + size_t maximum = max ? max : 10; + std::list returnedValue(maximum * participants); + + std::generate(returnedValue.begin(), returnedValue.end(), [&index, &participant_key, &maximum] + { + KeyedHelloWorld hello; + hello.index(index); + hello.key(participant_key); + std::stringstream ss; + ss << "HelloWorld " << index; + hello.message(ss.str()); + ++index; + if (index == maximum) + { + index = 0; + ++participant_key; + } + return hello; + }); + + return returnedValue; +} + std::list default_large_string_data_generator( size_t max) { diff --git a/test/unittest/dds/participant/ParticipantTests.cpp b/test/unittest/dds/participant/ParticipantTests.cpp index 2c255276a2d..e5077ffbb4e 100644 --- a/test/unittest/dds/participant/ParticipantTests.cpp +++ b/test/unittest/dds/participant/ParticipantTests.cpp @@ -3976,6 +3976,7 @@ TEST(ParticipantTests, ParticipantCreationWithBuiltinTransport) }; EXPECT_TRUE(transport_check(attributes_)); EXPECT_FALSE(attributes_.useBuiltinTransports); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, DomainParticipantFactory::get_instance()->delete_participant(participant_)); } { @@ -4003,6 +4004,7 @@ TEST(ParticipantTests, ParticipantCreationWithBuiltinTransport) }; EXPECT_TRUE(transport_check(attributes_)); EXPECT_FALSE(attributes_.useBuiltinTransports); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, DomainParticipantFactory::get_instance()->delete_participant(participant_)); } { @@ -4030,7 +4032,7 @@ TEST(ParticipantTests, ParticipantCreationWithBuiltinTransport) }; EXPECT_TRUE(transport_check(attributes_)); EXPECT_FALSE(attributes_.useBuiltinTransports); - + EXPECT_EQ(ReturnCode_t::RETCODE_OK, DomainParticipantFactory::get_instance()->delete_participant(participant_)); } { @@ -4058,6 +4060,7 @@ TEST(ParticipantTests, ParticipantCreationWithBuiltinTransport) }; EXPECT_TRUE(transport_check(attributes_)); EXPECT_FALSE(attributes_.useBuiltinTransports); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, DomainParticipantFactory::get_instance()->delete_participant(participant_)); } { @@ -4085,6 +4088,7 @@ TEST(ParticipantTests, ParticipantCreationWithBuiltinTransport) }; EXPECT_TRUE(transport_check(attributes_)); EXPECT_FALSE(attributes_.useBuiltinTransports); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, DomainParticipantFactory::get_instance()->delete_participant(participant_)); } { @@ -4126,6 +4130,7 @@ TEST(ParticipantTests, ParticipantCreationWithBuiltinTransport) }; EXPECT_TRUE(transport_check(attributes_)); EXPECT_FALSE(attributes_.useBuiltinTransports); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, DomainParticipantFactory::get_instance()->delete_participant(participant_)); } { @@ -4167,6 +4172,7 @@ TEST(ParticipantTests, ParticipantCreationWithBuiltinTransport) }; EXPECT_TRUE(transport_check(attributes_)); EXPECT_FALSE(attributes_.useBuiltinTransports); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, DomainParticipantFactory::get_instance()->delete_participant(participant_)); } } diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index af50eebd139..8ec3f8748c4 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -132,8 +132,11 @@ TEST_F(TCPv4Tests, opening_and_closing_output_channel_with_listener) Locator_t genericOutputChannelLocator; genericOutputChannelLocator.kind = LOCATOR_KIND_TCPv4; - genericOutputChannelLocator.port = g_output_port; // arbitrary - IPLocator::setLogicalPort(genericOutputChannelLocator, g_output_port); + // The OpenOutputChannel locator argument has to be greater than the listening port + // to avoid falling in large data behavior, that would make the channel behave as an + // acceptor (no channel resource is created until it receives a connection request). + genericOutputChannelLocator.port = g_output_port + 1; + IPLocator::setLogicalPort(genericOutputChannelLocator, g_output_port + 1); SendResourceList send_resource_list; // Then