Cleanup SHAMap and simplify interfaces:
* Improve error reporting (more readable exception messages)
* Reduce function complexity (split oversized function to smaller pieces)
* Reduce code duplication
* Reduce buffer copying
nbougalis committed Jun 26, 2020
1 parent e8f3525 commit 362a017
Showing 13 changed files with 368 additions and 472 deletions.
18 changes: 4 additions & 14 deletions src/ripple/app/ledger/InboundLedger.h
@@ -174,23 +174,13 @@ class InboundLedger final : public PeerSet,

bool
takeHeader(std::string const& data);
bool
takeTxNode(
const std::vector<SHAMapNodeID>& IDs,
const std::vector<Blob>& data,
SHAMapAddNode&);

void
receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode&);

bool
takeTxRootNode(Slice const& data, SHAMapAddNode&);

// VFALCO TODO Rename to receiveAccountStateNode
// Don't use acronyms, but if we are going to use them at least
// capitalize them correctly.
//
bool
takeAsNode(
const std::vector<SHAMapNodeID>& IDs,
const std::vector<Blob>& data,
SHAMapAddNode&);
bool
takeAsRootNode(Slice const& data, SHAMapAddNode&);

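The header change above is where "reduce buffer copying" lands: instead of callers splitting a protocol::TMLedgerData packet into parallel std::vector<SHAMapNodeID> and std::vector<Blob> copies for takeTxNode/takeAsNode, the new receiveNode consumes the packet directly. A minimal, self-contained sketch of the difference, using stand-in types rather than the real protobuf-generated classes:

    #include <cstddef>
    #include <iostream>
    #include <string>
    #include <vector>

    // Stand-ins for the protobuf-generated TMLedgerNode/TMLedgerData types;
    // the real classes have different accessors and storage.
    struct LedgerNode
    {
        std::string nodeid_;
        std::string nodedata_;
        std::string const& nodeid() const { return nodeid_; }
        std::string const& nodedata() const { return nodedata_; }
    };

    struct LedgerDataPacket
    {
        std::vector<LedgerNode> nodes_;
        std::vector<LedgerNode> const& nodes() const { return nodes_; }
    };

    // Old shape (sketch): ids and payloads were copied into parallel vectors
    // before being handed to the per-type handler.
    std::size_t processWithCopies(LedgerDataPacket const& packet)
    {
        std::vector<std::string> ids;
        std::vector<std::string> data;
        for (auto const& node : packet.nodes())
        {
            ids.push_back(node.nodeid());     // copy #1
            data.push_back(node.nodedata());  // copy #2
        }
        std::size_t bytes = 0;
        for (auto const& d : data)
            bytes += d.size();
        return bytes;
    }

    // New shape (sketch): the packet is walked in place, so payloads are only
    // viewed, never duplicated into intermediate buffers.
    std::size_t processInPlace(LedgerDataPacket const& packet)
    {
        std::size_t bytes = 0;
        for (auto const& node : packet.nodes())
            bytes += node.nodedata().size();
        return bytes;
    }

    int main()
    {
        LedgerDataPacket packet{{{"root", "aaaa"}, {"0a", "bbbbbb"}}};
        std::cout << processWithCopies(packet) << " == "
                  << processInPlace(packet) << '\n';
    }

Both functions report the same byte count; the second simply never materializes the intermediate vectors, which for a large ledger-data message avoids one full copy of every node payload.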
7 changes: 1 addition & 6 deletions src/ripple/app/ledger/LedgerMaster.h
@@ -183,11 +183,6 @@ class LedgerMaster : public Stoppable, public AbstractFetchPackContainer
void
setLedgerRangePresent(std::uint32_t minV, std::uint32_t maxV);

boost::optional<LedgerHash>
getLedgerHash(
std::uint32_t desiredSeq,
std::shared_ptr<ReadView const> const& knownGoodLedger);

boost::optional<NetClock::time_point>
getCloseTimeBySeq(LedgerIndex ledgerIndex);

@@ -264,7 +259,7 @@ class LedgerMaster : public Stoppable, public AbstractFetchPackContainer
gotFetchPack(bool progress, std::uint32_t seq);

void
addFetchPack(uint256 const& hash, std::shared_ptr<Blob>& data);
addFetchPack(uint256 const& hash, std::shared_ptr<Blob> data);

boost::optional<Blob>
getFetchPack(uint256 const& hash) override;
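
The addFetchPack change above switches the second parameter from std::shared_ptr<Blob>& to plain pass-by-value, the usual idiom for a sink parameter. A small illustration of why the by-value signature is friendlier (FetchPackCache and its members are invented for this sketch, not the LedgerMaster API):

    #include <cstdint>
    #include <iostream>
    #include <memory>
    #include <unordered_map>
    #include <vector>

    using Blob = std::vector<std::uint8_t>;

    // Hypothetical cache standing in for the fetch-pack container.
    class FetchPackCache
    {
    public:
        // Taking the shared_ptr by value lets callers choose: copy to keep
        // their handle, or std::move to hand it over without a refcount bump.
        void add(std::uint64_t key, std::shared_ptr<Blob> data)
        {
            cache_[key] = std::move(data);
        }

    private:
        std::unordered_map<std::uint64_t, std::shared_ptr<Blob>> cache_;
    };

    int main()
    {
        FetchPackCache cache;

        auto blob = std::make_shared<Blob>(Blob{1, 2, 3});
        cache.add(1, blob);             // caller keeps its handle
        cache.add(2, std::move(blob));  // caller hands its handle over

        // A temporary also works, which a non-const reference would reject:
        cache.add(3, std::make_shared<Blob>(Blob{4, 5}));
        std::cout << "stored 3 fetch packs\n";
    }

For lvalue arguments the cost is the same single shared_ptr copy either way; for rvalues the by-value form avoids the copy entirely and, unlike the old non-const reference, actually compiles.
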
228 changes: 82 additions & 146 deletions src/ripple/app/ledger/impl/InboundLedger.cpp
@@ -47,7 +47,7 @@ enum {
,
peerCountAdd = 2

// how many timeouts before we giveup
// how many timeouts before we give up
,
ledgerTimeoutRetriesMax = 10

@@ -876,149 +876,87 @@ InboundLedger::takeHeader(std::string const& data)
return true;
}

/** Process TX data received from a peer
/** Process node data received from a peer
Call with a lock
*/
bool
InboundLedger::takeTxNode(
const std::vector<SHAMapNodeID>& nodeIDs,
const std::vector<Blob>& data,
SHAMapAddNode& san)
void
InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
{
if (!mHaveHeader)
{
JLOG(m_journal.warn()) << "TX node without header";
JLOG(m_journal.warn()) << "Missing ledger header";
san.incInvalid();
return false;
}

if (mHaveTransactions || mFailed)
{
san.incDuplicate();
return true;
return;
}

auto nodeIDit = nodeIDs.cbegin();
auto nodeDatait = data.begin();
TransactionStateSF filter(
mLedger->txMap().family().db(), app_.getLedgerMaster());

while (nodeIDit != nodeIDs.cend())
if (packet.type() == protocol::liTX_NODE)
{
if (nodeIDit->isRoot())
{
san += mLedger->txMap().addRootNode(
SHAMapHash{mLedger->info().txHash},
makeSlice(*nodeDatait),
snfWIRE,
&filter);
if (!san.isGood())
return false;
}
else
if (mHaveTransactions || mFailed)
{
san += mLedger->txMap().addKnownNode(
*nodeIDit, makeSlice(*nodeDatait), &filter);
if (!san.isGood())
return false;
}

++nodeIDit;
++nodeDatait;
}

if (!mLedger->txMap().isSynching())
{
mHaveTransactions = true;

if (mHaveState)
{
mComplete = true;
done();
san.incDuplicate();
return;
}
}

return true;
}

/** Process AS data received from a peer
Call with a lock
*/
bool
InboundLedger::takeAsNode(
const std::vector<SHAMapNodeID>& nodeIDs,
const std::vector<Blob>& data,
SHAMapAddNode& san)
{
JLOG(m_journal.trace())
<< "got ASdata (" << nodeIDs.size() << ") acquiring ledger " << mHash;
if (nodeIDs.size() == 1)
{
JLOG(m_journal.trace()) << "got AS node: " << nodeIDs.front();
}

ScopedLockType sl(mLock);

if (!mHaveHeader)
{
JLOG(m_journal.warn()) << "Don't have ledger header";
san.incInvalid();
return false;
}

if (mHaveState || mFailed)
else if (mHaveState || mFailed)
{
san.incDuplicate();
return true;
return;
}

auto nodeIDit = nodeIDs.cbegin();
auto nodeDatait = data.begin();
AccountStateSF filter(
mLedger->stateMap().family().db(), app_.getLedgerMaster());

while (nodeIDit != nodeIDs.cend())
auto [map, filter] =
[&]() -> std::pair<SHAMap&, std::unique_ptr<SHAMapSyncFilter>> {
if (packet.type() == protocol::liTX_NODE)
return {
mLedger->txMap(),
std::make_unique<TransactionStateSF>(
mLedger->txMap().family().db(), app_.getLedgerMaster())};
return {
mLedger->stateMap(),
std::make_unique<AccountStateSF>(
mLedger->stateMap().family().db(), app_.getLedgerMaster())};
}();

try
{
if (nodeIDit->isRoot())
for (auto const& node : packet.nodes())
{
san += mLedger->stateMap().addRootNode(
SHAMapHash{mLedger->info().accountHash},
makeSlice(*nodeDatait),
snfWIRE,
&filter);
if (!san.isGood())
{
JLOG(m_journal.warn()) << "Bad ledger header";
return false;
}
}
else
{
san += mLedger->stateMap().addKnownNode(
*nodeIDit, makeSlice(*nodeDatait), &filter);
SHAMapNodeID const nodeID(
node.nodeid().data(), node.nodeid().size());
if (nodeID.isRoot())
san += map.addRootNode(
SHAMapHash{mLedger->info().accountHash},
makeSlice(node.nodedata()),
filter.get());
else
san += map.addKnownNode(
nodeID, makeSlice(node.nodedata()), filter.get());

if (!san.isGood())
{
JLOG(m_journal.warn()) << "Unable to add AS node";
return false;
JLOG(m_journal.warn()) << "Received bad node data";
return;
}
}

++nodeIDit;
++nodeDatait;
}
catch (std::exception const& e)
{
JLOG(m_journal.error()) << "Received bad node data: " << e.what();
san.incInvalid();
return;
}

if (!mLedger->stateMap().isSynching())
if (!map.isSynching())
{
mHaveState = true;
if (packet.type() == protocol::liTX_NODE)
mHaveTransactions = true;
else
mHaveState = true;

if (mHaveTransactions)
if (mHaveTransactions && mHaveState)
{
mComplete = true;
done();
}
}

return true;
}
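
The consolidated receiveNode above replaces the old takeTxNode/takeAsNode pair with a single loop, choosing the target map and its sync filter up front via an immediately-invoked lambda that returns a pair. A self-contained sketch of that selection idiom (Map, Filter, TxFilter, and StateFilter are stand-ins, not the SHAMap/SHAMapSyncFilter types):

    #include <iostream>
    #include <memory>
    #include <string>
    #include <utility>

    struct Map
    {
        std::string name;
    };

    struct Filter
    {
        virtual ~Filter() = default;
        virtual std::string describe() const = 0;
    };

    struct TxFilter : Filter
    {
        std::string describe() const override { return "transaction filter"; }
    };

    struct StateFilter : Filter
    {
        std::string describe() const override { return "account-state filter"; }
    };

    void receive(bool isTxNode, Map& txMap, Map& stateMap)
    {
        // The immediately-invoked lambda binds the map by reference and the
        // filter by unique_ptr in one shot, so the per-node loop that follows
        // only needs to be written once.
        auto [map, filter] = [&]() -> std::pair<Map&, std::unique_ptr<Filter>> {
            if (isTxNode)
                return {txMap, std::make_unique<TxFilter>()};
            return {stateMap, std::make_unique<StateFilter>()};
        }();

        std::cout << "adding nodes to " << map.name << " via "
                  << filter->describe() << '\n';
    }

    int main()
    {
        Map tx{"txMap"};
        Map state{"stateMap"};
        receive(true, tx, state);
        receive(false, tx, state);
    }

Keeping the type-specific setup in one expression is what removes the duplicated per-node while-loops of the two old handlers.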

/** Process AS root node received from a peer
@@ -1042,7 +980,7 @@ InboundLedger::takeAsRootNode(Slice const& data, SHAMapAddNode& san)
AccountStateSF filter(
mLedger->stateMap().family().db(), app_.getLedgerMaster());
san += mLedger->stateMap().addRootNode(
SHAMapHash{mLedger->info().accountHash}, data, snfWIRE, &filter);
SHAMapHash{mLedger->info().accountHash}, data, &filter);
return san.isGood();
}

@@ -1067,7 +1005,7 @@ InboundLedger::takeTxRootNode(Slice const& data, SHAMapAddNode& san)
TransactionStateSF filter(
mLedger->txMap().family().db(), app_.getLedgerMaster());
san += mLedger->txMap().addRootNode(
SHAMapHash{mLedger->info().txHash}, data, snfWIRE, &filter);
SHAMapHash{mLedger->info().txHash}, data, &filter);
return san.isGood();
}
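
Both root-node hunks above drop the snfWIRE argument from addRootNode, an interface simplification that makes sense when callers only ever use one serialization format. A tiny illustration of that kind of parameter trimming (decode/decodeOld and the Format enum are hypothetical, not the SHAMap serialization API):

    #include <cassert>
    #include <string>
    #include <vector>

    enum class Format { Wire, Prefix };

    // Before: every caller wrote decodeOld(data, Format::Wire).
    std::vector<int> decodeOld(std::string const& data, Format fmt)
    {
        assert(fmt == Format::Wire);  // the only value ever passed in practice
        return std::vector<int>(data.begin(), data.end());
    }

    // After: the wire format is assumed, shrinking every call site.
    std::vector<int> decode(std::string const& data)
    {
        return std::vector<int>(data.begin(), data.end());
    }

    int main()
    {
        assert(decodeOld("abc", Format::Wire) == decode("abc"));
    }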

@@ -1156,28 +1094,38 @@ InboundLedger::processData(

SHAMapAddNode san;

if (!mHaveHeader)
try
{
if (takeHeader(packet.nodes(0).nodedata()))
if (!mHaveHeader)
{
if (!takeHeader(packet.nodes(0).nodedata()))
{
JLOG(m_journal.warn()) << "Got invalid header data";
peer->charge(Resource::feeInvalidRequest);
return -1;
}

san.incUseful();
else
}

if (!mHaveState && (packet.nodes().size() > 1) &&
!takeAsRootNode(makeSlice(packet.nodes(1).nodedata()), san))
{
JLOG(m_journal.warn()) << "Got invalid header data";
peer->charge(Resource::feeInvalidRequest);
return -1;
JLOG(m_journal.warn()) << "Included AS root invalid";
}
}

if (!mHaveState && (packet.nodes().size() > 1) &&
!takeAsRootNode(makeSlice(packet.nodes(1).nodedata()), san))
{
JLOG(m_journal.warn()) << "Included AS root invalid";
if (!mHaveTransactions && (packet.nodes().size() > 2) &&
!takeTxRootNode(makeSlice(packet.nodes(2).nodedata()), san))
{
JLOG(m_journal.warn()) << "Included TX root invalid";
}
}

if (!mHaveTransactions && (packet.nodes().size() > 2) &&
!takeTxRootNode(makeSlice(packet.nodes(2).nodedata()), san))
catch (std::exception const& ex)
{
JLOG(m_journal.warn()) << "Included TX root invalid";
JLOG(m_journal.warn())
<< "Included AS/TX root invalid: " << ex.what();
peer->charge(Resource::feeBadData);
return -1;
}

if (san.isUseful())
Expand All @@ -1197,38 +1145,26 @@ InboundLedger::processData(
return -1;
}

std::vector<SHAMapNodeID> nodeIDs;
nodeIDs.reserve(packet.nodes().size());
std::vector<Blob> nodeData;
nodeData.reserve(packet.nodes().size());

for (int i = 0; i < packet.nodes().size(); ++i)
// Verify node IDs and data are complete
for (auto const& node : packet.nodes())
{
const protocol::TMLedgerNode& node = packet.nodes(i);

if (!node.has_nodeid() || !node.has_nodedata())
{
JLOG(m_journal.warn()) << "Got bad node";
peer->charge(Resource::feeInvalidRequest);
return -1;
}

nodeIDs.push_back(
SHAMapNodeID(node.nodeid().data(), node.nodeid().size()));
nodeData.push_back(
Blob(node.nodedata().begin(), node.nodedata().end()));
}

SHAMapAddNode san;
receiveNode(packet, san);

if (packet.type() == protocol::liTX_NODE)
{
takeTxNode(nodeIDs, nodeData, san);
JLOG(m_journal.debug()) << "Ledger TX node stats: " << san.get();
}
else
{
takeAsNode(nodeIDs, nodeData, san);
JLOG(m_journal.debug()) << "Ledger AS node stats: " << san.get();
}

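processData above now wraps the header and root-node handling in a try/catch, logs ex.what() so the diagnostic is readable, and charges the sending peer for bad data rather than silently bailing out. A compact sketch of that catch-log-charge shape (Peer, charge, and the fee name are invented for this illustration; only the overall pattern mirrors the diff):

    #include <iostream>
    #include <stdexcept>
    #include <string>

    // Stand-in for the peer/resource-charging machinery.
    struct Peer
    {
        void charge(std::string const& fee)
        {
            std::cout << "charging peer: " << fee << '\n';
        }
    };

    // Throws with a descriptive message instead of returning a bare error code.
    void parseHeader(std::string const& data)
    {
        if (data.size() < 4)
            throw std::runtime_error(
                "ledger header too short: " + std::to_string(data.size()) +
                " bytes");
    }

    int processData(Peer& peer, std::string const& data)
    {
        try
        {
            parseHeader(data);
        }
        catch (std::exception const& ex)
        {
            // The exception text carries the specifics, so the log line is
            // useful without a debugger attached.
            std::cout << "Invalid ledger data: " << ex.what() << '\n';
            peer.charge("feeBadData");
            return -1;
        }
        return 0;
    }

    int main()
    {
        Peer peer;
        processData(peer, "valid-looking header");
        processData(peer, "no");
    }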