Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ if(NOT WIN32)
endif()
ROOT_EXECUTABLE(root.exe rmain.cxx LIBRARIES Core Rint)
ROOT_EXECUTABLE(proofserv.exe pmain.cxx LIBRARIES Core MathCore)
ROOT_EXECUTABLE(hadd hadd.cxx LIBRARIES Core RIO Net Hist Graf Graf3d Gpad Tree Matrix MathCore Thread)
ROOT_EXECUTABLE(hadd hadd.cxx LIBRARIES Core RIO Net Hist Graf Graf3d Gpad Tree Matrix MathCore MultiProc)
ROOT_EXECUTABLE(rootnb.exe nbmain.cxx LIBRARIES Core)

if(fortran AND CMAKE_Fortran_COMPILER)
Expand Down
210 changes: 172 additions & 38 deletions main/src/hadd.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,13 @@
#include "Riostream.h"
#include "TClass.h"
#include "TSystem.h"
#include "TUUID.h"
#include "ROOT/StringConv.hxx"
#include <stdlib.h>
#include <climits>

#include "TFileMerger.h"
#include "ROOT/TProcessExecutor.hxx"

////////////////////////////////////////////////////////////////////////////////

Expand All @@ -104,6 +106,13 @@ int main( int argc, char **argv )
std::cout << "If the option -O is used, when merging TTree, the basket size is re-optimized" <<std::endl;
std::cout << "If the option -v is used, explicitly set the verbosity level;\n"\
" 0 request no output, 99 is the default" <<std::endl;
std::cout << "If the option -j is used, the execution will be parallelized in multiple processes\n" << std::endl;
std::cout << "If the option -dbg is used, the execution will be parallelized in multiple processes in debug mode."
" This will not delete the partial files stored in the working directory\n"
<< std::endl;
std::cout << "If the option -d is used, the partial multiprocess execution will be carried out in the specified "
"directory\n"
<< std::endl;
std::cout << "If the option -n is used, hadd will open at most 'maxopenedfiles' at once, use 0\n"
" to request to use the system maximum." << std::endl;
std::cout << "If the option -cachesize is used, hadd will resize (or disable if 0) the\n"
Expand Down Expand Up @@ -139,10 +148,15 @@ int main( int argc, char **argv )
Bool_t noTrees = kFALSE;
Bool_t keepCompressionAsIs = kFALSE;
Bool_t useFirstInputCompression = kFALSE;
Bool_t multiproc = kFALSE;
Bool_t debug = kFALSE;
Int_t maxopenedfiles = 0;
Int_t verbosity = 99;
TString cacheSize;

SysInfo_t s;
gSystem->GetSysInfo(&s);
auto nProcesses = s.fCpus;
auto workingDir = gSystem->TempDirectory();
int outputPlace = 0;
int ffirst = 2;
Int_t newcomp = -1;
Expand All @@ -162,6 +176,53 @@ int main( int argc, char **argv )
} else if ( strcmp(argv[a],"-O") == 0 ) {
reoptimize = kTRUE;
++ffirst;
} else if (strcmp(argv[a], "-dbg") == 0) {
debug = kTRUE;
verbosity = kTRUE;
++ffirst;
} else if (strcmp(argv[a], "-d") == 0) {
if (a + 1 != argc && argv[a + 1][0] != '-') {
if (gSystem->AccessPathName(argv[a + 1])) {
std::cerr << "Error: could not access the directory specified: " << argv[a + 1]
<< ". We will use the system's temporal directory.\n";
} else {
workingDir = argv[a + 1];
}
++a;
++ffirst;
} else {
std::cout << "-d: no directory specified. We will use the system's temporal directory.\n";
}
++ffirst;
} else if (strcmp(argv[a], "-j") == 0) {
// If the number of processes is not specified, use the default.
if (a + 1 != argc && argv[a + 1][0] != '-') {
// number of processes specified
Long_t request = 1;
for (char *c = argv[a + 1]; *c != '\0'; ++c) {
if (!isdigit(*c)) {
// Wrong number of Processes. Use the default:
std::cerr << "Error: could not parse the number of processes to run in parallel passed after -j: "
<< argv[a + 1] << ". We will use the system maximum.\n";
request = 0;
break;
}
}
if (request == 1) {
request = strtol(argv[a + 1], 0, 10);
if (request < kMaxLong && request >= 0) {
nProcesses = (Int_t)request;
++a;
++ffirst;
std::cout << "Parallelizing with " << nProcesses << " processes.\n";
} else {
std::cerr << "Error: could not parse the number of processes to use passed after -j: " << argv[a + 1]
<< ". We will use the default value (number of logical cores).\n";
}
}
}
multiproc = kTRUE;
++ffirst;
} else if ( strcmp(argv[a],"-cachesize=") == 0 ) {
int size;
static const size_t arglen = strlen("-cachesize=");
Expand Down Expand Up @@ -308,11 +369,11 @@ int main( int argc, char **argv )
std::cout << "hadd Target file: " << targetname << std::endl;
}

TFileMerger merger(kFALSE,kFALSE);
merger.SetMsgPrefix("hadd");
merger.SetPrintLevel(verbosity - 1);
TFileMerger fileMerger(kFALSE, kFALSE);
fileMerger.SetMsgPrefix("hadd");
fileMerger.SetPrintLevel(verbosity - 1);
if (maxopenedfiles > 0) {
merger.SetMaxOpenedFiles(maxopenedfiles);
fileMerger.SetMaxOpenedFiles(maxopenedfiles);
}
if (newcomp == -1) {
if (useFirstInputCompression || keepCompressionAsIs) {
Expand Down Expand Up @@ -348,62 +409,135 @@ int main( int argc, char **argv )
std::cout << "hadd compression setting for all ouput: " << newcomp << '\n';
}
if (append) {
if (!merger.OutputFile(targetname,"UPDATE",newcomp)) {
if (!fileMerger.OutputFile(targetname, "UPDATE", newcomp)) {
std::cerr << "hadd error opening target file for update :" << argv[ffirst-1] << "." << std::endl;
exit(2);
}
} else if (!merger.OutputFile(targetname,force,newcomp) ) {
} else if (!fileMerger.OutputFile(targetname, force, newcomp)) {
std::cerr << "hadd error opening target file (does " << argv[ffirst-1] << " exist?)." << std::endl;
if (!force) std::cerr << "Pass \"-f\" argument to force re-creation of output file." << std::endl;
exit(1);
}

auto filesToProcess = argc - ffirst;
auto step = (filesToProcess + nProcesses - 1) / nProcesses;
if (multiproc && step < 3) {
// At least 3 files per process
step = 3;
nProcesses = (filesToProcess + step - 1) / step;
std::cout << "Each process should handle at least 3 files for efficiency.";
std::cout << " Setting the number of processes to: " << nProcesses << std::endl;
}
std::vector<std::string> partialFiles;

if (multiproc) {
auto uuid = TUUID();
auto partialTail = uuid.AsString();
for (auto i = 0; (i * step) < filesToProcess; i++) {
std::stringstream buffer;
buffer << workingDir << "/partial" << i << "_" << partialTail << ".root";
partialFiles.emplace_back(buffer.str());
}
}

for ( int i = ffirst; i < argc; i++ ) {
if (argv[i] && argv[i][0]=='@') {
std::ifstream indirect_file(argv[i]+1);
if( ! indirect_file.is_open() ) {
std::cerr<< "hadd could not open indirect file " << (argv[i]+1) << std::endl;
return 1;
auto mergeFiles = [&](TFileMerger &merger) {
if (reoptimize) {
merger.SetFastMethod(kFALSE);
} else {
if (!keepCompressionAsIs && merger.HasCompressionChange()) {
// Don't warn if the user any request re-optimization.
std::cout << "hadd Sources and Target have different compression levels" << std::endl;
std::cout << "hadd merging will be slower" << std::endl;
}
while( indirect_file ){
std::string line;
if( std::getline(indirect_file, line) && line.length() && !merger.AddFile(line.c_str()) ) {
return 1;
}
merger.SetNotrees(noTrees);
merger.SetMergeOptions(cacheSize);
Bool_t status;
if (append)
status = merger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kAll);
else
status = merger.Merge();
return status;
};

auto sequentialMerge = [&](TFileMerger &merger, int start, int nFiles) {

for (auto i = start; i < (start + nFiles) && i < argc; i++) {
if (argv[i] && argv[i][0] == '@') {
std::ifstream indirect_file(argv[i] + 1);
if (!indirect_file.is_open()) {
std::cerr << "hadd could not open indirect file " << (argv[i] + 1) << std::endl;
return kFALSE;
}
while (indirect_file) {
std::string line;
if (std::getline(indirect_file, line) && line.length() && !merger.AddFile(line.c_str())) {
return kFALSE;
}
}
} else if (!merger.AddFile(argv[i])) {
if (skip_errors) {
std::cerr << "hadd skipping file with error: " << argv[i] << std::endl;
} else {
std::cerr << "hadd exiting due to error in " << argv[i] << std::endl;
}
return kFALSE;
}
} else if( ! merger.AddFile(argv[i]) ) {
if ( skip_errors ) {
std::cerr << "hadd skipping file with error: " << argv[i] << std::endl;
} else {
std::cerr << "hadd exiting due to error in " << argv[i] << std::endl;
return 1;
}
return mergeFiles(merger);
};

auto parallelMerge = [&](int start) {
TFileMerger mergerP(kFALSE, kFALSE);
mergerP.SetMsgPrefix("hadd");
mergerP.SetPrintLevel(verbosity - 1);
if (maxopenedfiles > 0) {
mergerP.SetMaxOpenedFiles(maxopenedfiles / nProcesses);
}
if (!mergerP.OutputFile(partialFiles[(start - ffirst) / step].c_str(), newcomp)) {
std::cerr << "hadd error opening target partial file" << std::endl;
exit(1);
}
return sequentialMerge(mergerP, start, step);
};

auto reductionFunc = [&]() {
for (auto pf : partialFiles) {
fileMerger.AddFile(pf.c_str());
}
return mergeFiles(fileMerger);
};

Bool_t status;

if (multiproc) {
ROOT::TProcessExecutor p(nProcesses);
auto res = p.Map(parallelMerge, ROOT::TSeqI(ffirst, argc, step));
status = std::accumulate(res.begin(), res.end(), 0U) == partialFiles.size();
if (status) {
status = reductionFunc();
} else {
std::cout << "hadd failed at the parallel stage" << std::endl;
}
if (!debug) {
for (auto pf : partialFiles) {
gSystem->Unlink(pf.c_str());
}
}
}
if (reoptimize) {
merger.SetFastMethod(kFALSE);
} else {
if (!keepCompressionAsIs && merger.HasCompressionChange()) {
// Don't warn if the user any request re-optimization.
std::cout <<"hadd Sources and Target have different compression levels"<<std::endl;
std::cout <<"hadd merging will be slower"<<std::endl;
}
status = sequentialMerge(fileMerger, ffirst, filesToProcess);
}
merger.SetNotrees(noTrees);
merger.SetMergeOptions(cacheSize);
Bool_t status;
if (append) status = merger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kAll);
else status = merger.Merge();

if (status) {
if (verbosity == 1) {
std::cout << "hadd merged " << merger.GetMergeList()->GetEntries() << " input files in " << targetname << ".\n";
std::cout << "hadd merged " << fileMerger.GetMergeList()->GetEntries() << " input files in " << targetname
<< ".\n";
}
return 0;
} else {
if (verbosity == 1) {
std::cout << "hadd failure during the merge of " << merger.GetMergeList()->GetEntries() << " input files in " << targetname << ".\n";
std::cout << "hadd failure during the merge of " << fileMerger.GetMergeList()->GetEntries()
<< " input files in " << targetname << ".\n";
}
return 1;
}
Expand Down