Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions core/base/inc/TDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,7 @@ class TDirectory : public TNamed {
if ( newCurrent ) newCurrent->cd();
else CdNull();
}
~TContext()
{
// Destructor. Reset the current directory to its
// previous state.
if ( fDirectory ) {
fDirectory->UnregisterContext(this);
fDirectory->cd();
}
else CdNull();
}
~TContext();
};

protected:
Expand Down
14 changes: 14 additions & 0 deletions core/base/src/TDirectory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,20 @@ TDirectory::~TDirectory()
}
}

////////////////////////////////////////////////////////////////////////////////
/// Destructor.

TDirectory::TContext::~TContext()
{
// Destructor. Reset the current directory to its
// previous state.
if (fDirectory) {
fDirectory->UnregisterContext(this);
fDirectory->cd();
} else
CdNull();
}

////////////////////////////////////////////////////////////////////////////////
/// Sets the flag controlling the automatic add objects like histograms, TGraph2D, etc
/// in memory
Expand Down
14 changes: 10 additions & 4 deletions io/io/inc/ROOT/TBufferMerger.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <thread>

class TBufferFile;
class TFile;

namespace ROOT {
namespace Experimental {
Expand Down Expand Up @@ -50,6 +51,11 @@ public:
*/
TBufferMerger(const char *name, Option_t *option = "RECREATE", Int_t compress = 1);

/** Constructor
* @param output Output \c TFile
*/
TBufferMerger(std::unique_ptr<TFile> output);

/** Destructor */
virtual ~TBufferMerger();

Expand Down Expand Up @@ -100,12 +106,12 @@ private:
/** TBufferMerger has no copy operator */
TBufferMerger &operator=(const TBufferMerger &);

void Init(std::unique_ptr<TFile>);

void Push(TBufferFile *buffer);
void WriteOutputFile();

const std::string fName;
const std::string fOption;
const Int_t fCompress;
TFile* fFile; //< Output file.
size_t fAutoSave; //< AutoSave only every fAutoSave bytes
std::mutex fQueueMutex; //< Mutex used to lock fQueue
std::condition_variable fDataAvailable; //< Condition variable used to wait for data
Expand All @@ -114,7 +120,7 @@ private:
std::vector<std::weak_ptr<TBufferMergerFile>> fAttachedFiles; //< Attached files
std::function<void(void)> fCallback; //< Callback for when data is removed from queue

ClassDef(TBufferMerger, 0);
ClassDef(TBufferMerger, 1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change this? This only matters for classes that are streamed. This should have remained as 0.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, when you have a ClassDef and you change data members you also should bump the ClassDef version.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, when you have a ClassDef and you change data members you also should bump the ClassDef version.

only when you change a persistent member :) ... and version == 0 means that all data members are implicitly marked transient.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall I revert it back to 0?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, done. I don't understand how is that a non persistent data member but you are the expert.

};

/**
Expand Down
3 changes: 3 additions & 0 deletions io/io/inc/TFileMerger.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include "TString.h"
#include "TStopwatch.h"

#include <memory>

class TList;
class TFile;
class TDirectory;
Expand Down Expand Up @@ -97,6 +99,7 @@ class TFileMerger : public TObject {
virtual Bool_t OutputFile(const char *url, Bool_t force, Int_t compressionLevel);
virtual Bool_t OutputFile(const char *url, const char *mode = "RECREATE");
virtual Bool_t OutputFile(const char *url, const char *mode, Int_t compressionLevel);
virtual Bool_t OutputFile(std::unique_ptr<TFile> file);
virtual void PrintFiles(Option_t *options);
virtual Bool_t Merge(Bool_t = kTRUE);
virtual Bool_t PartialMerge(Int_t type = kAll | kIncremental);
Expand Down
25 changes: 21 additions & 4 deletions io/io/src/TBufferMerger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,26 @@ namespace ROOT {
namespace Experimental {

TBufferMerger::TBufferMerger(const char *name, Option_t *option, Int_t compress)
: fName(name), fOption(option), fCompress(compress), fAutoSave(0),
fMergingThread(new std::thread([&]() { this->WriteOutputFile(); }))
{
// NOTE: We cannot use ctor chaining or in-place initialization because we want this operation to have no effect on
// ROOT's gDirectory.
TDirectory::TContext ctxt;
if (TFile *output = TFile::Open(name, option, /*title*/ name, compress))
Init(std::unique_ptr<TFile>(output));
else
Error("OutputFile", "cannot open the MERGER output file %s", name);
}

TBufferMerger::TBufferMerger(std::unique_ptr<TFile> output)
{
Init(std::move(output));
}

void TBufferMerger::Init(std::unique_ptr<TFile> output)
{
fFile = output.release();
fAutoSave = 0;
fMergingThread.reset(new std::thread([&]() { this->WriteOutputFile(); }));
}

TBufferMerger::~TBufferMerger()
Expand Down Expand Up @@ -84,7 +101,7 @@ void TBufferMerger::WriteOutputFile()

{
R__LOCKGUARD(gROOTMutex);
merger.OutputFile(fName.c_str(), fOption.c_str(), fCompress);
merger.OutputFile(std::unique_ptr<TFile>(fFile));
}

while (true) {
Expand All @@ -106,7 +123,7 @@ void TBufferMerger::WriteOutputFile()

{
R__LOCKGUARD(gROOTMutex);
memfiles.push_back(new TMemFile(fName.c_str(), buffer->Buffer() + buffer->Length(), length, "read"));
memfiles.push_back(new TMemFile(fFile->GetName(), buffer->Buffer() + buffer->Length(), length, "read"));
buffer->SetBufferOffset(buffer->Length() + length);
merger.AddFile(memfiles.back(), false);

Expand Down
2 changes: 1 addition & 1 deletion io/io/src/TBufferMergerFile.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace ROOT {
namespace Experimental {

TBufferMergerFile::TBufferMergerFile(TBufferMerger &m)
: TMemFile(m.fName.c_str(), "recreate", "", m.fCompress), fMerger(m)
: TMemFile(m.fFile->GetName(), "recreate", "", m.fFile->GetCompressionSettings()), fMerger(m)
{
}

Expand Down
36 changes: 29 additions & 7 deletions io/io/src/TFileMerger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -311,20 +311,42 @@ Bool_t TFileMerger::OutputFile(const char *outputfile, Bool_t force)

Bool_t TFileMerger::OutputFile(const char *outputfile, const char *mode, Int_t compressionLevel)
{
// We want gDirectory untouched by anything going on here
TDirectory::TContext ctxt;
if (TFile *outputFile = TFile::Open(outputfile, mode, "", compressionLevel))
return OutputFile(std::unique_ptr<TFile>(outputFile));

Error("OutputFile", "cannot open the MERGER output file %s", fOutputFilename.Data());
return kFALSE;
}

////////////////////////////////////////////////////////////////////////////////
/// Set an output file opened externally by the users

Bool_t TFileMerger::OutputFile(std::unique_ptr<TFile> outputfile)
{
if (!outputfile || outputfile->IsZombie()) {
Error("OutputFile", "cannot open the MERGER output file %s", (outputfile) ? outputfile->GetName() : "");
return kFALSE;
}

if (!outputfile->IsWritable()) {
Error("OutputFile", "output file %s is not writable", outputfile->GetName());
return kFALSE;
}

fExplicitCompLevel = kTRUE;

TFile *oldfile = fOutputFile;
fOutputFile = 0; // This avoids the complaint from RecursiveRemove about the file being deleted which is here spurrious. (see RecursiveRemove).
fOutputFile = 0; // This avoids the complaint from RecursiveRemove about the file being deleted which is here
// spurrious. (see RecursiveRemove).
SafeDelete(oldfile);

fOutputFilename = outputfile;

fOutputFilename = outputfile->GetName();
// We want gDirectory untouched by anything going on here
TDirectory::TContext ctxt;
if (!(fOutputFile = TFile::Open(outputfile, mode, "", compressionLevel)) || fOutputFile->IsZombie()) {
Error("OutputFile", "cannot open the MERGER output file %s", fOutputFilename.Data());
return kFALSE;
}
fOutputFile = outputfile.release(); // Transfer the ownership of the file.

return kTRUE;
}

Expand Down
2 changes: 1 addition & 1 deletion io/io/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ROOT_ADD_GTEST(testTBufferMerger TBufferMerger.cxx LIBRARIES RIO Tree)
ROOT_ADD_GTEST(IOTests TBufferMerger.cxx TFileMergerTests.cxx LIBRARIES RIO Tree)
1 change: 1 addition & 0 deletions io/io/test/TBufferMerger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ TEST(TBufferMerger, CheckTreeFillResults)
{ // sum of all branch values in parallel mode
TFile f("tbuffermerger_parallel.root");
auto t = (TTree *)f.Get("mytree");
ASSERT_TRUE(t != nullptr);

int n, sum = 0;
int nentries = (int)t->GetEntries();
Expand Down
98 changes: 98 additions & 0 deletions io/io/test/TFileMergerTests.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#include "TFileMerger.h"

#include "TMemFile.h"
#include "TTree.h"

#include "gtest/gtest.h"

namespace {
using testing::internal::GetCapturedStderr;
using testing::internal::CaptureStderr;
using testing::internal::RE;
class ExpectedErrorRAII {
std::string ExpectedRegex;
void pop()
{
std::string Seen = GetCapturedStderr();
bool match = RE::FullMatch(Seen, RE(ExpectedRegex));
EXPECT_TRUE(match);
if (!match) {
std::string msg = "Match failed!\nSeen: '" + Seen + "'\nRegex: '" + ExpectedRegex + "'\n";
GTEST_NONFATAL_FAILURE_(msg.c_str());
}
}

public:
ExpectedErrorRAII(std::string E) : ExpectedRegex(E) { CaptureStderr(); }
~ExpectedErrorRAII() { pop(); }
};
}

#define EXPECT_ROOT_ERROR(expression, expected_error) \
{ \
ExpectedErrorRAII EE(expected_error); \
expression; \
}

static void CreateATuple(TMemFile &file, const char *name, double value)
{
auto mytree = new TTree(name, "A tree");
// FIXME: We inherit EnableImplicitIMT from TBufferMerger tests (we are sharing the same executable) where we call
// EnableThreadSafety(). Here, we hit a race condition in TBranch::FlushBaskets. Once we get that fixed we probably
// should re-enable implicit MT.
//
// In general, we should probably have a way to conditionally enable/disable thread safety.
mytree->SetImplicitMT(false);

mytree->SetDirectory(&file);
mytree->Branch(name, &value);
mytree->Fill();
file.Write();
}

static void CheckTree(TMemFile &file, const char *name, double expectedValue)
{
auto t = static_cast<TTree *>(file.Get(name));
ASSERT_TRUE(t != nullptr);

double d;
t->SetBranchAddress(name, &d);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting branch address to a stack address requires to either call t->ResetBranchAddresses(); or delete the TTree (otherwise it is technically in an invalid state).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that mean that file.Get(name) allocates an object and it is the user's responsibility to deallocate it? If yes, I assume I can transform it into a unique_ptr and would that work well with having a t->ResetBranchAddresses();?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that mean that file.Get(name) allocates an object and it is the user's responsibility to deallocate it?

In the case of a TTree, the point is an implicit shared pointer. You can delete it (and the TFile will be informed) and you can let the TFile delete it (but your are not directly inform when it does unless you are registered in the list of cleanups).

If yes, I assume I can transform it into a unique_ptr

Yes, you can as long as you make sure that the lifetime of the unique_ptr is less or equal to the litetime of the TFile.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be done.

t->GetEntry(0);
EXPECT_EQ(expectedValue, d);
// Setting branch address to a stack address requires to either call ResetBranchAddresses or delete the tree.
t->ResetBranchAddresses();
}

TEST(TFileMerger, CreateWithTFilePointer)
{
TMemFile a("a.root", "RECREATE");
CreateATuple(a, "a_tree", 1.);

TMemFile b("b.root", "RECREATE");
CreateATuple(b, "b_tree", 2.);

TFileMerger merger;
auto output = std::unique_ptr<TMemFile>(new TMemFile("output.root", "CREATE"));
bool success = merger.OutputFile(std::move(output));

ASSERT_TRUE(success);

merger.AddFile(&a, false);
merger.AddFile(&b, false);
// FIXME: Calling merger.Merge() will call Close() and *delete* output.
merger.PartialMerge();

auto &result = *static_cast<TMemFile *>(merger.GetOutputFile());
CheckTree(result, "a_tree", 1);
CheckTree(result, "b_tree", 2);
}

TEST(TFileMerger, CreateWithUnwritableTFilePointer)
{
TFileMerger merger;
auto output = std::unique_ptr<TMemFile>(new TMemFile("output.root", "RECREATE"));
// FIXME: The ctor of TMemFile sets the 'zombie' flag to all TMemFiles whose options are different than CREATE and
// RECREATE. We should probably fix the API but until then work around it.
output->SetWritable(false);
EXPECT_ROOT_ERROR(merger.OutputFile(std::move(output)), "Error in .* output file output.root is not writable\n");
}