Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 43 additions & 11 deletions tree/treeplayer/inc/ROOT/TDataSource.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,63 @@ namespace ROOT {
namespace Experimental {
namespace TDF {

/**
\class ROOT::Experimental::TDF::TDataSource
\ingroup dataframe
\brief The TDataSource interface dictates how a TDataFrame interface to an arbitrary data format should look like.

A TDataSource allows to seamlessly provide an adaptor for any kind of data set or data format to the TDataFrame.
*/
class TDataSource {
public:
virtual ~TDataSource(){};
virtual ~TDataSource() = default;

/// \brief Returns a reference to the collection of the dataset's column names
virtual const std::vector<std::string> &GetColumnNames() const = 0;

/// \brief Checks if the dataset has a certain column
/// \param[in] columnName The name of the column
virtual bool HasColumn(std::string_view) const = 0;
/// Type of a column as a string, e.g. `GetTypeName("x") == "double"`. Required for jitting e.g. `df.Filter("x>0")`.

/// \brief Type of a column as a string, e.g. `GetTypeName("x") == "double"`. Required for jitting e.g. `df.Filter("x>0")`.
/// \param[in] columnName The name of the column
virtual std::string GetTypeName(std::string_view) const = 0;

/// Called at most once per column by TDF. Return vector of pointers to pointers to column values - one per slot.
/// \tparam T The type of the data stored in the column
/// \param[in] columnName The name of the column
///
/// These pointers are veritable cursors: it's a responsibility of the TDataSource implementation that they point to the
/// "right" memory region.
template <typename T>
std::vector<T **> GetColumnReaders(std::string_view name, unsigned int nSlots)
std::vector<T **> GetColumnReaders(std::string_view columnName)
{
auto typeErasedVec = GetColumnReadersImpl(name, nSlots, typeid(T));
auto typeErasedVec = GetColumnReadersImpl(columnName, typeid(T));
std::vector<T **> typedVec(typeErasedVec.size());
std::transform(typeErasedVec.begin(), typeErasedVec.end(), typedVec.begin(),
[](void *p) { return static_cast<T **>(p); });
return typedVec;
}
/// Return chunks of entries to distribute to tasks. They are required to be continguous intervals with no entries
/// skipped, starting at 0 and ending at nEntries, e.g. [0-5],[5-10] for 10 entries.

/// \brief Return ranges of entries to distribute to tasks.
/// They are required to be contiguous intervals with no entries skipped. Supposing a dataset with nEntries, the intervals
/// must start at 0 and end at nEntries, e.g. [0-5],[5-10] for 10 entries.
virtual const std::vector<std::pair<ULong64_t, ULong64_t>> &GetEntryRanges() const = 0;
/// Different threads will loop over different ranges and will pass different "slot" values.
virtual void SetEntry(ULong64_t entry, unsigned int slot) = 0;
/// Convenience method called at the start of each task, before processing a range of entries.
/// DataSources can implement it if needed (does nothing by default).
/// firstEntry is the first entry of the range that the task will process.

/// \brief Advance the "cursors" returned by GetColumnReaders to the selected entry for a particular slot.
/// \param[in] slot The data processing slot that needs to be considered
/// \param[in] entry The entry which needs to be pointed to by the reader pointers
/// Slots are adopted to accommodate parallel data processing. Different workers will loop over different ranges and will
/// be labelled by different "slot" values.
virtual void SetEntry(unsigned int slot, ULong64_t entry) = 0;

/// \brief Convenience method to set the number of slots
/// For some implementation it's necessary to know the number of slots in advance for optimisation purposes.
virtual void SetNSlots(unsigned int nSlots) = 0;

/// \brief Convenience method called at the start of the data processing.
/// \param[in] slot The data processing slot wihch needs to be initialised
/// \param[in] firstEntry The first entry of the range that the task will process.
virtual void InitSlot(unsigned int /*slot*/, ULong64_t /*firstEntry*/) {}

protected:
Expand Down
42 changes: 42 additions & 0 deletions tree/treeplayer/inc/ROOT/TRootDS.hxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#ifndef ROOT_TROOTTDS
#define ROOT_TROOTTDS

#include "ROOT/TDataSource.hxx"
#include <TChain.h>

#include <memory>

namespace ROOT {
namespace Experimental {
namespace TDF {

class TRootDS final : public ROOT::Experimental::TDF::TDataSource {
private:
unsigned int fNSlots = 0U;
std::string fTreeName;
std::string fFileNameGlob;
mutable TChain fModelChain; // Mutable needed for getting the column type name
std::vector<double*> fAddressesToFree;
std::vector<std::string> fListOfBranches;
std::vector<std::pair<ULong64_t, ULong64_t>> fEntryRanges;
std::vector<std::vector<void *>> fBranchAddresses; // first container-> slot, second -> column;
std::vector<std::unique_ptr<TChain>> fChains;

std::vector<void *> GetColumnReadersImpl(std::string_view, const std::type_info &);

public:
TRootDS(std::string_view treeName, std::string_view fileNameGlob);
~TRootDS();
std::string GetTypeName(std::string_view colName) const;
const std::vector<std::string> &GetColumnNames() const;
bool HasColumn(std::string_view colName) const;
void InitSlot(unsigned int slot, ULong64_t firstEntry);
const std::vector<std::pair<ULong64_t, ULong64_t>> &GetEntryRanges() const;
void SetEntry(unsigned int slot, ULong64_t entry);
void SetNSlots(unsigned int nSlots);
};
} // ns TDF
} // ns Experimental
} // ns ROOT

#endif
34 changes: 34 additions & 0 deletions tree/treeplayer/inc/ROOT/TTrivialDS.hxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#ifndef ROOT_TTRIVIALTDS
#define ROOT_TTRIVIALTDS

#include "ROOT/TDataSource.hxx"

namespace ROOT {
namespace Experimental {
namespace TDF {

class TTrivialDS final : public ROOT::Experimental::TDF::TDataSource {
private:
unsigned int fNSlots = 0U;
ULong64_t fSize = 0ULL;
std::vector<std::pair<ULong64_t, ULong64_t>> fEntryRanges;
std::vector<std::string> fColNames{"col0"};
std::vector<ULong64_t> fCounter;
std::vector<ULong64_t *> fCounterAddr;
std::vector<void *> GetColumnReadersImpl(std::string_view name, const std::type_info &);

public:
TTrivialDS(ULong64_t size);
~TTrivialDS();
const std::vector<std::string> &GetColumnNames() const;
bool HasColumn(std::string_view colName) const;
std::string GetTypeName(std::string_view) const;
const std::vector<std::pair<ULong64_t, ULong64_t>> &GetEntryRanges() const;
void SetEntry(unsigned int slot, ULong64_t entry);
void SetNSlots(unsigned int nSlots);
};
} // ns TDF
} // ns Experimental
} // ns ROOT

#endif
3 changes: 2 additions & 1 deletion tree/treeplayer/src/TDFNodes.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ TLoopManager::TLoopManager(std::unique_ptr<TDataSource> ds, const ColumnNames_t
: fDefaultColumns(defaultBranches), fNSlots(TDFInternal::GetNSlots()), fLoopType(ELoopType::kDataSource),
fDataSource(std::move(ds))
{
fDataSource->SetNSlots(fNSlots);
}

/// Run event loop with no source files, in parallel.
Expand Down Expand Up @@ -225,7 +226,7 @@ void TLoopManager::RunDataSource()
// we are running single-thread, so all ranges are squashed together
const auto lastEntry = rangePairs.back().second;
for (ULong64_t i = 0ull; i < lastEntry; ++i) {
fDataSource->SetEntry(i, 0);
fDataSource->SetEntry(0, i);
RunAndCheckFilters(0, i);
}
}
Expand Down
140 changes: 140 additions & 0 deletions tree/treeplayer/src/TRootDS.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
#include <ROOT/TDFUtils.hxx>
#include <ROOT/TRootDS.hxx>
#include <ROOT/TSeq.hxx>
#include <TClass.h>
#include <TInterpreter.h>

#include <algorithm>
#include <vector>

namespace ROOT {
namespace Experimental {
namespace TDF {

std::vector<void *> TRootDS::GetColumnReadersImpl(std::string_view name, const std::type_info &)
{

const auto &colNames = GetColumnNames();

if (fBranchAddresses.empty()) {
auto nColumns = colNames.size();
// Initialise the entire set of addresses
fBranchAddresses.resize(nColumns, std::vector<void *>(fNSlots, nullptr));
}

const auto index = std::distance(colNames.begin(), std::find(colNames.begin(), colNames.end(), name));
std::vector<void *> ret(fNSlots);
for (auto slot : ROOT::TSeqU(fNSlots)) {
ret[slot] = (void *)&fBranchAddresses[index][slot];
}
return ret;
}

TRootDS::TRootDS(std::string_view treeName, std::string_view fileNameGlob)
: fTreeName(treeName), fFileNameGlob(fileNameGlob), fModelChain(std::string(treeName).c_str())
{
fModelChain.Add(fFileNameGlob.c_str());

auto &lob = *fModelChain.GetListOfBranches();
fListOfBranches.resize(lob.GetEntries());
std::transform(lob.begin(), lob.end(), fListOfBranches.begin(), [](TObject *o) { return o->GetName(); });
}

TRootDS::~TRootDS()
{
for(auto addr : fAddressesToFree) {
delete addr;
}
}

std::string TRootDS::GetTypeName(std::string_view colName) const
{
if (!HasColumn(colName)) {
std::string e = "The dataset does not have column ";
e += colName;
throw std::runtime_error(e);
}
// TODO: we need to factor out the routine for the branch alone...
// Maybe a cache for the names?
auto typeName = ROOT::Internal::TDF::ColumnName2ColumnTypeName(std::string(colName).c_str(), &fModelChain,
nullptr /*TCustomColumnBase here*/);
// We may not have yet loaded the library where the dictionary of this type
// is
TClass::GetClass(typeName.c_str());
return typeName;
}

const std::vector<std::string> &TRootDS::GetColumnNames() const
{
return fListOfBranches;
}

bool TRootDS::HasColumn(std::string_view colName) const
{
if (!fListOfBranches.empty())
GetColumnNames();
return fListOfBranches.end() != std::find(fListOfBranches.begin(), fListOfBranches.end(), colName);
}

void TRootDS::InitSlot(unsigned int slot, ULong64_t firstEntry)
{
auto chain = new TChain(fTreeName.c_str());
fChains[slot].reset(chain);
chain->Add(fFileNameGlob.c_str());
chain->GetEntry(firstEntry);
TString setBranches;
for (auto i : ROOT::TSeqU(fListOfBranches.size())) {
auto colName = fListOfBranches[i].c_str();
auto &addr = fBranchAddresses[i][slot];
auto typeName = GetTypeName(colName);
auto typeClass = TClass::GetClass(typeName.c_str());
if (typeClass) {
// chain->SetBranchAddress(colName, &addr);
setBranches += TString::Format("((TChain*)%p)->SetBranchAddress(\"%s\", (%s**)%p);\n", chain, colName, typeClass->GetName(), &addr);
} else {
if (!addr) {
addr = new double(); // who frees this :) ?
fAddressesToFree.emplace_back((double*)addr);
}
chain->SetBranchAddress(colName, addr);
//setBranches += TString::Format("(*(void*)%p) = new %s();((TChain*)%p)->SetBranchAddress(\"%s\", (%s*)%p);\n",&addr, typeName.c_str(), chain, colName, typeName.c_str(), addr);
}
}
gInterpreter->Calc(setBranches);
}

const std::vector<std::pair<ULong64_t, ULong64_t>> &TRootDS::GetEntryRanges() const
{
if (fEntryRanges.empty()) {
throw std::runtime_error("No ranges are available. Did you set the number of slots?");
}
return fEntryRanges;
}

void TRootDS::SetEntry(unsigned int slot, ULong64_t entry)
{
fChains[slot]->GetEntry(entry);
}

void TRootDS::SetNSlots(unsigned int nSlots)
{
assert(0U == fNSlots && "Setting the number of slots even if the number of slots is different from zero.");

fNSlots = nSlots;
fChains.resize(fNSlots);
auto nentries = fModelChain.GetEntries();
auto chunkSize = nentries / fNSlots;
auto reminder = 1U == fNSlots ? 0 : nentries % fNSlots;
auto start = 0UL;
auto end = 0UL;
for (auto i : ROOT::TSeqU(fNSlots)) {
start = end;
end += chunkSize;
fEntryRanges.emplace_back(start, end);
(void)i;
}
fEntryRanges.back().second += reminder;
}
} // ns TDF
} // ns Experimental
} // ns ROOT
77 changes: 77 additions & 0 deletions tree/treeplayer/src/TTrivialDS.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#include <ROOT/TDFUtils.hxx>
#include <ROOT/TSeq.hxx>
#include <ROOT/TTrivialDS.hxx>

namespace ROOT {
namespace Experimental {
namespace TDF {

std::vector<void *> TTrivialDS::GetColumnReadersImpl(std::string_view, const std::type_info &)
{
std::vector<void *> ret;
for (auto i : ROOT::TSeqU(fNSlots)) {
fCounterAddr[i] = &fCounter[i];
ret.emplace_back((void *)(&fCounterAddr[i]));
}
return ret;
}

TTrivialDS::TTrivialDS(ULong64_t size) : fSize(size)
{
}

TTrivialDS::~TTrivialDS()
{
}

const std::vector<std::string> &TTrivialDS::GetColumnNames() const
{
return fColNames;
}

bool TTrivialDS::HasColumn(std::string_view colName) const
{
return colName == fColNames[0];
}

std::string TTrivialDS::GetTypeName(std::string_view) const
{
return "ULong64_t";
}

const std::vector<std::pair<ULong64_t, ULong64_t>> &TTrivialDS::GetEntryRanges() const
{
if (fEntryRanges.empty()) {
throw std::runtime_error("No ranges are available. Did you set the number of slots?");
}
return fEntryRanges;
}

void TTrivialDS::SetEntry(unsigned int slot, ULong64_t entry)
{
fCounter[slot] = entry;
}

void TTrivialDS::SetNSlots(unsigned int nSlots)
{
assert(0U == fNSlots && "Setting the number of slots even if the number of slots is different from zero.");

fNSlots = nSlots;
fCounter.resize(fNSlots);
fCounterAddr.resize(fNSlots);

auto chunkSize = fSize / fNSlots;
auto start = 0UL;
auto end = 0UL;
for (auto i : ROOT::TSeqUL(fNSlots)) {
start = end;
end += chunkSize;
fEntryRanges.emplace_back(start, end);
(void)i;
}
// TODO: redistribute reminder to all slots
fEntryRanges.back().second += fSize % fNSlots;
}
} // ns TDF
} // ns Experimental
} // ns ROOT
2 changes: 2 additions & 0 deletions tree/treeplayer/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ ROOT_ADD_GTEST(dataframe_regression dataframe/dataframe_regression.cxx LIBRARIES
ROOT_ADD_GTEST(dataframe_interface dataframe/dataframe_interface.cxx LIBRARIES TreePlayer)
ROOT_ADD_GTEST(dataframe_utils dataframe/dataframe_utils.cxx LIBRARIES TreePlayer)
ROOT_ADD_GTEST(dataframe_nodes dataframe/dataframe_nodes.cxx LIBRARIES TreePlayer)
ROOT_ADD_GTEST(datasource_trivial dataframe/datasource_trivial.cxx LIBRARIES TreePlayer)
ROOT_ADD_GTEST(datasource_root dataframe/datasource_root.cxx LIBRARIES TreePlayer)
Loading