Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Allow hadd to run in multiprocess
  • Loading branch information
xvallspl committed Apr 9, 2017
commit 4e8c22661b91aed78fd7c67e173b851f7122c8c6
2 changes: 1 addition & 1 deletion main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ if(NOT WIN32)
endif()
ROOT_EXECUTABLE(root.exe rmain.cxx LIBRARIES Core Rint)
ROOT_EXECUTABLE(proofserv.exe pmain.cxx LIBRARIES Core MathCore)
ROOT_EXECUTABLE(hadd hadd.cxx LIBRARIES Core RIO Net Hist Graf Graf3d Gpad Tree Matrix MathCore Thread)
ROOT_EXECUTABLE(hadd hadd.cxx LIBRARIES Core RIO Net Hist Graf Graf3d Gpad Tree Matrix MathCore MultiProc)
ROOT_EXECUTABLE(rootnb.exe nbmain.cxx LIBRARIES Core)

if(fortran AND CMAKE_Fortran_COMPILER)
Expand Down
174 changes: 136 additions & 38 deletions main/src/hadd.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
#include <climits>

#include "TFileMerger.h"
#include "ROOT/TProcessExecutor.hxx"

////////////////////////////////////////////////////////////////////////////////

Expand All @@ -104,6 +105,7 @@ int main( int argc, char **argv )
std::cout << "If the option -O is used, when merging TTree, the basket size is re-optimized" <<std::endl;
std::cout << "If the option -v is used, explicitly set the verbosity level;\n"\
" 0 request no output, 99 is the default" <<std::endl;
std::cout << "If the option -m is used, the execution will be parallelized in multiple processes\n" <<std::endl;
std::cout << "If the option -n is used, hadd will open at most 'maxopenedfiles' at once, use 0\n"
" to request to use the system maximum." << std::endl;
std::cout << "If the option -cachesize is used, hadd will resize (or disable if 0) the\n"
Expand Down Expand Up @@ -139,9 +141,13 @@ int main( int argc, char **argv )
Bool_t noTrees = kFALSE;
Bool_t keepCompressionAsIs = kFALSE;
Bool_t useFirstInputCompression = kFALSE;
Bool_t multiproc = kFALSE;
Int_t maxopenedfiles = 0;
Int_t verbosity = 99;
TString cacheSize;
SysInfo_t s;
gSystem->GetSysInfo(&s);
auto nProcesses = s.fCpus;

int outputPlace = 0;
int ffirst = 2;
Expand All @@ -162,6 +168,33 @@ int main( int argc, char **argv )
} else if ( strcmp(argv[a],"-O") == 0 ) {
reoptimize = kTRUE;
++ffirst;
} else if ( strcmp(argv[a],"-j") == 0 ) {
// If the number of processes is not specified, use the default.
if (a+1 != argc && argv[a+1][0] != '-') {
// number of processes specified
Long_t request = 1;
for (char *c = argv[a+1]; *c != '\0'; ++c) {
if (!isdigit(*c)) {
// Wrong number of Processes. Use the default:
std::cerr << "Error: could not parse the number of processes to run in parallel passed after -j: " << argv[a+1] << ". We will use the system maximum.\n";
request = 0;
break;
}
}
if (request == 1) {
request = strtol(argv[a+1], 0, 10);
if (request < kMaxLong && request >= 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given that request is a long isn't the first part always true? If I remember correctly, one has to check errno to figure out if something went wrong in strtol.

Copy link
Contributor Author

@xvallspl xvallspl Apr 11, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily.

From cpp reference:

  • If successful, an integer value corresponding to the contents of str is returned.
  • If the converted value falls out of range of corresponding return type, a range error occurs (setting errno to ERANGE) and LONG_MAX, LONG_MIN, LLONG_MAX or LLONG_MIN is returned.
  • If no conversion can be performed, 0 is returned

Which means that this may be incorrect when no conversion can be performed. There are a couple more places where hadd was already doing that. I'll check.

nProcesses = (Int_t)request;
++a;
++ffirst;
std::cout << "Parallelizing with "<<nProcesses<<" processes.\n";
} else {
std::cerr << "Error: could not parse the number of processes to use passed after -j: " << argv[a+1] << ". We will use the default value (number of logical cores).\n";
}
}
}
multiproc = kTRUE;
++ffirst;
} else if ( strcmp(argv[a],"-cachesize=") == 0 ) {
int size;
static const size_t arglen = strlen("-cachesize=");
Expand Down Expand Up @@ -308,11 +341,11 @@ int main( int argc, char **argv )
std::cout << "hadd Target file: " << targetname << std::endl;
}

TFileMerger merger(kFALSE,kFALSE);
merger.SetMsgPrefix("hadd");
merger.SetPrintLevel(verbosity - 1);
TFileMerger fileMerger(kFALSE,kFALSE);
fileMerger.SetMsgPrefix("hadd");
fileMerger.SetPrintLevel(verbosity - 1);
if (maxopenedfiles > 0) {
merger.SetMaxOpenedFiles(maxopenedfiles);
fileMerger.SetMaxOpenedFiles(maxopenedfiles);
}
if (newcomp == -1) {
if (useFirstInputCompression || keepCompressionAsIs) {
Expand Down Expand Up @@ -348,62 +381,127 @@ int main( int argc, char **argv )
std::cout << "hadd compression setting for all ouput: " << newcomp << '\n';
}
if (append) {
if (!merger.OutputFile(targetname,"UPDATE",newcomp)) {
if (!fileMerger.OutputFile(targetname,"UPDATE",newcomp)) {
std::cerr << "hadd error opening target file for update :" << argv[ffirst-1] << "." << std::endl;
exit(2);
}
} else if (!merger.OutputFile(targetname,force,newcomp) ) {
} else if (!fileMerger.OutputFile(targetname,force,newcomp) ) {
std::cerr << "hadd error opening target file (does " << argv[ffirst-1] << " exist?)." << std::endl;
if (!force) std::cerr << "Pass \"-f\" argument to force re-creation of output file." << std::endl;
exit(1);
}

auto filesToProcess = argc - ffirst;
auto step = (filesToProcess+nProcesses-1)/nProcesses;
if(step < 3){
//At least 3 files per process
step = 3;
nProcesses = (filesToProcess+step-1)/step;
std::cout<<"Each process should handle at least 3 files for efficiency.";
std::cout<<" Setting the number of processes to: "<<nProcesses<<std::endl;
}
std::vector<std::string> partialFiles;

if(multiproc){
for(auto i = 0; (i*step)<filesToProcess; i++) {
std::stringstream buffer;
buffer <<"partial"<<i<<".root";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I read correctly the intermediary files name are "partial1.root", "partial2.root". If this is the case, we may want to enhance the name to allow for the possibility of running two hadd in the same directory at the same time (i.e. at the moment this is not possible).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Will look into that.

partialFiles.emplace_back(buffer.str());
}
}

for ( int i = ffirst; i < argc; i++ ) {
if (argv[i] && argv[i][0]=='@') {
std::ifstream indirect_file(argv[i]+1);
if( ! indirect_file.is_open() ) {
std::cerr<< "hadd could not open indirect file " << (argv[i]+1) << std::endl;
return 1;
auto mergeFiles = [&] (TFileMerger &merger){
if (reoptimize) {
merger.SetFastMethod(kFALSE);
} else {
if (!keepCompressionAsIs && merger.HasCompressionChange()) {
// Don't warn if the user any request re-optimization.
std::cout <<"hadd Sources and Target have different compression levels"<<std::endl;
std::cout <<"hadd merging will be slower"<<std::endl;
}
while( indirect_file ){
std::string line;
if( std::getline(indirect_file, line) && line.length() && !merger.AddFile(line.c_str()) ) {
return 1;
}
merger.SetNotrees(noTrees);
merger.SetMergeOptions(cacheSize);
Bool_t status;
if (append) status = merger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kAll);
else status = merger.Merge();
return status;
};

auto sequentialMerge = [&](TFileMerger &merger, int start, int nFiles){

for ( auto i = start; i < (start + nFiles) && i<argc; i++ ) {
if (argv[i] && argv[i][0]=='@') {
std::ifstream indirect_file(argv[i]+1);
if( ! indirect_file.is_open() ) {
std::cerr<< "hadd could not open indirect file " << (argv[i]+1) << std::endl;
return kFALSE;
}
}
} else if( ! merger.AddFile(argv[i]) ) {
if ( skip_errors ) {
std::cerr << "hadd skipping file with error: " << argv[i] << std::endl;
} else {
std::cerr << "hadd exiting due to error in " << argv[i] << std::endl;
return 1;
while( indirect_file ){
std::string line;
if( std::getline(indirect_file, line) && line.length() && !merger.AddFile(line.c_str()) ) {
return kFALSE;
}
}
} else if( ! merger.AddFile(argv[i]) ) {
if ( skip_errors ) {
std::cerr << "hadd skipping file with error: " << argv[i] << std::endl;
} else {
std::cerr << "hadd exiting due to error in " << argv[i] << std::endl;
}
return kFALSE;
}
}
}
if (reoptimize) {
merger.SetFastMethod(kFALSE);
} else {
if (!keepCompressionAsIs && merger.HasCompressionChange()) {
// Don't warn if the user any request re-optimization.
std::cout <<"hadd Sources and Target have different compression levels"<<std::endl;
std::cout <<"hadd merging will be slower"<<std::endl;
return mergeFiles(merger);
};

auto parallelMerge = [&](int start){
TFileMerger mergerP(kFALSE,kFALSE);
mergerP.SetMsgPrefix("hadd");
mergerP.SetPrintLevel(verbosity - 1);
if (maxopenedfiles > 0) {
mergerP.SetMaxOpenedFiles(maxopenedfiles/nProcesses);
}
}
merger.SetNotrees(noTrees);
merger.SetMergeOptions(cacheSize);
if (!mergerP.OutputFile(partialFiles[(start-ffirst)/step].c_str(), newcomp)) {
std::cerr << "hadd error opening target partial file"<<std::endl;
exit(1);
}
return sequentialMerge(mergerP, start, step);
};

auto reductionFunc = [&](){
for(auto pf:partialFiles){
fileMerger.AddFile(pf.c_str());
}
return mergeFiles(fileMerger);
};

Bool_t status;
if (append) status = merger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kAll);
else status = merger.Merge();

if(multiproc) {
ROOT::TProcessExecutor p(nProcesses);
auto res = p.Map(parallelMerge, ROOT::TSeqI(ffirst, argc, step));
status = std::accumulate(res.begin(), res.end(), 0U) == partialFiles.size();
if(status){
status = reductionFunc();
} else{
std::cout<<"hadd failed at the parallel stage"<<std::endl;
}
for(auto pf:partialFiles){
gSystem->Unlink(pf.c_str());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be helpful for debugging to have a way to disable the unlink of the intermediary files.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be useful, yes.
How? Adding another option? This will only be used in the multiprocess case, so maybe a combination of the two? Something like -jdbg?

}
} else{
status = sequentialMerge(fileMerger, ffirst, filesToProcess);
}

if (status) {
if (verbosity == 1) {
std::cout << "hadd merged " << merger.GetMergeList()->GetEntries() << " input files in " << targetname << ".\n";
std::cout << "hadd merged " << fileMerger.GetMergeList()->GetEntries() << " input files in " << targetname << ".\n";
}
return 0;
} else {
if (verbosity == 1) {
std::cout << "hadd failure during the merge of " << merger.GetMergeList()->GetEntries() << " input files in " << targetname << ".\n";
std::cout << "hadd failure during the merge of " << fileMerger.GetMergeList()->GetEntries() << " input files in " << targetname << ".\n";
}
return 1;
}
Expand Down