-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ProtocolStart and GracefulClose messages #3839
Merged
Merged
Changes from all commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
3ce6166
Add ProtocolStart message to sync inbound/outbound peer send/receive …
gregtatcam 5d637ac
[FOLD] Clean up send queue on app failure and close connection on wri…
gregtatcam c470ed6
[FOLD] Close connection in write call back if
gregtatcam cb16ebb
[FOLD] Clang format fix.
gregtatcam a7f0d0b
[FOLD] Fix id_t.
gregtatcam fdcb750
[FOLD] Fix shard info message.
gregtatcam 1231c81
[FOLD] Add read_buf_ to the outbound PeerImp constructor.
gregtatcam 044df76
Merge branch 'develop' into start-protocol
gregtatcam 6cdc895
Update ProtocolVersion test.
gregtatcam de9b335
Merge branch 'develop' into start-protocol
intelliot 28f99c5
Merge branch 'develop' into start-protocol
intelliot ff89cf4
Merge branch 'develop' into start-protocol
intelliot 8b62f5d
Merge branch 'develop' into start-protocol
intelliot f9033b9
Merge branch 'develop' into start-protocol
intelliot File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,185 @@ | ||
//------------------------------------------------------------------------------ | ||
/* | ||
This file is part of rippled: https://github.com/ripple/rippled | ||
Copyright (c) 2012-2021 Ripple Labs Inc. | ||
|
||
Permission to use, copy, modify, and/or distribute this software for any | ||
purpose with or without fee is hereby granted, provided that the above | ||
copyright notice and this permission notice appear in all copies. | ||
|
||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES | ||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | ||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR | ||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | ||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | ||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF | ||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | ||
*/ | ||
//============================================================================== | ||
|
||
#include <ripple/overlay/Cluster.h> | ||
#include <ripple/overlay/impl/InboundHandoff.h> | ||
intelliot marked this conversation as resolved.
Show resolved
Hide resolved
|
||
#include <ripple/overlay/impl/PeerImp.h> | ||
|
||
#include <boost/beast/core/ostream.hpp> | ||
|
||
namespace ripple { | ||
|
||
InboundHandoff::InboundHandoff( | ||
Application& app, | ||
id_t id, | ||
std::shared_ptr<PeerFinder::Slot> const& slot, | ||
http_request_type&& request, | ||
PublicKey const& publicKey, | ||
ProtocolVersion protocol, | ||
Resource::Consumer consumer, | ||
std::unique_ptr<stream_type>&& stream_ptr, | ||
OverlayImpl& overlay) | ||
: OverlayImpl::Child(overlay) | ||
, app_(app) | ||
, id_(id) | ||
, sink_( | ||
app_.journal("Peer"), | ||
[id]() { | ||
std::stringstream ss; | ||
ss << "[" << std::setfill('0') << std::setw(3) << id << "] "; | ||
return ss.str(); | ||
}()) | ||
, journal_(sink_) | ||
, stream_ptr_(std::move(stream_ptr)) | ||
, strand_(stream_ptr_->next_layer().socket().get_executor()) | ||
, remote_address_(slot->remote_endpoint()) | ||
, protocol_(protocol) | ||
, publicKey_(publicKey) | ||
, usage_(consumer) | ||
, slot_(slot) | ||
, request_(std::move(request)) | ||
{ | ||
} | ||
|
||
void | ||
InboundHandoff::run() | ||
{ | ||
if (!strand_.running_in_this_thread()) | ||
return post( | ||
strand_, std::bind(&InboundHandoff::run, shared_from_this())); | ||
sendResponse(); | ||
} | ||
|
||
void | ||
InboundHandoff::stop() | ||
{ | ||
if (!strand_.running_in_this_thread()) | ||
return post( | ||
strand_, std::bind(&InboundHandoff::stop, shared_from_this())); | ||
if (stream_ptr_->next_layer().socket().is_open()) | ||
{ | ||
JLOG(journal_.debug()) << "Stop"; | ||
} | ||
close(); | ||
} | ||
|
||
void | ||
InboundHandoff::sendResponse() | ||
{ | ||
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_); | ||
// This shouldn't fail since we already computed | ||
// the shared value successfully in OverlayImpl | ||
if (!sharedValue) | ||
return fail("makeSharedValue: Unexpected failure"); | ||
|
||
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_); | ||
JLOG(journal_.info()) << "Public Key: " | ||
<< toBase58(TokenType::NodePublic, publicKey_); | ||
|
||
auto write_buffer = std::make_shared<boost::beast::multi_buffer>(); | ||
|
||
boost::beast::ostream(*write_buffer) << makeResponse( | ||
!overlay_.peerFinder().config().peerPrivate, | ||
request_, | ||
overlay_.setup().public_ip, | ||
remote_address_.address(), | ||
*sharedValue, | ||
overlay_.setup().networkID, | ||
protocol_, | ||
app_); | ||
|
||
// Write the whole buffer and only start protocol when that's done. | ||
boost::asio::async_write( | ||
*stream_ptr_, | ||
write_buffer->data(), | ||
boost::asio::transfer_all(), | ||
bind_executor( | ||
strand_, | ||
[this, write_buffer, self = shared_from_this()]( | ||
error_code ec, std::size_t bytes_transferred) { | ||
if (!stream_ptr_->next_layer().socket().is_open()) | ||
return; | ||
if (ec == boost::asio::error::operation_aborted) | ||
return; | ||
if (ec) | ||
return fail("onWriteResponse", ec); | ||
if (write_buffer->size() == bytes_transferred) | ||
return createPeer(); | ||
return fail("Failed to write header"); | ||
})); | ||
} | ||
|
||
void | ||
InboundHandoff::fail(std::string const& name, error_code const& ec) | ||
{ | ||
if (socket().is_open()) | ||
{ | ||
JLOG(journal_.warn()) | ||
<< name << " from " << toBase58(TokenType::NodePublic, publicKey_) | ||
<< " at " << remote_address_.to_string() << ": " << ec.message(); | ||
} | ||
close(); | ||
} | ||
|
||
void | ||
InboundHandoff::fail(std::string const& reason) | ||
{ | ||
if (journal_.active(beast::severities::kWarning) && socket().is_open()) | ||
{ | ||
auto const n = app_.cluster().member(publicKey_); | ||
JLOG(journal_.warn()) | ||
<< (n ? remote_address_.to_string() : *n) << " failed: " << reason; | ||
} | ||
close(); | ||
} | ||
|
||
void | ||
InboundHandoff::close() | ||
{ | ||
if (socket().is_open()) | ||
{ | ||
socket().close(); | ||
JLOG(journal_.debug()) << "Closed"; | ||
} | ||
} | ||
|
||
void | ||
InboundHandoff::createPeer() | ||
{ | ||
auto peer = std::make_shared<PeerImp>( | ||
app_, | ||
id_, | ||
slot_, | ||
std::move(request_), | ||
publicKey_, | ||
protocol_, | ||
usage_, | ||
std::move(stream_ptr_), | ||
overlay_); | ||
|
||
overlay_.add_active(peer); | ||
} | ||
|
||
InboundHandoff::socket_type& | ||
InboundHandoff::socket() const | ||
{ | ||
return stream_ptr_->next_layer().socket(); | ||
} | ||
|
||
} // namespace ripple |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
//------------------------------------------------------------------------------ | ||
/* | ||
This file is part of rippled: https://github.com/ripple/rippled | ||
Copyright (c) 2012-2021 Ripple Labs Inc. | ||
|
||
Permission to use, copy, modify, and/or distribute this software for any | ||
purpose with or without fee is hereby granted, provided that the above | ||
copyright notice and this permission notice appear in all copies. | ||
|
||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES | ||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | ||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR | ||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | ||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | ||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF | ||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | ||
*/ | ||
//============================================================================== | ||
|
||
#ifndef RIPPLE_OVERLAY_INBOUNDHANDOFF_H_INCLUDED | ||
#define RIPPLE_OVERLAY_INBOUNDHANDOFF_H_INCLUDED | ||
|
||
#include <ripple/overlay/impl/OverlayImpl.h> | ||
|
||
namespace ripple { | ||
|
||
/** Sends HTTP response. Instantiates the inbound peer | ||
* once the response is sent. Maintains all data members | ||
* required for the inbound peer instantiation. | ||
*/ | ||
class InboundHandoff : public OverlayImpl::Child, | ||
public std::enable_shared_from_this<InboundHandoff> | ||
{ | ||
private: | ||
using error_code = boost::system::error_code; | ||
using socket_type = boost::asio::ip::tcp::socket; | ||
using middle_type = boost::beast::tcp_stream; | ||
using stream_type = boost::beast::ssl_stream<middle_type>; | ||
using id_t = Peer::id_t; | ||
Application& app_; | ||
id_t const id_; | ||
beast::WrappedSink sink_; | ||
beast::Journal const journal_; | ||
std::unique_ptr<stream_type> stream_ptr_; | ||
boost::asio::strand<boost::asio::executor> strand_; | ||
beast::IP::Endpoint const remote_address_; | ||
ProtocolVersion protocol_; | ||
PublicKey const publicKey_; | ||
Resource::Consumer usage_; | ||
std::shared_ptr<PeerFinder::Slot> const slot_; | ||
http_request_type request_; | ||
|
||
public: | ||
virtual ~InboundHandoff() override = default; | ||
|
||
intelliot marked this conversation as resolved.
Show resolved
Hide resolved
|
||
InboundHandoff( | ||
Application& app, | ||
id_t id, | ||
std::shared_ptr<PeerFinder::Slot> const& slot, | ||
http_request_type&& request, | ||
PublicKey const& publicKey, | ||
ProtocolVersion protocol, | ||
Resource::Consumer consumer, | ||
std::unique_ptr<stream_type>&& stream_ptr, | ||
OverlayImpl& overlay); | ||
|
||
// This class isn't meant to be copied | ||
InboundHandoff(InboundHandoff const&) = delete; | ||
InboundHandoff& | ||
operator=(InboundHandoff const&) = delete; | ||
|
||
/** Start the handshake */ | ||
void | ||
run(); | ||
/** Stop the child */ | ||
void | ||
stop() override; | ||
|
||
private: | ||
/** Send upgrade response to the client */ | ||
void | ||
sendResponse(); | ||
/** Instantiate and run the overlay peer */ | ||
void | ||
createPeer(); | ||
/** Log and close */ | ||
void | ||
fail(std::string const& name, error_code const& ec); | ||
/** Log and close */ | ||
void | ||
fail(std::string const& reason); | ||
/** Close connection */ | ||
void | ||
close(); | ||
/** Get underlying socket */ | ||
socket_type& | ||
socket() const; | ||
}; | ||
|
||
} // namespace ripple | ||
|
||
#endif // RIPPLE_OVERLAY_INBOUNDHANDOFF_H_INCLUDED |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the
getType()
function under thepublic
access specifier? This function is used only byMessage::compress()
function (which is also aprivate
function)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's
private
in the latestdevelop
branch.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I was referring to this feature branch. It is
public
in this branch, I'm wondering why it differs from the develop branch ?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To your point - it should had not been
public
. So it got fixed a while back.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay, thanks for the clarification.