Skip to content

Commit

Permalink
[ntuple] Add multi-threaded index building
Browse files Browse the repository at this point in the history
  • Loading branch information
enirolf committed Oct 14, 2024
1 parent 8be44ad commit 8cce410
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 3 deletions.
7 changes: 7 additions & 0 deletions tree/ntuple/v7/inc/ROOT/RNTupleIndex.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
#define ROOT7_RNTupleIndex

#include <ROOT/RField.hxx>
#ifdef R__USE_IMT
#include "ROOT/TThreadExecutor.hxx"
#endif

#include <memory>
#include <string>
Expand Down Expand Up @@ -126,6 +129,10 @@ private:
/// Only built indexes can be queried.
bool fIsBuilt = false;

#ifdef R__USE_IMT
std::unique_ptr<ROOT::TThreadExecutor> fPool;
#endif

/////////////////////////////////////////////////////////////////////////////
/// \brief Create an a new RNTupleIndex for the RNTuple represented by the provided page source.
///
Expand Down
27 changes: 24 additions & 3 deletions tree/ntuple/v7/src/RNTupleIndex.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@

#include <ROOT/RNTupleIndex.hxx>

#include <TROOT.h>
#ifdef R__USE_IMT
#include "ROOT/TThreadExecutor.hxx"
#endif

namespace {
ROOT::Experimental::Internal::RNTupleIndex::NTupleIndexValue_t
CastValuePtr(void *valuePtr, const ROOT::Experimental::RFieldBase &field)
Expand Down Expand Up @@ -118,9 +123,25 @@ void ROOT::Experimental::Internal::RNTupleIndex::Build()

fIndexPartitions.reserve(desc->GetNClusters());

for (const auto &cluster : desc->GetClusterIterable()) {
auto &indexPartition = fIndexPartitions.emplace_back(cluster, fIndexFields, *fPageSource);
indexPartition.Build();
if (ROOT::IsImplicitMTEnabled()) {
#ifdef R__USE_IMT
for (const auto &cluster : desc->GetClusterIterable()) {
fIndexPartitions.emplace_back(cluster, fIndexFields, *fPageSource);
}

auto fnBuildIndexPartition = [](RNTupleIndexPartition &indexPartition) -> void { indexPartition.Build(); };

if (!fPool)
fPool = std::make_unique<ROOT::TThreadExecutor>();
fPool->Foreach(fnBuildIndexPartition, fIndexPartitions);
#else
assert(false);
#endif
} else {
for (const auto &cluster : desc->GetClusterIterable()) {
auto &indexPartition = fIndexPartitions.emplace_back(cluster, fIndexFields, *fPageSource);
indexPartition.Build();
}
}

fIsBuilt = true;
Expand Down
48 changes: 48 additions & 0 deletions tree/ntuple/v7/test/ntuple_index.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,51 @@ TEST(RNTupleIndex, MultipleMatches)
entryIdxs = index->GetAllEntryNumbers<std::uint64_t>(4);
EXPECT_EQ(0, entryIdxs.size());
}

#ifdef R__USE_IMT
TEST(RNTupleIndex, IMT)
{
FileRaii fileGuard("test_ntuple_index_build_mt.root");
{
auto model = RNTupleModel::Create();
auto fldRun = model->MakeField<std::int16_t>("run");
auto fldEvent = model->MakeField<std::uint64_t>("event");
auto fldX = model->MakeField<float>("x");

auto ntuple = RNTupleWriter::Recreate(std::move(model), "ntuple", fileGuard.GetPath());

for (int i = 0; i < 5; ++i) {
*fldRun = i;
for (int j = 0; j < 15; ++j) {
*fldEvent = j;
*fldX = static_cast<float>(i + j) / 3.14;
ntuple->Fill();
}

ntuple->CommitCluster();
}
}

IMTRAII _;

auto pageSource = RPageSource::Create("ntuple", fileGuard.GetPath());
pageSource->Attach();
EXPECT_EQ(5UL, pageSource->GetSharedDescriptorGuard()->GetNClusters());

auto index = RNTupleIndex::Create({"run", "event"}, *pageSource);

EXPECT_EQ(5ULL * 15ULL, index->GetSize());

auto ntuple = RNTupleReader::Open("ntuple", fileGuard.GetPath());
auto fld = ntuple->GetView<float>("x");

std::int16_t run;
std::uint64_t event;
for (std::uint64_t i = 0; i < pageSource->GetNEntries(); ++i) {
run = i / 15;
event = i % 15;
auto entryIdx = index->GetFirstEntryNumber({&run, &event});
EXPECT_EQ(fld(entryIdx), fld(i));
}
}
#endif

0 comments on commit 8cce410

Please sign in to comment.