diff --git a/main/CMakeLists.txt b/main/CMakeLists.txt
index a431a2b00ad0b..113f15e478299 100644
--- a/main/CMakeLists.txt
+++ b/main/CMakeLists.txt
@@ -14,7 +14,7 @@ if(NOT WIN32)
 endif()
 ROOT_EXECUTABLE(root.exe rmain.cxx LIBRARIES Core Rint)
 ROOT_EXECUTABLE(proofserv.exe pmain.cxx LIBRARIES Core MathCore)
-ROOT_EXECUTABLE(hadd hadd.cxx LIBRARIES Core RIO Net Hist Graf Graf3d Gpad Tree Matrix MathCore Thread)
+ROOT_EXECUTABLE(hadd hadd.cxx LIBRARIES Core RIO Net Hist Graf Graf3d Gpad Tree Matrix MathCore MultiProc)
 ROOT_EXECUTABLE(rootnb.exe nbmain.cxx LIBRARIES Core)
 
 if(fortran AND CMAKE_Fortran_COMPILER)
diff --git a/main/src/hadd.cxx b/main/src/hadd.cxx
index 812fa89a41393..bb06a11afea50 100644
--- a/main/src/hadd.cxx
+++ b/main/src/hadd.cxx
@@ -79,11 +79,13 @@
 #include "Riostream.h"
 #include "TClass.h"
 #include "TSystem.h"
+#include "TUUID.h"
 #include "ROOT/StringConv.hxx"
 #include 
 #include 
 
 #include "TFileMerger.h"
+#include "ROOT/TProcessExecutor.hxx"
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -104,6 +106,13 @@ int main( int argc, char **argv )
       std::cout << "If the option -O is used, when merging TTree, the basket size is re-optimized" << std::endl;
       return 1;
    }
+   Bool_t multiproc = kFALSE;
+   Bool_t debug = kFALSE;
+
+   SysInfo_t s;
+   gSystem->GetSysInfo(&s);
+   auto nProcesses = s.fCpus;
+   auto workingDir = gSystem->TempDirectory();
    int outputPlace = 0;
    int ffirst = 2;
    Int_t newcomp = -1;
@@ -162,6 +176,53 @@ int main( int argc, char **argv )
       } else if ( strcmp(argv[a],"-O") == 0 ) {
          reoptimize = kTRUE;
         ++ffirst;
+      } else if (strcmp(argv[a], "-dbg") == 0) {
+         debug = kTRUE;
+         verbosity = kTRUE;
+         ++ffirst;
+      } else if (strcmp(argv[a], "-d") == 0) {
+         if (a + 1 != argc && argv[a + 1][0] != '-') {
+            if (gSystem->AccessPathName(argv[a + 1])) {
+               std::cerr << "Error: could not access the directory specified: " << argv[a + 1]
+                         << ". We will use the system's temporary directory.\n";
+            } else {
+               workingDir = argv[a + 1];
+            }
+            ++a;
+            ++ffirst;
+         } else {
+            std::cout << "-d: no directory specified. We will use the system's temporary directory.\n";
+         }
+         ++ffirst;
+      } else if (strcmp(argv[a], "-j") == 0) {
+         // If the number of processes is not specified, use the default.
+         if (a + 1 != argc && argv[a + 1][0] != '-') {
+            // number of processes specified
+            Long_t request = 1;
+            for (char *c = argv[a + 1]; *c != '\0'; ++c) {
+               if (!isdigit(*c)) {
+                  // Wrong number of processes. Use the default:
+                  std::cerr << "Error: could not parse the number of processes to run in parallel passed after -j: "
+                            << argv[a + 1] << ". We will use the system maximum.\n";
+                  request = 0;
+                  break;
+               }
+            }
+            if (request == 1) {
+               request = strtol(argv[a + 1], 0, 10);
+               if (request < kMaxLong && request >= 0) {
+                  nProcesses = (Int_t)request;
+                  ++a;
+                  ++ffirst;
+                  std::cout << "Parallelizing with " << nProcesses << " processes.\n";
+               } else {
+                  std::cerr << "Error: could not parse the number of processes to use passed after -j: " << argv[a + 1]
+                            << ". We will use the default value (number of logical cores).\n";
+               }
+            }
+         }
+         multiproc = kTRUE;
+         ++ffirst;
       } else if ( strcmp(argv[a],"-cachesize=") == 0 ) {
          int size;
          static const size_t arglen = strlen("-cachesize=");
@@ -308,11 +369,11 @@ int main( int argc, char **argv )
       std::cout << "hadd Target file: " << targetname << std::endl;
    }
 
-   TFileMerger merger(kFALSE,kFALSE);
-   merger.SetMsgPrefix("hadd");
-   merger.SetPrintLevel(verbosity - 1);
+   TFileMerger fileMerger(kFALSE, kFALSE);
+   fileMerger.SetMsgPrefix("hadd");
+   fileMerger.SetPrintLevel(verbosity - 1);
    if (maxopenedfiles > 0) {
-      merger.SetMaxOpenedFiles(maxopenedfiles);
+      fileMerger.SetMaxOpenedFiles(maxopenedfiles);
    }
    if (newcomp == -1) {
       if (useFirstInputCompression || keepCompressionAsIs) {
@@ -348,62 +409,135 @@ int main( int argc, char **argv )
       std::cout << "hadd compression setting for all ouput: " << newcomp << '\n';
    }
    if (append) {
-      if (!merger.OutputFile(targetname,"UPDATE",newcomp)) {
+      if (!fileMerger.OutputFile(targetname, "UPDATE", newcomp)) {
          std::cerr << "hadd error opening target file for update :" << argv[ffirst-1] << "." << std::endl;
          exit(2);
       }
-   } else if (!merger.OutputFile(targetname,force,newcomp) ) {
+   } else if (!fileMerger.OutputFile(targetname, force, newcomp)) {
       std::cerr << "hadd error opening target file (does " << argv[ffirst-1] << " exist?)." << std::endl;
       if (!force) std::cerr << "Pass \"-f\" argument to force re-creation of output file." << std::endl;
       exit(1);
    }
+   auto filesToProcess = argc - ffirst;
+   auto step = (filesToProcess + nProcesses - 1) / nProcesses;
+   if (multiproc && step < 3) {
+      // At least 3 files per process
+      step = 3;
+      nProcesses = (filesToProcess + step - 1) / step;
+      std::cout << "Each process should handle at least 3 files for efficiency.";
+      std::cout << " Setting the number of processes to: " << nProcesses << std::endl;
+   }
+   std::vector<std::string> partialFiles;
+
+   if (multiproc) {
+      auto uuid = TUUID();
+      auto partialTail = uuid.AsString();
+      for (auto i = 0; (i * step) < filesToProcess; i++) {
+         std::stringstream buffer;
+         buffer << workingDir << "/partial" << i << "_" << partialTail << ".root";
+         partialFiles.emplace_back(buffer.str());
+      }
+   }
 
-   for ( int i = ffirst; i < argc; i++ ) {
-      if (argv[i] && argv[i][0]=='@') {
-         std::ifstream indirect_file(argv[i]+1);
-         if( ! indirect_file.is_open() ) {
-            std::cerr<< "hadd could not open indirect file " << (argv[i]+1) << std::endl;
-            return 1;
+   auto mergeFiles = [&](TFileMerger &merger) {
+      if (reoptimize) {
+         merger.SetFastMethod(kFALSE);
+      } else {
+         if (!keepCompressionAsIs && merger.HasCompressionChange()) {
+            // Don't warn if the user requested re-optimization.
+ std::cout << "hadd Sources and Target have different compression levels" << std::endl; + std::cout << "hadd merging will be slower" << std::endl; } - while( indirect_file ){ - std::string line; - if( std::getline(indirect_file, line) && line.length() && !merger.AddFile(line.c_str()) ) { - return 1; + } + merger.SetNotrees(noTrees); + merger.SetMergeOptions(cacheSize); + Bool_t status; + if (append) + status = merger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kAll); + else + status = merger.Merge(); + return status; + }; + + auto sequentialMerge = [&](TFileMerger &merger, int start, int nFiles) { + + for (auto i = start; i < (start + nFiles) && i < argc; i++) { + if (argv[i] && argv[i][0] == '@') { + std::ifstream indirect_file(argv[i] + 1); + if (!indirect_file.is_open()) { + std::cerr << "hadd could not open indirect file " << (argv[i] + 1) << std::endl; + return kFALSE; } + while (indirect_file) { + std::string line; + if (std::getline(indirect_file, line) && line.length() && !merger.AddFile(line.c_str())) { + return kFALSE; + } + } + } else if (!merger.AddFile(argv[i])) { + if (skip_errors) { + std::cerr << "hadd skipping file with error: " << argv[i] << std::endl; + } else { + std::cerr << "hadd exiting due to error in " << argv[i] << std::endl; + } + return kFALSE; } - } else if( ! merger.AddFile(argv[i]) ) { - if ( skip_errors ) { - std::cerr << "hadd skipping file with error: " << argv[i] << std::endl; - } else { - std::cerr << "hadd exiting due to error in " << argv[i] << std::endl; - return 1; + } + return mergeFiles(merger); + }; + + auto parallelMerge = [&](int start) { + TFileMerger mergerP(kFALSE, kFALSE); + mergerP.SetMsgPrefix("hadd"); + mergerP.SetPrintLevel(verbosity - 1); + if (maxopenedfiles > 0) { + mergerP.SetMaxOpenedFiles(maxopenedfiles / nProcesses); + } + if (!mergerP.OutputFile(partialFiles[(start - ffirst) / step].c_str(), newcomp)) { + std::cerr << "hadd error opening target partial file" << std::endl; + exit(1); + } + return sequentialMerge(mergerP, start, step); + }; + + auto reductionFunc = [&]() { + for (auto pf : partialFiles) { + fileMerger.AddFile(pf.c_str()); + } + return mergeFiles(fileMerger); + }; + + Bool_t status; + + if (multiproc) { + ROOT::TProcessExecutor p(nProcesses); + auto res = p.Map(parallelMerge, ROOT::TSeqI(ffirst, argc, step)); + status = std::accumulate(res.begin(), res.end(), 0U) == partialFiles.size(); + if (status) { + status = reductionFunc(); + } else { + std::cout << "hadd failed at the parallel stage" << std::endl; + } + if (!debug) { + for (auto pf : partialFiles) { + gSystem->Unlink(pf.c_str()); } } - } - if (reoptimize) { - merger.SetFastMethod(kFALSE); } else { - if (!keepCompressionAsIs && merger.HasCompressionChange()) { - // Don't warn if the user any request re-optimization. - std::cout <<"hadd Sources and Target have different compression levels"<GetEntries() << " input files in " << targetname << ".\n"; + std::cout << "hadd merged " << fileMerger.GetMergeList()->GetEntries() << " input files in " << targetname + << ".\n"; } return 0; } else { if (verbosity == 1) { - std::cout << "hadd failure during the merge of " << merger.GetMergeList()->GetEntries() << " input files in " << targetname << ".\n"; + std::cout << "hadd failure during the merge of " << fileMerger.GetMergeList()->GetEntries() + << " input files in " << targetname << ".\n"; } return 1; }