Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions tree/treeplayer/inc/ROOT/TDFActionHelpers.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public:
using BranchTypes_t = typename TRemoveFirst<typename TFunctionTraits<F>::Args_t>::Types_t;
ForeachSlotHelper(F &&f) : fCallable(f) {}

void Init(TTreeReader*, unsigned int) {}

template <typename... Args>
void Exec(unsigned int slot, Args &&... args)
{
Expand All @@ -56,6 +58,7 @@ class CountHelper {
public:
using BranchTypes_t = TTypeList<>;
CountHelper(const std::shared_ptr<unsigned int> &resultCount, unsigned int nSlots);
void Init(TTreeReader*, unsigned int) {}
void Exec(unsigned int slot);
void Finalize();
};
Expand All @@ -78,6 +81,7 @@ class FillHelper {

public:
FillHelper(const std::shared_ptr<Hist_t> &h, unsigned int nSlots);
void Init(TTreeReader*, unsigned int) {}
void Exec(unsigned int slot, double v);
void Exec(unsigned int slot, double v, double w);

Expand Down Expand Up @@ -138,6 +142,8 @@ public:
}
}

void Init(TTreeReader*, unsigned int) {}

void Exec(unsigned int slot, double x0) // 1D histos
{
fTo->GetAtSlotUnchecked(slot)->Fill(x0);
Expand Down Expand Up @@ -236,6 +242,8 @@ public:
for (unsigned int i = 1; i < nSlots; ++i) fColls.emplace_back(std::make_shared<COLL>());
}

void Init(TTreeReader*, unsigned int) {}

template <typename V, typename std::enable_if<!TIsContainer<V>::fgValue, int>::type = 0>
void Exec(unsigned int slot, V v)
{
Expand Down Expand Up @@ -279,6 +287,8 @@ public:
}
}

void Init(TTreeReader*, unsigned int) {}

template <typename V, typename std::enable_if<!TIsContainer<V>::fgValue, int>::type = 0>
void Exec(unsigned int slot, V v)
{
Expand Down Expand Up @@ -318,6 +328,8 @@ public:
{
}

void Init(TTreeReader*, unsigned int) {}

void Exec(unsigned int slot, const T &value) { fReduceObjs[slot] = fReduceFun(fReduceObjs[slot], value); }

void Finalize()
Expand All @@ -332,6 +344,9 @@ class MinHelper {

public:
MinHelper(const std::shared_ptr<double> &minVPtr, unsigned int nSlots);

void Init(TTreeReader*, unsigned int) {}

void Exec(unsigned int slot, double v);

template <typename T, typename std::enable_if<TIsContainer<T>::fgValue, int>::type = 0>
Expand All @@ -355,6 +370,7 @@ class MaxHelper {

public:
MaxHelper(const std::shared_ptr<double> &maxVPtr, unsigned int nSlots);
void Init(TTreeReader*, unsigned int) {}
void Exec(unsigned int slot, double v);

template <typename T, typename std::enable_if<TIsContainer<T>::fgValue, int>::type = 0>
Expand All @@ -379,6 +395,7 @@ class MeanHelper {

public:
MeanHelper(const std::shared_ptr<double> &meanVPtr, unsigned int nSlots);
void Init(TTreeReader*, unsigned int) {}
void Exec(unsigned int slot, double v);

template <typename T, typename std::enable_if<TIsContainer<T>::fgValue, int>::type = 0>
Expand All @@ -399,6 +416,28 @@ extern template void MeanHelper::Exec(unsigned int, const std::vector<char> &);
extern template void MeanHelper::Exec(unsigned int, const std::vector<int> &);
extern template void MeanHelper::Exec(unsigned int, const std::vector<unsigned int> &);

template <typename F1, typename F2>
class SnapshotHelper {
F1 fInitFunc;
F2 fExecFunc;

public:
using BranchTypes_t = typename TRemoveFirst<typename TFunctionTraits<F2>::Args_t>::Types_t;
SnapshotHelper(F1 &&f1, F2 &&f2) : fInitFunc(f1), fExecFunc(f2) {}

void Init(TTreeReader *r, unsigned int slot) { fInitFunc(r, slot); }

template <typename... Args>
void Exec(unsigned int slot, Args &&... args)
{
// check that the decayed types of Args are the same as the branch types
static_assert(std::is_same<TTypeList<typename std::decay<Args>::type...>, BranchTypes_t>::value, "");
fExecFunc(slot, std::forward<Args>(args)...);
}

void Finalize() { /* noop */}
};

} // end of NS TDF
} // end of NS Internal
} // end of NS ROOT
Expand Down
61 changes: 44 additions & 17 deletions tree/treeplayer/inc/ROOT/TDFInterface.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "TProfile2D.h" // For Histo actions
#include "TRegexp.h"
#include "TROOT.h" // IsImplicitMTEnabled
#include "TTreeReader.h"

#include <initializer_list>
#include <memory>
Expand Down Expand Up @@ -1085,53 +1086,79 @@ protected:
}

auto df = GetDataFrameChecked();
auto inputTree = df->GetTree();

if (!ROOT::IsImplicitMTEnabled()) {
std::unique_ptr<TFile> ofile(TFile::Open(filenameInt.c_str(), "RECREATE"));
TTree t(treenameInt.c_str(), treenameInt.c_str());

bool FirstEvent = true;
auto fillTree = [&t, &bnames, &FirstEvent, &inputTree](Args &... args) {
// TODO move fillTree and initLambda to SnapshotHelper's body
auto fillTree = [&t, &bnames, &FirstEvent](unsigned int /* unused */, Args &... args) {
if (FirstEvent) {
// hack to call TTree::Branch on all variadic template arguments
std::initializer_list<int> expander = {(t.Branch(bnames[S].c_str(), &args), 0)..., 0};
(void)expander; // avoid unused variable warnings for older compilers such as gcc 4.9
// We need this in case we have multiple input file to rotate the pointers
// associated to the branches. This is in ROOT since forever to allow cloning of
// TChains and here, de facto, we are doing a clone.
inputTree->AddClone(&t);
FirstEvent = false;
}
t.Fill();
};

Foreach(fillTree, {bnames[S]...});
auto initLambda = [&t] (TTreeReader *r, unsigned int slot) {
if(r) {
// not an empty-source TDF
auto tree = r->GetTree();
tree->AddClone(&t);
}
};

using Op_t = TDFInternal::SnapshotHelper<decltype(initLambda), decltype(fillTree)>;
using DFA_t = TDFInternal::TAction<Op_t, Proxied>;
df->Book(std::make_shared<DFA_t>(Op_t(std::move(initLambda), std::move(fillTree)), bnames, *fProxiedPtr));
fProxiedPtr->IncrChildrenCount();
df->Run();
t.Write();
} else {
unsigned int nSlots = df->GetNSlots();
TBufferMerger merger (filenameInt.c_str(), "RECREATE");
TBufferMerger merger(filenameInt.c_str(), "RECREATE");
std::vector<std::shared_ptr<TBufferMergerFile>> files(nSlots);
std::vector<TTree *> trees(nSlots);
std::vector<TTree *> trees(nSlots); // ROOT owns/manages these TTrees
std::vector<int> isFirstEvent(nSlots, 1); // vector<bool> is evil

auto fillTree = [&](unsigned int slot, Args &... args) {
R__LOCKGUARD(gROOTMutex);
if (!trees[slot]) {
files[slot] = merger.GetFile();
trees[slot] = new TTree(treenameInt.c_str(), treenameInt.c_str());
// FIXME Here the tree needs to be added to the list of clones of the right chain
trees[slot]->ResetBit(kMustCleanup);
if (isFirstEvent[slot]) {
// hack to call TTree::Branch on all variadic template arguments
std::initializer_list<int> expander = {(trees[slot]->Branch(bnames[S].c_str(), &args), 0)..., 0};
(void)expander; // avoid unused variable warnings for older compilers such as gcc 4.9
isFirstEvent[slot] = 0;
}
trees[slot]->Fill();
auto entries = trees[slot]->GetEntries();
auto autoflush = trees[slot]->GetAutoFlush();
if ((autoflush > 0) && (entries % autoflush == 0)) files[slot]->Write();
};

ForeachSlot(fillTree, {bnames[S]...});
// called at the beginning of each task
auto initLambda = [&trees, &merger, &files, &treenameInt, &isFirstEvent] (TTreeReader *r, unsigned int slot) {
if(!trees[slot]) {
// first time this thread executes something, let's create a TBufferMerger output directory
files[slot] = merger.GetFile();
} else {
files[slot]->Write();
}
trees[slot] = new TTree(treenameInt.c_str(), treenameInt.c_str());
trees[slot]->ResetBit(kMustCleanup);
if(r) {
// not an empty-source TDF
auto tree = r->GetTree();
tree->AddClone(trees[slot]);
}
isFirstEvent[slot] = 1;
};

using Op_t = TDFInternal::SnapshotHelper<decltype(initLambda), decltype(fillTree)>;
using DFA_t = TDFInternal::TAction<Op_t, Proxied>;
df->Book(std::make_shared<DFA_t>(Op_t(std::move(initLambda), std::move(fillTree)), bnames, *fProxiedPtr));
fProxiedPtr->IncrChildrenCount();
df->Run();
for (auto &&file : files) file->Write();
}

Expand Down
15 changes: 8 additions & 7 deletions tree/treeplayer/inc/ROOT/TDFNodes.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public:
TLoopManager(const TLoopManager &) = delete;
~TLoopManager(){};
void Run();
void BuildAllReaderValues(TTreeReader *r, unsigned int slot);
void InitAllNodes(TTreeReader *r, unsigned int slot);
void CreateSlots(unsigned int nSlots);
TLoopManager *GetImplPtr();
std::shared_ptr<TLoopManager> GetSharedPtr() { return shared_from_this(); }
Expand Down Expand Up @@ -207,7 +207,7 @@ public:
TActionBase(TLoopManager *implPtr, const ColumnNames_t &tmpBranches);
virtual ~TActionBase() {}
virtual void Run(unsigned int slot, Long64_t entry) = 0;
virtual void BuildReaderValues(TTreeReader *r, unsigned int slot) = 0;
virtual void Init(TTreeReader *r, unsigned int slot) = 0;
virtual void CreateSlots(unsigned int nSlots) = 0;
};

Expand All @@ -230,9 +230,10 @@ public:

void CreateSlots(unsigned int nSlots) final { fValues.resize(nSlots); }

void BuildReaderValues(TTreeReader *r, unsigned int slot) final
void Init(TTreeReader *r, unsigned int slot) final
{
InitTDFValues(slot, fValues[slot], r, fBranches, fTmpBranches, fImplPtr->GetBookedBranches(), TypeInd_t());
fHelper.Init(r, slot);
}

void Run(unsigned int slot, Long64_t entry) final
Expand Down Expand Up @@ -269,7 +270,7 @@ protected:
public:
TCustomColumnBase(TLoopManager *df, const ColumnNames_t &tmpBranches, std::string_view name);
virtual ~TCustomColumnBase() {}
virtual void BuildReaderValues(TTreeReader *r, unsigned int slot) = 0;
virtual void Init(TTreeReader *r, unsigned int slot) = 0;
virtual void CreateSlots(unsigned int nSlots) = 0;
virtual void *GetValuePtr(unsigned int slot) = 0;
virtual const std::type_info &GetTypeId() const = 0;
Expand Down Expand Up @@ -308,7 +309,7 @@ public:

TCustomColumn(const TCustomColumn &) = delete;

void BuildReaderValues(TTreeReader *r, unsigned int slot) final
void Init(TTreeReader *r, unsigned int slot) final
{
TDFInternal::InitTDFValues(slot, fValues[slot], r, fBranches, fTmpBranches, fImplPtr->GetBookedBranches(),
TypeInd_t());
Expand Down Expand Up @@ -377,7 +378,7 @@ protected:
public:
TFilterBase(TLoopManager *df, const ColumnNames_t &tmpBranches, std::string_view name);
virtual ~TFilterBase() {}
virtual void BuildReaderValues(TTreeReader *r, unsigned int slot) = 0;
virtual void Init(TTreeReader *r, unsigned int slot) = 0;
virtual bool CheckFilters(unsigned int slot, Long64_t entry) = 0;
virtual void Report() const = 0;
virtual void PartialReport() const = 0;
Expand Down Expand Up @@ -444,7 +445,7 @@ public:
return fFilter(std::get<S>(fValues[slot]).Get(entry)...);
}

void BuildReaderValues(TTreeReader *r, unsigned int slot) final
void Init(TTreeReader *r, unsigned int slot) final
{
TDFInternal::InitTDFValues(slot, fValues[slot], r, fBranches, fTmpBranches, fImplPtr->GetBookedBranches(),
TypeInd_t());
Expand Down
16 changes: 8 additions & 8 deletions tree/treeplayer/src/TDFNodes.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ void TLoopManager::Run()
// Each task will generate a subrange of entries
auto genFunction = [this, &slotStack](const std::pair<Long64_t, Long64_t> &range) {
auto slot = slotStack.Pop();
BuildAllReaderValues(nullptr, slot);
InitAllNodes(nullptr, slot);
for (auto currEntry = range.first; currEntry < range.second; ++currEntry) {
RunAndCheckFilters(slot, currEntry);
}
Expand All @@ -182,7 +182,7 @@ void TLoopManager::Run()

tp->Process([this, &slotStack](TTreeReader &r) -> void {
auto slot = slotStack.Pop();
BuildAllReaderValues(&r, slot);
InitAllNodes(&r, slot);
// recursive call to check filters and conditionally execute actions
while (r.Next()) {
RunAndCheckFilters(slot, r.GetCurrentEntry());
Expand All @@ -194,13 +194,13 @@ void TLoopManager::Run()
#endif // R__USE_IMT
CreateSlots(1);
if (fNEmptyEntries > 0) {
BuildAllReaderValues(nullptr, 0);
InitAllNodes(nullptr, 0);
for (Long64_t currEntry = 0; currEntry < fNEmptyEntries && fNStopsReceived < fNChildren; ++currEntry) {
RunAndCheckFilters(0, currEntry);
}
} else {
TTreeReader r(fTree.get());
BuildAllReaderValues(&r, 0);
InitAllNodes(&r, 0);

// recursive call to check filters and conditionally execute actions
// in the non-MT case processing can be stopped early by ranges, hence the check on fNStopsReceived
Expand Down Expand Up @@ -229,13 +229,13 @@ void TLoopManager::Run()
/// calls their `BuildReaderValues` methods. It is called once per node per slot, before
/// running the event loop. It also informs each node of the TTreeReader that
/// a particular slot will be using.
void TLoopManager::BuildAllReaderValues(TTreeReader *r, unsigned int slot)
void TLoopManager::InitAllNodes(TTreeReader *r, unsigned int slot)
{
// booked branches must be initialized first
// because actions and filters might need to point to the values encapsulate
for (auto &bookedBranch : fBookedBranches) bookedBranch.second->BuildReaderValues(r, slot);
for (auto &ptr : fBookedActions) ptr->BuildReaderValues(r, slot);
for (auto &ptr : fBookedFilters) ptr->BuildReaderValues(r, slot);
for (auto &bookedBranch : fBookedBranches) bookedBranch.second->Init(r, slot);
for (auto &ptr : fBookedActions) ptr->Init(r, slot);
for (auto &ptr : fBookedFilters) ptr->Init(r, slot);
}

/// Initialize all nodes of the functional graph before running the event loop
Expand Down