diff --git a/io/io/CMakeLists.txt b/io/io/CMakeLists.txt index 46c7966a98388..d4b3a663f075a 100644 --- a/io/io/CMakeLists.txt +++ b/io/io/CMakeLists.txt @@ -6,7 +6,7 @@ set(libname RIO) include_directories(${CMAKE_SOURCE_DIR}/core/clib/res) -ROOT_GENERATE_DICTIONARY(G__IO *.h STAGE1 MODULE ${libname} LINKDEF LinkDef.h) +ROOT_GENERATE_DICTIONARY(G__IO *.h ROOT/*.hxx STAGE1 MODULE ${libname} LINKDEF LinkDef.h) if(root7) ROOT_GLOB_SOURCES(root7src RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} v7/src/*.cxx) @@ -18,4 +18,8 @@ ROOT_LINKER_LIBRARY(${libname} $ $ +#include +#include +#include +#include + +class TArrayC; +class TBufferFile; + +namespace ROOT { +namespace Experimental { + +class TBufferMergerFile; + +/** + * \class TBufferMerger TBufferMerger.hxx + * \ingroup IO + * + * TBufferMerger is a class to facilitate writing data in + * parallel from multiple threads, while writing to a single + * output file. Its purpose is similar to TParallelMergingFile, + * but instead of using processes that connect to a network + * socket, TBufferMerger uses threads that each write to a + * TBufferMergerFile, which in turn push data into a queue + * managed by the TBufferMerger. + */ + +class TBufferMerger { +public: + /** Constructor + * @param name Output file name + * @param option Output file creation options + * @param ftitle Output file title + * @param compression Output file compression level + */ + TBufferMerger(const char *name, Option_t *option = "RECREATE", const char *ftitle = "", Int_t compress = 1); + + /** Destructor */ + virtual ~TBufferMerger(); + + /** Returns a TBufferMergerFile to which data can be written. + * At the end, all TBufferMergerFiles get merged into the output file. + */ + std::shared_ptr GetFile(); + + friend class TBufferMergerFile; + +private: + /** TBufferMerger has no default constructor */ + TBufferMerger(); + + /** TBufferMerger has no copy constructor */ + TBufferMerger(const TBufferMerger &); + + /** TBufferMerger has no copy operator */ + TBufferMerger& operator=(const TBufferMerger&); + + void Push(TBufferFile *buffer); + void Listen(); + + std::mutex fFilesMutex; //< Mutex used to lock fAttachedFiles + std::mutex fQueueMutex; //< Mutex used to lock fQueue + std::mutex fWriteMutex; //< Mutex used for the condition variable + std::condition_variable fCV; //< Condition variable used to wait for data + std::queue fQueue; //< Queue to which data is pushed and merged + std::unique_ptr fFile; //< Output file, owned by the user + std::unique_ptr fMergingThread; //< Worker thread that writes to disk + std::vector> fAttachedFiles; //< Attached files + + ClassDef(TBufferMerger, 0); +}; + +/** + * \class TBufferMerger TBufferMerger.hxx + * \ingroup IO + * + * A TBufferMergerFile is similar to a TMemFile, but when data + * is written to it, it is appended to the TBufferMerger queue. + * The TBufferMerger merges all data into the output file on disk. + */ + +class TBufferMergerFile : public TMemFile { +private: + TBufferMerger &fMerger; //< TBufferMerger this file is attached to + std::unique_ptr fClassSent; //< StreamerInfo this file has already written + + /** Constructor. Can only be called by TBufferMerger. + * @param m Merger this file is attached to. */ + TBufferMergerFile(TBufferMerger &m); + + /** TBufferMergerFile has no default constructor. */ + TBufferMergerFile(); + + /** TBufferMergerFile has no copy constructor. */ + TBufferMergerFile(const TBufferMergerFile &); + + /** TBufferMergerFile has no copy operator */ + TBufferMergerFile& operator=(const TBufferMergerFile&); + + friend class TBufferMerger; + +public: + /** Destructor */ + ~TBufferMergerFile(); + + using TMemFile::Write; + + /** Write data into a TBufferFile and append it to TBufferMerger. + * @param name Name + * @param opt Options + * @param bufsize Buffer size + * This function must be called before the TBufferMergerFile gets destroyed, + * or no data is appended to the TBufferMerger. + */ + virtual Int_t Write(const char *name = nullptr, Int_t opt = 0, Int_t bufsize = 0) override; + + /** Write StreamerInfo for objects that have not already been written. */ + virtual void WriteStreamerInfo() override; + + ClassDefOverride(TBufferMergerFile, 0); +}; + +} // namespace Experimental +} // namespace ROOT + +#endif diff --git a/io/io/src/TBufferMerger.cxx b/io/io/src/TBufferMerger.cxx new file mode 100644 index 0000000000000..24dcc851c74fd --- /dev/null +++ b/io/io/src/TBufferMerger.cxx @@ -0,0 +1,378 @@ +// @(#)root/io:$Id$ +// Author: Philippe Canal, Witold Pokorski, and Guilherme Amadio + +/************************************************************************* + * Copyright (C) 1995-2017, Rene Brun and Fons Rademakers. * + * All rights reserved. * + * * + * For the licensing terms see $ROOTSYS/LICENSE. * + * For the list of contributors see $ROOTSYS/README/CREDITS. * + *************************************************************************/ + +#include "ROOT/TBufferMerger.hxx" + +#include "TBits.h" +#include "TBufferFile.h" +#include "TClass.h" +#include "TError.h" +#include "TFileCacheWrite.h" +#include "TFileMerger.h" +#include "TKey.h" +#include "TMath.h" +#include "TMemFile.h" +#include "TSystem.h" +#include "TTimeStamp.h" + +namespace { + +Bool_t R__NeedInitialMerge(TDirectory *dir) +{ + if (dir == 0) return false; + + TIter nextkey(dir->GetListOfKeys()); + for (TKey *key = (TKey *)nextkey(); key != nullptr; key = (TKey *)nextkey()) { + TClass *cl = TClass::GetClass(key->GetClassName()); + if (cl->InheritsFrom(TDirectory::Class())) { + TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName()); + if (!subdir) { + subdir = (TDirectory *)key->ReadObj(); + } + if (R__NeedInitialMerge(subdir)) { + return true; + } + } else { + if (0 != cl->GetResetAfterMerge()) { + return true; + } + } + } + return false; +} + +void R__DeleteObject(TDirectory *dir, Bool_t withReset) +{ + if (dir == 0) return; + + TIter nextkey(dir->GetListOfKeys()); + TKey *key; + while ((key = (TKey *)nextkey())) { + TClass *cl = TClass::GetClass(key->GetClassName()); + if (cl->InheritsFrom(TDirectory::Class())) { + TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName()); + if (!subdir) { + subdir = (TDirectory *)key->ReadObj(); + } + R__DeleteObject(subdir, withReset); + } else { + Bool_t todelete = false; + if (withReset) { + todelete = (0 != cl->GetResetAfterMerge()); + } else { + todelete = (0 == cl->GetResetAfterMerge()); + } + if (todelete) { + key->Delete(); + dir->GetListOfKeys()->Remove(key); + delete key; + } + } + } +} + +void R__MigrateKey(TDirectory *destination, TDirectory *source) +{ + if (destination == 0 || source == 0) return; + TIter nextkey(source->GetListOfKeys()); + + TKey *key; + + while ((key = (TKey *)nextkey())) { + TClass *cl = TClass::GetClass(key->GetClassName()); + if (cl->InheritsFrom(TDirectory::Class())) { + TDirectory *source_subdir = (TDirectory *)source->GetList()->FindObject(key->GetName()); + if (!source_subdir) { + source_subdir = (TDirectory *)key->ReadObj(); + } + TDirectory *destination_subdir = destination->GetDirectory(key->GetName()); + if (!destination_subdir) { + destination_subdir = destination->mkdir(key->GetName()); + } + R__MigrateKey(destination_subdir, source_subdir); + } else { + TKey *oldkey = destination->GetKey(key->GetName()); + if (oldkey) { + oldkey->Delete(); + delete oldkey; + } + // a priori the files are from the same client + TKey *newkey = new TKey(destination, *key, 0 /* pidoffset */); + destination->GetFile()->SumBuffer(newkey->GetObjlen()); + newkey->WriteFile(0); + if (destination->GetFile()->TestBit(TFile::kWriteError)) { + return; + } + } + } + destination->SaveSelf(); +} + +struct ClientInfo { + TFile *fFile; // This object does *not* own the file, it's owned by the owner of ClientInfo + TString fLocalName; + UInt_t fContactsCount; + TTimeStamp fLastContact; + Double_t fTimeSincePrevContact; + + ClientInfo() : fFile(0), fLocalName(), fContactsCount(0), fTimeSincePrevContact(0) {} + + ClientInfo(const char *filename, UInt_t clientId) : fFile(0), fContactsCount(0), fTimeSincePrevContact(0) + { + fLocalName.Form("%s-%d-%d", filename, clientId, gSystem->GetPid()); + } + + void Set(TFile *file) + { + // Register the new file as coming from this client. + if (file != fFile) { + // We need to keep any of the keys from the previous file that + // are not in the new file. + if (fFile) { + R__MigrateKey(fFile, file); + // delete the previous memory file (if any) + delete file; + } else { + fFile = file; + } + } + TTimeStamp now; + fTimeSincePrevContact = now.AsDouble() - fLastContact.AsDouble(); + fLastContact = now; + ++fContactsCount; + } +}; + +struct ThreadFileMerger : public TObject { + TString fFilename; + TBits fClientsContact; + UInt_t fNClientsContact; + std::vector fClients; + TTimeStamp fLastMerge; + TFileMerger fMerger; + + ThreadFileMerger(const char *filename, Bool_t writeCache = false) + : fFilename(filename), fNClientsContact(0), fMerger(false, true) + { + fMerger.SetPrintLevel(0); + fMerger.OutputFile(filename, "RECREATE"); + if (writeCache) new TFileCacheWrite(fMerger.GetOutputFile(), 32 * 1024 * 1024); + } + + ~ThreadFileMerger() + { + for (auto &&client : fClients) delete client.fFile; + } + + const char *GetName() const { return fFilename; } + + ULong_t Hash() const { return fFilename.Hash(); } + + Bool_t InitialMerge(TFile *input) + { + fMerger.AddFile(input); + Bool_t result = fMerger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kResetable); + R__DeleteObject(input, true); + return result; + } + + Bool_t NeedFinalMerge() { return fClientsContact.CountBits() > 0; } + + Bool_t NeedMerge(Float_t clientThreshold) + { + if (fClients.size() == 0) { + return false; + } + + // Calculate average and rms of the time between the last 2 contacts. + Double_t sum = 0; + Double_t sum2 = 0; + for (unsigned int c = 0; c < fClients.size(); ++c) { + sum += fClients[c].fTimeSincePrevContact; + sum2 += fClients[c].fTimeSincePrevContact * fClients[c].fTimeSincePrevContact; + } + Double_t avg = sum / fClients.size(); + Double_t sigma = sum2 ? TMath::Sqrt(sum2 / fClients.size() - avg * avg) : 0; + Double_t target = avg + 2 * sigma; + TTimeStamp now; + if ((now.AsDouble() - fLastMerge.AsDouble()) > target) { + return true; + } + Float_t cut = clientThreshold * fClients.size(); + return fClientsContact.CountBits() > cut || fNClientsContact > 2 * cut; + } + + Bool_t Merge() + { + // Merge the current inputs into the output file. + + // Remove object that can *not* be incrementally merged + // and will *not* be reset by the client code. + R__DeleteObject(fMerger.GetOutputFile(), false); + + for (unsigned int f = 0; f < fClients.size(); ++f) { + fMerger.AddFile(fClients[f].fFile); + } + + Bool_t result = fMerger.PartialMerge(TFileMerger::kAllIncremental); + + // Remove any 'resetable' object (like TTree) from the input file + // so that they will not be re-merged. Keep only the object that + // always need to be re-merged (Histograms). + + for (unsigned int f = 0; f < fClients.size(); ++f) { + if (fClients[f].fFile) { + R__DeleteObject(fClients[f].fFile, true); + } else { + // We back up the file (probably due to memory constraint) + TFile *file = TFile::Open(fClients[f].fLocalName, "UPDATE"); + // Remove object that can be incrementally merged and + // will be reset by the client code. + R__DeleteObject(file, true); + file->Write(); + delete file; + } + } + fLastMerge = TTimeStamp(); + fNClientsContact = 0; + fClientsContact.Clear(); + + return result; + } + + void RegisterClient(UInt_t clientId, TFile *file) + { + ++fNClientsContact; + + fClientsContact.SetBitNumber(clientId); + + if (fClients.size() < clientId + 1) { + fClients.push_back(ClientInfo(fFilename, clientId)); + } + fClients[clientId].Set(file); + } +}; + +} // unnamed namespace + +namespace ROOT { +namespace Experimental { + +TBufferMerger::TBufferMerger(const char *name, Option_t *option, const char *ftitle, Int_t compress) + : fFile(TFile::Open(name, option, ftitle, compress)), fMergingThread(new std::thread([this]() { this->Listen(); })) +{ +} + +TBufferMerger::~TBufferMerger() +{ + for (auto f : fAttachedFiles) + if (!f.expired()) Fatal("TBufferMerger", " TBufferMergerFiles must be destroyed before the server"); + + this->Push(nullptr); + fCV.notify_one(); + + fMergingThread->join(); +} + +std::shared_ptr TBufferMerger::GetFile() +{ + std::shared_ptr f; + { + std::lock_guard lk(fFilesMutex); + f.reset(new TBufferMergerFile(*this)); + fAttachedFiles.push_back(f); + } + return f; +} + +void TBufferMerger::Push(TBufferFile *buffer) +{ + { + std::lock_guard lock(fQueueMutex); + fQueue.push(buffer); + } + + fCV.notify_one(); +} + +void TBufferMerger::Listen() +{ + std::unique_lock wlock(fWriteMutex); + + bool done = false; + THashTable mergers; + + while (!done) { + fCV.wait(wlock, [this]() { return !this->fQueue.empty(); }); + + while (!fQueue.empty()) { + std::unique_ptr buffer; + + { + std::lock_guard qlock(fQueueMutex); + buffer.reset(fQueue.front()); + fQueue.pop(); + } + + if (!buffer) { + done = true; + break; + } + + buffer->SetReadMode(); + buffer->SetBufferOffset(); + + Long64_t length; + TString filename; + + buffer->ReadTString(filename); + buffer->ReadLong64(length); + + // UPDATE because we need to remove the TTree after merging them. + TMemFile *transient = new TMemFile(filename, buffer->Buffer() + buffer->Length(), length, "UPDATE"); + + buffer->SetBufferOffset(buffer->Length() + length); + + // control how often the histogram are merged. Here as soon as half the clients have reported. + const Float_t clientThreshold = 0.75; + + ThreadFileMerger *info = (ThreadFileMerger *)mergers.FindObject(filename); + + if (!info) { + info = new ThreadFileMerger(filename, false); + mergers.Add(info); + } + + if (R__NeedInitialMerge(transient)) { + info->InitialMerge(transient); + } + + info->RegisterClient(0, transient); + + if (info->NeedMerge(clientThreshold)) info->Merge(); + + transient = nullptr; + } + } + + TIter next(&mergers); + ThreadFileMerger *info; + while ((info = (ThreadFileMerger *)next())) { + if (info->NeedFinalMerge()) { + info->Merge(); + } + } + + mergers.Delete(); +} + +} // namespace Experimental +} // namespace ROOT diff --git a/io/io/src/TBufferMergerFile.cxx b/io/io/src/TBufferMergerFile.cxx new file mode 100644 index 0000000000000..dc68146844c5a --- /dev/null +++ b/io/io/src/TBufferMergerFile.cxx @@ -0,0 +1,83 @@ +// @(#)root/io:$Id$ +// Author: Philippe Canal, Witold Pokorski, and Guilherme Amadio + +/************************************************************************* + * Copyright (C) 1995-2017, Rene Brun and Fons Rademakers. * + * All rights reserved. * + * * + * For the licensing terms see $ROOTSYS/LICENSE. * + * For the list of contributors see $ROOTSYS/README/CREDITS. * + *************************************************************************/ + +#include "ROOT/TBufferMerger.hxx" + +#include "TArrayC.h" +#include "TBufferFile.h" + +namespace ROOT { +namespace Experimental { + +TBufferMergerFile::TBufferMergerFile(TBufferMerger &m) + : TMemFile(m.fFile->GetName(), "RECREATE", "", m.fFile->GetCompressionLevel()), fMerger(m), fClassSent(nullptr) +{ +} + +TBufferMergerFile::~TBufferMergerFile() +{ +} + +Int_t TBufferMergerFile::Write(const char *name, Int_t opt, Int_t bufsize) +{ + Int_t nbytes = TMemFile::Write(name, opt, bufsize); + + if (nbytes) { + TBufferFile *fBuffer = new TBufferFile(TBuffer::kWrite); + + fBuffer->WriteTString(GetName()); + fBuffer->WriteLong64(GetEND()); + CopyTo(*fBuffer); + + fMerger.Push(fBuffer); + + // Record StreamerInfo sent to the server + Int_t isize = fClassIndex->GetSize(); + if (!fClassSent) { + fClassSent.reset(new TArrayC(isize)); + } else { + if (isize > fClassSent->GetSize()) { + fClassSent->Set(isize); + } + } + for (Int_t c = 0; c < isize; ++c) { + if (fClassIndex->fArray[c]) { + fClassSent->fArray[c] = 1; + } + } + ResetAfterMerge(0); + } + return nbytes; +} + +void TBufferMergerFile::WriteStreamerInfo() +{ + if (!fWritable) return; + if (!fClassIndex) return; + // no need to update the index if no new classes added to the file + if (fClassIndex->fArray[0] == 0) return; + + // clear fClassIndex for anything we already sent. + if (fClassSent) { + Int_t isize = fClassIndex->GetSize(); + Int_t ssize = fClassSent->GetSize(); + for (Int_t c = 0; c < isize && c < ssize; ++c) { + if (fClassSent->fArray[c]) { + fClassIndex->fArray[c] = 0; + } + } + } + + TMemFile::WriteStreamerInfo(); +} + +} // namespace Experimental +} // namespace ROOT diff --git a/io/io/test/CMakeLists.txt b/io/io/test/CMakeLists.txt new file mode 100644 index 0000000000000..b1bc38fa16472 --- /dev/null +++ b/io/io/test/CMakeLists.txt @@ -0,0 +1 @@ +ROOT_ADD_GTEST(testTBufferMerger TBufferMerger.cxx LIBRARIES RIO Tree) diff --git a/io/io/test/TBufferMerger.cxx b/io/io/test/TBufferMerger.cxx new file mode 100644 index 0000000000000..503694dcf565a --- /dev/null +++ b/io/io/test/TBufferMerger.cxx @@ -0,0 +1,127 @@ +#include "ROOT/TBufferMerger.hxx" + +#include "TFile.h" +#include "TROOT.h" +#include "TTree.h" + +#include +#include + +#include "gtest/gtest.h" + +using namespace ROOT::Experimental; + +static void Fill(std::shared_ptr tree, int init, int count) +{ + int n = 0; + + tree->Branch("n", &n, "n/I"); + + for (int i = 0; i < count; ++i) { + n = init + i; + tree->Fill(); + } +} + +TEST(TBufferMerger, CreateAndDestroy) +{ + TBufferMerger merger("tbuffermerger_create.root"); +} + +TEST(TBufferMerger, CreateAndDestroyWithAttachedFiles) +{ + TBufferMerger merger("tbuffermerger_create.root"); + + auto f1 = merger.GetFile(); + auto f2 = merger.GetFile(); + auto f3 = merger.GetFile(); +} + +TEST(TBufferMerger, SequentialTreeFill) +{ + int nevents = 1024; + + ROOT::EnableThreadSafety(); + + { + TBufferMerger merger("tbuffermerger_sequential.root"); + + auto myfile = merger.GetFile(); + auto mytree = std::make_shared("mytree", "mytree"); + + Fill(mytree, 0, nevents); + + mytree->Write(); + myfile->Write(); + } +} + +TEST(TBufferMerger, ParallelTreeFill) +{ + int nthreads = 4; + int nevents = 256; + + ROOT::EnableThreadSafety(); + + { + TBufferMerger merger("tbuffermerger_parallel.root"); + std::vector threads; + for (int i = 0; i < nthreads; ++i) { + threads.emplace_back([=, &merger]() { + auto myfile = merger.GetFile(); + auto mytree = std::make_shared("mytree", "mytree"); + + Fill(mytree, i * nevents, nevents); + + mytree->Write(); + myfile->Write(); + }); + } + + for (auto &&t : threads) t.join(); + } +} + +TEST(TBufferMerger, CheckTreeFillResults) +{ + int sum_s, sum_p; + + { // sum of all branch values in sequential mode + auto f = std::unique_ptr(TFile::Open("tbuffermerger_sequential.root")); + auto t = std::unique_ptr((TTree *)f->Get("mytree")); + + int n, sum = 0; + int nentries = (int)t->GetEntries(); + + t->SetBranchAddress("n", &n); + + for (int i = 0; i < nentries; ++i) { + t->GetEntry(i); + sum += n; + } + + sum_s = sum; + } + + { // sum of all branch values in parallel mode + auto f = std::unique_ptr(TFile::Open("tbuffermerger_parallel.root")); + auto t = std::unique_ptr((TTree *)f->Get("mytree")); + + int n, sum = 0; + int nentries = (int)t->GetEntries(); + + t->SetBranchAddress("n", &n); + + for (int i = 0; i < nentries; ++i) { + t->GetEntry(i); + sum += n; + } + + sum_p = sum; + } + + // Note: 0 + 1 + ... + 1024 = 523776 + + EXPECT_EQ(523776, sum_s); + EXPECT_EQ(523776, sum_p); +} diff --git a/tutorials/multicore/mt103_fillNtuples.C b/tutorials/multicore/mt103_fillNtuples.C new file mode 100644 index 0000000000000..65db638d7be8f --- /dev/null +++ b/tutorials/multicore/mt103_fillNtuples.C @@ -0,0 +1,69 @@ +/// \file +/// \ingroup tutorial_multicore +/// +/// Fill an Ntuple in distinct workers, and write the output to a file. +/// This tutorial illustrates the basics of how it's possible with ROOT +/// to write simultaneously to a single output file using TBufferMerger. +/// +/// \macro_code +/// +/// \author Guilherme Amadio +/// \date May 2017 + +#include + +using ROOT::Experimental::TBufferMerger; +using ROOT::Experimental::TBufferMergerFile; + +#include +#include + +// A simple function to fill the ntuple with random values +void fill(TNtuple &ntuple, size_t n) +{ + std::random_device rd; + std::default_random_engine rng(rd()); + std::normal_distribution dist(0.0, 1.0); + + auto gaussian_random = [&]() { return dist(rng); }; + + for (auto i : ROOT::TSeqI(n)) ntuple.Fill(gaussian_random()); +} + +void mt103_fillNtuples() +{ + // Avoid unnecessary output + gROOT->SetBatch(); + + // Make ROOT thread-safe + ROOT::EnableThreadSafety(); + + // Total number of events + const size_t nEvents = 65535; + + // Match number of threads to what the hardware can do + const size_t nWorkers = std::thread::hardware_concurrency(); + + // Split work in equal parts + const size_t nEventsPerWorker = nEvents / nWorkers; + + // Create the TBufferMerger + TBufferMerger merger("mp103_fillNtuple.root"); + + // Define what each worker will do + auto work_function = [&]() { + auto f = merger.GetFile(); + TNtuple ntrand("ntrand", "Random Numbers", "r"); + fill(ntrand, nEventsPerWorker); + ntrand.Write(); + f->Write(); + }; + + // Create worker threads + std::vector workers; + + for (auto i : ROOT::TSeqI(nWorkers)) workers.emplace_back(work_function); + + // Make sure workers are done + for (auto &&worker : workers) worker.join(); +}