-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ntuple] Add MT index building #16679
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,9 @@ | |
#define ROOT7_RNTupleIndex | ||
|
||
#include <ROOT/RField.hxx> | ||
#ifdef R__USE_IMT | ||
#include "ROOT/TThreadExecutor.hxx" | ||
#endif | ||
|
||
#include <memory> | ||
#include <string> | ||
|
@@ -43,12 +46,17 @@ private: | |
class RIndexValue { | ||
public: | ||
std::vector<NTupleIndexValue_t> fFieldValues; | ||
|
||
RIndexValue() = default; | ||
RIndexValue(const std::vector<NTupleIndexValue_t> &fieldValues) | ||
{ | ||
fFieldValues.reserve(fieldValues.size()); | ||
fFieldValues = fieldValues; | ||
} | ||
inline bool operator==(const RIndexValue &other) const { return other.fFieldValues == fFieldValues; } | ||
|
||
inline bool operator==(const RIndexValue &other) const { return fFieldValues == other.fFieldValues; } | ||
inline bool operator<(const RIndexValue &other) const { return fFieldValues < other.fFieldValues; } | ||
inline bool operator>(const RIndexValue &other) const { return fFieldValues > other.fFieldValues; } | ||
}; | ||
|
||
///////////////////////////////////////////////////////////////////////////// | ||
|
@@ -66,9 +74,51 @@ private: | |
} | ||
}; | ||
|
||
/// The index itself. Maps field values (or combinations thereof in case the index is defined for multiple fields) to | ||
/// their respective entry numbers. | ||
std::unordered_map<RIndexValue, std::vector<NTupleSize_t>, RIndexValueHash> fIndex; | ||
///////////////////////////////////////////////////////////////////////////// | ||
/// Index for a specific entry range of the full indexed RNTuple | ||
class RNTupleIndexPartition { | ||
friend RNTupleIndex; | ||
|
||
private: | ||
NTupleSize_t fFirstEntry, fLastEntry; | ||
|
||
std::unique_ptr<RPageSource> fPageSource; | ||
std::vector<std::unique_ptr<RFieldBase>> fIndexFields; | ||
|
||
/// The index itself. Maps field values (or combinations thereof in case the index is defined for multiple fields) | ||
/// to their respective entry numbers. | ||
std::unordered_map<RIndexValue, std::vector<NTupleSize_t>, RIndexValueHash> fIndex; | ||
|
||
public: | ||
RNTupleIndexPartition(const RClusterDescriptor &descriptor, | ||
const std::vector<std::unique_ptr<RFieldBase>> &indexFields, const RPageSource &pageSource) | ||
: fPageSource(pageSource.Clone()) | ||
{ | ||
fPageSource->Attach(); | ||
|
||
fFirstEntry = descriptor.GetFirstEntryIndex(); | ||
fLastEntry = fFirstEntry + descriptor.GetNEntries(); | ||
|
||
fIndexFields.reserve(indexFields.size()); | ||
for (const auto &field : indexFields) { | ||
auto clonedField = field->Clone(field->GetFieldName()); | ||
CallConnectPageSourceOnField(*clonedField, *fPageSource); | ||
Comment on lines
+104
to
+105
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now we need to clone and connect page sources and fields for every index partition / cluster, which means we probably cannot use cluster prefetching... |
||
fIndexFields.push_back(std::move(clonedField)); | ||
} | ||
} | ||
|
||
RNTupleIndexPartition(const RNTupleIndexPartition &) = delete; | ||
RNTupleIndexPartition(RNTupleIndexPartition &&) = default; | ||
RNTupleIndexPartition &operator=(const RNTupleIndexPartition &) = delete; | ||
RNTupleIndexPartition &operator=(RNTupleIndexPartition &&) = default; | ||
~RNTupleIndexPartition() = default; | ||
|
||
void Build(); | ||
}; | ||
|
||
/// The partitioned indices, which together provide the full index for the RNTuple. Currently, partitions are made on | ||
/// cluster boundaries. | ||
std::vector<RNTupleIndexPartition> fIndexPartitions; | ||
|
||
/// The page source belonging to the RNTuple for which to build the index. | ||
std::unique_ptr<RPageSource> fPageSource; | ||
|
@@ -79,6 +129,10 @@ private: | |
/// Only built indexes can be queried. | ||
bool fIsBuilt = false; | ||
|
||
#ifdef R__USE_IMT | ||
std::unique_ptr<ROOT::TThreadExecutor> fPool; | ||
#endif | ||
Comment on lines
+132
to
+134
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this member required? AFAICT it's only used once during |
||
|
||
///////////////////////////////////////////////////////////////////////////// | ||
/// \brief Create a new RNTupleIndex for the RNTuple represented by the provided page source. | ||
/// | ||
|
@@ -129,7 +183,12 @@ public: | |
std::size_t GetSize() const | ||
{ | ||
EnsureBuilt(); | ||
return fIndex.size(); | ||
|
||
auto fnAddSizes = [](std::size_t acc, const RNTupleIndexPartition &indexPartition) -> std::size_t { | ||
return acc + indexPartition.fIndex.size(); | ||
}; | ||
|
||
return std::accumulate(fIndexPartitions.cbegin(), fIndexPartitions.cend(), 0, fnAddSizes); | ||
} | ||
|
||
///////////////////////////////////////////////////////////////////////////// | ||
|
@@ -176,14 +235,14 @@ public: | |
/// | ||
/// \return The entry numbers that corresponds to `valuePtrs`. When no such entry exists, an empty vector is | ||
/// returned. | ||
const std::vector<NTupleSize_t> *GetAllEntryNumbers(const std::vector<void *> &valuePtrs) const; | ||
const std::vector<NTupleSize_t> GetAllEntryNumbers(const std::vector<void *> &valuePtrs) const; | ||
Comment on lines
-179
to
+238
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that this interface forces the allocation of a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Additionally, returning a |
||
|
||
///////////////////////////////////////////////////////////////////////////// | ||
/// \brief Get all entry numbers for the given index. | ||
/// | ||
/// \sa GetAllEntryNumbers(std::vector<void *> valuePtrs) | ||
template <typename... Ts> | ||
const std::vector<NTupleSize_t> *GetAllEntryNumbers(Ts... values) const | ||
const std::vector<NTupleSize_t> GetAllEntryNumbers(Ts... values) const | ||
{ | ||
if (sizeof...(Ts) != fIndexFields.size()) | ||
throw RException(R__FAIL("Number of values must match number of indexed fields.")); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,11 @@ | |
|
||
#include <ROOT/RNTupleIndex.hxx> | ||
|
||
#include <TROOT.h> | ||
#ifdef R__USE_IMT | ||
#include "ROOT/TThreadExecutor.hxx" | ||
#endif | ||
|
||
namespace { | ||
ROOT::Experimental::Internal::RNTupleIndex::NTupleIndexValue_t | ||
CastValuePtr(void *valuePtr, const ROOT::Experimental::RFieldBase &field) | ||
|
@@ -33,6 +38,34 @@ CastValuePtr(void *valuePtr, const ROOT::Experimental::RFieldBase &field) | |
} | ||
} // anonymous namespace | ||
|
||
void ROOT::Experimental::Internal::RNTupleIndex::RNTupleIndexPartition::Build() | ||
{ | ||
std::vector<RFieldBase::RValue> fieldValues; | ||
fieldValues.reserve(fIndexFields.size()); | ||
for (const auto &field : fIndexFields) { | ||
fieldValues.emplace_back(field->CreateValue()); | ||
} | ||
|
||
std::vector<NTupleIndexValue_t> indexFieldValues; | ||
indexFieldValues.reserve(fieldValues.size()); | ||
|
||
for (unsigned i = fFirstEntry; i < fLastEntry; ++i) { | ||
indexFieldValues.clear(); | ||
for (auto &fieldValue : fieldValues) { | ||
// TODO(fdegeus): use bulk reading | ||
fieldValue.Read(i); | ||
|
||
auto valuePtr = fieldValue.GetPtr<void>(); | ||
indexFieldValues.push_back(CastValuePtr(valuePtr.get(), fieldValue.GetField())); | ||
} | ||
|
||
RIndexValue indexValue(indexFieldValues); | ||
fIndex[indexValue].push_back(i); | ||
} | ||
} | ||
|
||
//------------------------------------------------------------------------------ | ||
|
||
ROOT::Experimental::Internal::RNTupleIndex::RNTupleIndex(const std::vector<std::string> &fieldNames, | ||
const RPageSource &pageSource) | ||
: fPageSource(pageSource.Clone()) | ||
|
@@ -42,6 +75,10 @@ ROOT::Experimental::Internal::RNTupleIndex::RNTupleIndex(const std::vector<std:: | |
|
||
fIndexFields.reserve(fieldNames.size()); | ||
|
||
static const std::unordered_set<std::string> allowedTypes = {"std::int8_t", "std::int16_t", "std::int32_t", | ||
"std::int64_t", "std::uint8_t", "std::uint16_t", | ||
"std::uint32_t", "std::uint64_t"}; | ||
|
||
for (const auto &fieldName : fieldNames) { | ||
auto fieldId = desc->FindFieldId(fieldName); | ||
if (fieldId == kInvalidDescriptorId) | ||
|
@@ -50,7 +87,10 @@ ROOT::Experimental::Internal::RNTupleIndex::RNTupleIndex(const std::vector<std:: | |
const auto &fieldDesc = desc->GetFieldDescriptor(fieldId); | ||
auto field = fieldDesc.CreateField(desc.GetRef()); | ||
|
||
CallConnectPageSourceOnField(*field, *fPageSource); | ||
if (allowedTypes.find(field->GetTypeName()) == allowedTypes.end()) { | ||
throw RException(R__FAIL("Cannot use field \"" + field->GetFieldName() + "\" with type \"" + | ||
field->GetTypeName() + "\" for indexing. Only integral types are allowed.")); | ||
} | ||
|
||
fIndexFields.push_back(std::move(field)); | ||
} | ||
|
@@ -79,34 +119,29 @@ void ROOT::Experimental::Internal::RNTupleIndex::Build() | |
if (fIsBuilt) | ||
return; | ||
|
||
static const std::unordered_set<std::string> allowedTypes = {"std::int8_t", "std::int16_t", "std::int32_t", | ||
"std::int64_t", "std::uint8_t", "std::uint16_t", | ||
"std::uint32_t", "std::uint64_t"}; | ||
auto desc = fPageSource->GetSharedDescriptorGuard(); | ||
|
||
std::vector<RFieldBase::RValue> fieldValues; | ||
fieldValues.reserve(fIndexFields.size()); | ||
fIndexPartitions.reserve(desc->GetNClusters()); | ||
|
||
for (const auto &field : fIndexFields) { | ||
if (allowedTypes.find(field->GetTypeName()) == allowedTypes.end()) { | ||
throw RException(R__FAIL("Cannot use field \"" + field->GetFieldName() + "\" with type \"" + | ||
field->GetTypeName() + "\" for indexing. Only integral types are allowed.")); | ||
if (ROOT::IsImplicitMTEnabled()) { | ||
#ifdef R__USE_IMT | ||
for (const auto &cluster : desc->GetClusterIterable()) { | ||
fIndexPartitions.emplace_back(cluster, fIndexFields, *fPageSource); | ||
} | ||
fieldValues.emplace_back(field->CreateValue()); | ||
} | ||
|
||
std::vector<NTupleIndexValue_t> indexValues; | ||
indexValues.reserve(fIndexFields.size()); | ||
|
||
for (unsigned i = 0; i < fPageSource->GetNEntries(); ++i) { | ||
indexValues.clear(); | ||
for (auto &fieldValue : fieldValues) { | ||
// TODO(fdegeus): use bulk reading | ||
fieldValue.Read(i); | ||
|
||
auto valuePtr = fieldValue.GetPtr<void>(); | ||
indexValues.push_back(CastValuePtr(valuePtr.get(), fieldValue.GetField())); | ||
auto fnBuildIndexPartition = [](RNTupleIndexPartition &indexPartition) -> void { indexPartition.Build(); }; | ||
|
||
if (!fPool) | ||
fPool = std::make_unique<ROOT::TThreadExecutor>(); | ||
fPool->Foreach(fnBuildIndexPartition, fIndexPartitions); | ||
#else | ||
assert(false); | ||
#endif | ||
} else { | ||
for (const auto &cluster : desc->GetClusterIterable()) { | ||
auto &indexPartition = fIndexPartitions.emplace_back(cluster, fIndexFields, *fPageSource); | ||
indexPartition.Build(); | ||
} | ||
fIndex[RIndexValue(indexValues)].push_back(i); | ||
} | ||
|
||
fIsBuilt = true; | ||
|
@@ -115,31 +150,45 @@ void ROOT::Experimental::Internal::RNTupleIndex::Build() | |
ROOT::Experimental::NTupleSize_t | ||
ROOT::Experimental::Internal::RNTupleIndex::GetFirstEntryNumber(const std::vector<void *> &valuePtrs) const | ||
{ | ||
const auto entryIndices = GetAllEntryNumbers(valuePtrs); | ||
if (!entryIndices) | ||
const auto entryNumbers = GetAllEntryNumbers(valuePtrs); | ||
if (entryNumbers.empty()) | ||
return kInvalidNTupleIndex; | ||
return entryIndices->front(); | ||
return entryNumbers.front(); | ||
} | ||
|
||
const std::vector<ROOT::Experimental::NTupleSize_t> * | ||
const std::vector<ROOT::Experimental::NTupleSize_t> | ||
ROOT::Experimental::Internal::RNTupleIndex::GetAllEntryNumbers(const std::vector<void *> &valuePtrs) const | ||
{ | ||
if (valuePtrs.size() != fIndexFields.size()) | ||
throw RException(R__FAIL("Number of value pointers must match number of indexed fields.")); | ||
|
||
EnsureBuilt(); | ||
|
||
std::vector<NTupleIndexValue_t> indexValues; | ||
indexValues.reserve(fIndexFields.size()); | ||
std::vector<std::vector<NTupleIndexValue_t>> entryNumbersPerCluster; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This implementation uses two vectors ( |
||
|
||
std::vector<NTupleIndexValue_t> indexFieldValues; | ||
indexFieldValues.reserve(fIndexFields.size()); | ||
|
||
for (unsigned i = 0; i < valuePtrs.size(); ++i) { | ||
indexValues.push_back(CastValuePtr(valuePtrs[i], *fIndexFields[i])); | ||
indexFieldValues.push_back(CastValuePtr(valuePtrs[i], *fIndexFields[i])); | ||
} | ||
|
||
auto entryNumber = fIndex.find(RIndexValue(indexValues)); | ||
RIndexValue indexValue(indexFieldValues); | ||
|
||
if (entryNumber == fIndex.end()) | ||
return nullptr; | ||
for (const auto &indexPartition : fIndexPartitions) { | ||
auto clusterEntryNumbers = indexPartition.fIndex.find(indexValue); | ||
|
||
if (clusterEntryNumbers == indexPartition.fIndex.end()) | ||
continue; | ||
|
||
entryNumbersPerCluster.push_back(clusterEntryNumbers->second); | ||
} | ||
Comment on lines
+178
to
+185
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that this introduces linear complexity in the number of clusters: while |
||
|
||
std::vector<NTupleIndexValue_t> entryNumbers; | ||
|
||
for (const auto &clusterEntries : entryNumbersPerCluster) { | ||
entryNumbers.insert(entryNumbers.end(), clusterEntries.cbegin(), clusterEntries.cend()); | ||
} | ||
|
||
return &(entryNumber->second); | ||
return entryNumbers; | ||
Comment on lines
-144
to
+193
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (side note: it might be possible to get around returning vectors by value. For example, after building the partitions the |
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this constructor need inlining? If not I think it should be defined in the source file