Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ntuple] Add MT index building #16679

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 66 additions & 7 deletions tree/ntuple/v7/inc/ROOT/RNTupleIndex.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
#define ROOT7_RNTupleIndex

#include <ROOT/RField.hxx>
#ifdef R__USE_IMT
#include "ROOT/TThreadExecutor.hxx"
#endif

#include <memory>
#include <string>
Expand All @@ -43,12 +46,17 @@ private:
/// A value of an index, built from the (cast) values of the indexed fields for one entry.
/// Ordered comparison operators are provided so index values can also be used in sorted containers.
class RIndexValue {
public:
   std::vector<NTupleIndexValue_t> fFieldValues;

   RIndexValue() = default;
   /// Construct from the concatenated field values of one entry; one element per indexed field.
   RIndexValue(const std::vector<NTupleIndexValue_t> &fieldValues) : fFieldValues(fieldValues) {}

   inline bool operator==(const RIndexValue &other) const { return fFieldValues == other.fFieldValues; }
   inline bool operator<(const RIndexValue &other) const { return fFieldValues < other.fFieldValues; }
   inline bool operator>(const RIndexValue &other) const { return fFieldValues > other.fFieldValues; }
};

/////////////////////////////////////////////////////////////////////////////
Expand All @@ -66,9 +74,51 @@ private:
}
};

/// The index itself. Maps field values (or combinations thereof in case the index is defined for multiple fields) to
/// their respective entry numbers.
std::unordered_map<RIndexValue, std::vector<NTupleSize_t>, RIndexValueHash> fIndex;
/////////////////////////////////////////////////////////////////////////////
/// Index for a specific entry range of the full indexed RNTuple
class RNTupleIndexPartition {
friend RNTupleIndex;

private:
NTupleSize_t fFirstEntry, fLastEntry;

std::unique_ptr<RPageSource> fPageSource;
std::vector<std::unique_ptr<RFieldBase>> fIndexFields;

/// The index itself. Maps field values (or combinations thereof in case the index is defined for multiple fields)
/// to their respsective entry numbers.
std::unordered_map<RIndexValue, std::vector<NTupleSize_t>, RIndexValueHash> fIndex;

public:
RNTupleIndexPartition(const RClusterDescriptor &descriptor,
const std::vector<std::unique_ptr<RFieldBase>> &indexFields, const RPageSource &pageSource)
: fPageSource(pageSource.Clone())
Comment on lines +93 to +95
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this constructor need inlining? If not I think it should be defined in the source file

{
fPageSource->Attach();

fFirstEntry = descriptor.GetFirstEntryIndex();
fLastEntry = fFirstEntry + descriptor.GetNEntries();

fIndexFields.reserve(indexFields.size());
for (const auto &field : indexFields) {
auto clonedField = field->Clone(field->GetFieldName());
CallConnectPageSourceOnField(*clonedField, *fPageSource);
Comment on lines +104 to +105
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we need to clone and connect page sources and fields for every index partition / cluster, which means we probably cannot use cluster prefetching...

fIndexFields.push_back(std::move(clonedField));
}
}

RNTupleIndexPartition(const RNTupleIndexPartition &) = delete;
RNTupleIndexPartition(RNTupleIndexPartition &&) = default;
RNTupleIndexPartition &operator=(const RNTupleIndexPartition &) = delete;
RNTupleIndexPartition &operator=(RNTupleIndexPartition &&) = default;
~RNTupleIndexPartition() = default;

void Build();
};

/// The partitioned indices, which together provide the full index for the RNTuple. Currently, partitions are made on
/// cluster boundaries.
std::vector<RNTupleIndexPartition> fIndexPartitions;

/// The page source belonging to the RNTuple for which to build the index.
std::unique_ptr<RPageSource> fPageSource;
Expand All @@ -79,6 +129,10 @@ private:
/// Only built indexes can be queried.
bool fIsBuilt = false;

#ifdef R__USE_IMT
std::unique_ptr<ROOT::TThreadExecutor> fPool;
#endif
Comment on lines +132 to +134
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this member required? AFAICT it's only used once during Build(), so it might be constructed and destructed locally


/////////////////////////////////////////////////////////////////////////////
/// \brief Create a new RNTupleIndex for the RNTuple represented by the provided page source.
///
Expand Down Expand Up @@ -129,7 +183,12 @@ public:
/// Get the total number of distinct index values, summed over all partitions.
/// Only valid after the index has been built.
std::size_t GetSize() const
{
   EnsureBuilt();

   auto fnAddSizes = [](std::size_t acc, const RNTupleIndexPartition &indexPartition) -> std::size_t {
      return acc + indexPartition.fIndex.size();
   };

   // Seed with std::size_t{0} (not the int literal 0) so the accumulation type
   // is std::size_t and large sums are not truncated to int.
   return std::accumulate(fIndexPartitions.cbegin(), fIndexPartitions.cend(), std::size_t{0}, fnAddSizes);
}

/////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -176,14 +235,14 @@ public:
///
/// \return The entry numbers that corresponds to `valuePtrs`. When no such entry exists, an empty vector is
/// returned.
const std::vector<NTupleSize_t> *GetAllEntryNumbers(const std::vector<void *> &valuePtrs) const;
const std::vector<NTupleSize_t> GetAllEntryNumbers(const std::vector<void *> &valuePtrs) const;
Comment on lines -179 to +238
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this interface forces the allocation of a std::vector including its heap-backed data for every probe

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additionally, returning a const value is generally not what you want (as it is my understanding it may prevent move semantics/rvalue reference semantics, and it doesn't really give any benefit to the caller).
To avoid forcing the allocation on the caller while giving them ownership of the data it's probably better to pass the out vector as a reference parameter (so they may reuse it across calls etc).


/////////////////////////////////////////////////////////////////////////////
/// \brief Get all entry numbers for the given index.
///
/// \sa GetAllEntryNumbers(std::vector<void *> valuePtrs)
template <typename... Ts>
const std::vector<NTupleSize_t> *GetAllEntryNumbers(Ts... values) const
const std::vector<NTupleSize_t> GetAllEntryNumbers(Ts... values) const
{
if (sizeof...(Ts) != fIndexFields.size())
throw RException(R__FAIL("Number of values must match number of indexed fields."));
Expand Down
119 changes: 84 additions & 35 deletions tree/ntuple/v7/src/RNTupleIndex.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@

#include <ROOT/RNTupleIndex.hxx>

#include <TROOT.h>
#ifdef R__USE_IMT
#include "ROOT/TThreadExecutor.hxx"
#endif

namespace {
ROOT::Experimental::Internal::RNTupleIndex::NTupleIndexValue_t
CastValuePtr(void *valuePtr, const ROOT::Experimental::RFieldBase &field)
Expand All @@ -33,6 +38,34 @@ CastValuePtr(void *valuePtr, const ROOT::Experimental::RFieldBase &field)
}
} // anonymous namespace

void ROOT::Experimental::Internal::RNTupleIndex::RNTupleIndexPartition::Build()
{
std::vector<RFieldBase::RValue> fieldValues;
fieldValues.reserve(fIndexFields.size());
for (const auto &field : fIndexFields) {
fieldValues.emplace_back(field->CreateValue());
}

std::vector<NTupleIndexValue_t> indexFieldValues;
indexFieldValues.reserve(fieldValues.size());

for (unsigned i = fFirstEntry; i < fLastEntry; ++i) {
indexFieldValues.clear();
for (auto &fieldValue : fieldValues) {
// TODO(fdegeus): use bulk reading
fieldValue.Read(i);

auto valuePtr = fieldValue.GetPtr<void>();
indexFieldValues.push_back(CastValuePtr(valuePtr.get(), fieldValue.GetField()));
}

RIndexValue indexValue(indexFieldValues);
fIndex[indexValue].push_back(i);
}
}

//------------------------------------------------------------------------------

ROOT::Experimental::Internal::RNTupleIndex::RNTupleIndex(const std::vector<std::string> &fieldNames,
const RPageSource &pageSource)
: fPageSource(pageSource.Clone())
Expand All @@ -42,6 +75,10 @@ ROOT::Experimental::Internal::RNTupleIndex::RNTupleIndex(const std::vector<std::

fIndexFields.reserve(fieldNames.size());

static const std::unordered_set<std::string> allowedTypes = {"std::int8_t", "std::int16_t", "std::int32_t",
"std::int64_t", "std::uint8_t", "std::uint16_t",
"std::uint32_t", "std::uint64_t"};

for (const auto &fieldName : fieldNames) {
auto fieldId = desc->FindFieldId(fieldName);
if (fieldId == kInvalidDescriptorId)
Expand All @@ -50,7 +87,10 @@ ROOT::Experimental::Internal::RNTupleIndex::RNTupleIndex(const std::vector<std::
const auto &fieldDesc = desc->GetFieldDescriptor(fieldId);
auto field = fieldDesc.CreateField(desc.GetRef());

CallConnectPageSourceOnField(*field, *fPageSource);
if (allowedTypes.find(field->GetTypeName()) == allowedTypes.end()) {
throw RException(R__FAIL("Cannot use field \"" + field->GetFieldName() + "\" with type \"" +
field->GetTypeName() + "\" for indexing. Only integral types are allowed."));
}

fIndexFields.push_back(std::move(field));
}
Expand Down Expand Up @@ -79,34 +119,29 @@ void ROOT::Experimental::Internal::RNTupleIndex::Build()
if (fIsBuilt)
return;

static const std::unordered_set<std::string> allowedTypes = {"std::int8_t", "std::int16_t", "std::int32_t",
"std::int64_t", "std::uint8_t", "std::uint16_t",
"std::uint32_t", "std::uint64_t"};
auto desc = fPageSource->GetSharedDescriptorGuard();

std::vector<RFieldBase::RValue> fieldValues;
fieldValues.reserve(fIndexFields.size());
fIndexPartitions.reserve(desc->GetNClusters());

for (const auto &field : fIndexFields) {
if (allowedTypes.find(field->GetTypeName()) == allowedTypes.end()) {
throw RException(R__FAIL("Cannot use field \"" + field->GetFieldName() + "\" with type \"" +
field->GetTypeName() + "\" for indexing. Only integral types are allowed."));
if (ROOT::IsImplicitMTEnabled()) {
#ifdef R__USE_IMT
for (const auto &cluster : desc->GetClusterIterable()) {
fIndexPartitions.emplace_back(cluster, fIndexFields, *fPageSource);
}
fieldValues.emplace_back(field->CreateValue());
}

std::vector<NTupleIndexValue_t> indexValues;
indexValues.reserve(fIndexFields.size());

for (unsigned i = 0; i < fPageSource->GetNEntries(); ++i) {
indexValues.clear();
for (auto &fieldValue : fieldValues) {
// TODO(fdegeus): use bulk reading
fieldValue.Read(i);

auto valuePtr = fieldValue.GetPtr<void>();
indexValues.push_back(CastValuePtr(valuePtr.get(), fieldValue.GetField()));
auto fnBuildIndexPartition = [](RNTupleIndexPartition &indexPartition) -> void { indexPartition.Build(); };

if (!fPool)
fPool = std::make_unique<ROOT::TThreadExecutor>();
fPool->Foreach(fnBuildIndexPartition, fIndexPartitions);
#else
assert(false);
#endif
} else {
for (const auto &cluster : desc->GetClusterIterable()) {
auto &indexPartition = fIndexPartitions.emplace_back(cluster, fIndexFields, *fPageSource);
indexPartition.Build();
}
fIndex[RIndexValue(indexValues)].push_back(i);
}

fIsBuilt = true;
Expand All @@ -115,31 +150,45 @@ void ROOT::Experimental::Internal::RNTupleIndex::Build()
/// Get the first entry number matching the given field values, or kInvalidNTupleIndex
/// if no entry matches.
ROOT::Experimental::NTupleSize_t
ROOT::Experimental::Internal::RNTupleIndex::GetFirstEntryNumber(const std::vector<void *> &valuePtrs) const
{
   // Delegate to GetAllEntryNumbers; an empty result means no matching entry exists.
   const auto entryNumbers = GetAllEntryNumbers(valuePtrs);
   if (entryNumbers.empty())
      return kInvalidNTupleIndex;
   return entryNumbers.front();
}

const std::vector<ROOT::Experimental::NTupleSize_t> *
const std::vector<ROOT::Experimental::NTupleSize_t>
ROOT::Experimental::Internal::RNTupleIndex::GetAllEntryNumbers(const std::vector<void *> &valuePtrs) const
{
if (valuePtrs.size() != fIndexFields.size())
throw RException(R__FAIL("Number of value pointers must match number of indexed fields."));

EnsureBuilt();

std::vector<NTupleIndexValue_t> indexValues;
indexValues.reserve(fIndexFields.size());
std::vector<std::vector<NTupleIndexValue_t>> entryNumbersPerCluster;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation uses two vectors (entryNumbersPerCluster and entryNumbers) which require two heap allocations. Would it be possible to fill entryNumbers directly?


std::vector<NTupleIndexValue_t> indexFieldValues;
indexFieldValues.reserve(fIndexFields.size());

for (unsigned i = 0; i < valuePtrs.size(); ++i) {
indexValues.push_back(CastValuePtr(valuePtrs[i], *fIndexFields[i]));
indexFieldValues.push_back(CastValuePtr(valuePtrs[i], *fIndexFields[i]));
}

auto entryNumber = fIndex.find(RIndexValue(indexValues));
RIndexValue indexValue(indexFieldValues);

if (entryNumber == fIndex.end())
return nullptr;
for (const auto &indexPartition : fIndexPartitions) {
auto clusterEntryNumbers = indexPartition.fIndex.find(indexValue);

if (clusterEntryNumbers == indexPartition.fIndex.end())
continue;

entryNumbersPerCluster.push_back(clusterEntryNumbers->second);
}
Comment on lines +178 to +185
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this introduces linear complexity in the number of clusters: while fIndex.find() has constant complexity, now there are separate hash maps per cluster. In the end, this means linear complexity in the number of entries (with a small coefficient). Maybe this is fine, but I think this means worse probe performance because of multi-threaded building...


std::vector<NTupleIndexValue_t> entryNumbers;

for (const auto &clusterEntries : entryNumbersPerCluster) {
entryNumbers.insert(entryNumbers.end(), clusterEntries.cbegin(), clusterEntries.cend());
}

return &(entryNumber->second);
return entryNumbers;
Comment on lines -144 to +193
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(side note: it might be possible to get around returning vectors by value. For example, after building the partitions the RNTupleIndex could "link" collisions across partitions. Then we could return a "linked list of vectors", which would behave like a container with a custom iterator.)

}
56 changes: 52 additions & 4 deletions tree/ntuple/v7/test/ntuple_index.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,61 @@ TEST(RNTupleIndex, MultipleMatches)

auto entryIdxs = index->GetAllEntryNumbers<std::uint64_t>(1);
auto expected = std::vector<std::uint64_t>{0, 1, 2, 3, 4};
EXPECT_EQ(expected, *entryIdxs);
EXPECT_EQ(expected, entryIdxs);
entryIdxs = index->GetAllEntryNumbers<std::uint64_t>(2);
expected = {5, 6, 7};
EXPECT_EQ(expected, *entryIdxs);
EXPECT_EQ(expected, entryIdxs);
entryIdxs = index->GetAllEntryNumbers<std::uint64_t>(3);
expected = {8, 9};
EXPECT_EQ(expected, *entryIdxs);
EXPECT_EQ(expected, entryIdxs);
entryIdxs = index->GetAllEntryNumbers<std::uint64_t>(4);
EXPECT_EQ(nullptr, entryIdxs);
EXPECT_EQ(0, entryIdxs.size());
}

#ifdef R__USE_IMT
TEST(RNTupleIndex, IMT)
{
   FileRaii fileGuard("test_ntuple_index_build_mt.root");
   {
      // Write 5 clusters of 15 entries each, so the index gets 5 partitions.
      auto model = RNTupleModel::Create();
      auto fldRun = model->MakeField<std::int16_t>("run");
      auto fldEvent = model->MakeField<std::uint64_t>("event");
      auto fldX = model->MakeField<float>("x");

      auto writer = RNTupleWriter::Recreate(std::move(model), "ntuple", fileGuard.GetPath());

      for (int runNr = 0; runNr < 5; ++runNr) {
         *fldRun = runNr;
         for (int eventNr = 0; eventNr < 15; ++eventNr) {
            *fldEvent = eventNr;
            *fldX = static_cast<float>(runNr + eventNr) / 3.14;
            writer->Fill();
         }
         // One cluster per run.
         writer->CommitCluster();
      }
   }

   // Enable implicit multi-threading for the scope of this test.
   IMTRAII _;

   auto pageSource = RPageSource::Create("ntuple", fileGuard.GetPath());
   pageSource->Attach();
   EXPECT_EQ(5UL, pageSource->GetSharedDescriptorGuard()->GetNClusters());

   auto index = RNTupleIndex::Create({"run", "event"}, *pageSource);

   EXPECT_EQ(5ULL * 15ULL, index->GetSize());

   auto reader = RNTupleReader::Open("ntuple", fileGuard.GetPath());
   auto viewX = reader->GetView<float>("x");

   // Every (run, event) pair must resolve to the entry that was written for it.
   for (std::uint64_t entry = 0; entry < pageSource->GetNEntries(); ++entry) {
      std::int16_t run = entry / 15;
      std::uint64_t event = entry % 15;
      auto indexedEntry = index->GetFirstEntryNumber({&run, &event});
      EXPECT_EQ(viewX(indexedEntry), viewX(entry));
   }
}
#endif
Loading