[WIP] Allow hadd to run in multiprocess mode #491
@@ -84,6 +84,7 @@
    #include <climits>

    #include "TFileMerger.h"
+   #include "ROOT/TProcessExecutor.hxx"

 ////////////////////////////////////////////////////////////////////////////////
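The new include pulls in ROOT::TProcessExecutor, the fork-based pool whose Map member the patch later uses to run the partial merges in separate processes. A minimal, self-contained sketch of that pattern, independent of hadd (the worker count, lambda, and range below are illustrative, not taken from the patch):

   #include "ROOT/TProcessExecutor.hxx"
   #include "ROOT/TSeq.hxx"

   int main()
   {
      ROOT::TProcessExecutor pool(4);   // four worker processes
      // Each element of [0, 8) is handed to a worker; results come back in one vector.
      auto squares = pool.Map([](int i) { return i * i; }, ROOT::TSeqI(0, 8));
      return squares.size() == 8 ? 0 : 1;
   }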
@@ -104,6 +105,7 @@ int main( int argc, char **argv )
       std::cout << "If the option -O is used, when merging TTree, the basket size is re-optimized" <<std::endl;
       std::cout << "If the option -v is used, explicitly set the verbosity level;\n"\
                    " 0 request no output, 99 is the default" <<std::endl;
+      std::cout << "If the option -m is used, the execution will be parallelized in multiple processes\n" <<std::endl;
       std::cout << "If the option -n is used, hadd will open at most 'maxopenedfiles' at once, use 0\n"
                    " to request to use the system maximum." << std::endl;
       std::cout << "If the option -cachesize is used, hadd will resize (or disable if 0) the\n"
@@ -139,9 +141,13 @@ int main( int argc, char **argv ) | |
| Bool_t noTrees = kFALSE; | ||
| Bool_t keepCompressionAsIs = kFALSE; | ||
| Bool_t useFirstInputCompression = kFALSE; | ||
| Bool_t multiproc = kFALSE; | ||
| Int_t maxopenedfiles = 0; | ||
| Int_t verbosity = 99; | ||
| TString cacheSize; | ||
| SysInfo_t s; | ||
| gSystem->GetSysInfo(&s); | ||
| auto nProcesses = s.fCpus; | ||
|
|
||
| int outputPlace = 0; | ||
| int ffirst = 2; | ||
|
|
@@ -162,6 +168,33 @@ int main( int argc, char **argv )
    } else if ( strcmp(argv[a],"-O") == 0 ) {
       reoptimize = kTRUE;
       ++ffirst;
+   } else if ( strcmp(argv[a],"-j") == 0 ) {
+      // If the number of processes is not specified, use the default.
+      if (a+1 != argc && argv[a+1][0] != '-') {
+         // number of processes specified
+         Long_t request = 1;
+         for (char *c = argv[a+1]; *c != '\0'; ++c) {
+            if (!isdigit(*c)) {
+               // Wrong number of Processes. Use the default:
+               std::cerr << "Error: could not parse the number of processes to run in parallel passed after -j: " << argv[a+1] << ". We will use the system maximum.\n";
+               request = 0;
+               break;
+            }
+         }
+         if (request == 1) {
+            request = strtol(argv[a+1], 0, 10);
+            if (request < kMaxLong && request >= 0) {
+               nProcesses = (Int_t)request;
+               ++a;
+               ++ffirst;
+               std::cout << "Parallelizing with "<<nProcesses<<" processes.\n";
+            } else {
+               std::cerr << "Error: could not parse the number of processes to use passed after -j: " << argv[a+1] << ". We will use the default value (number of logical cores).\n";
+            }
+         }
+      }
+      multiproc = kTRUE;
+      ++ffirst;
    } else if ( strcmp(argv[a],"-cachesize=") == 0 ) {
       int size;
       static const size_t arglen = strlen("-cachesize=");
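A usage sketch based on the parsing above (output and input file names are purely illustrative; note that the code matches -j, while the help line added earlier documents the flag as -m):

   hadd -j 4 merged.root run1.root run2.root run3.root run4.root    # four worker processes
   hadd -j -f merged.root run1.root run2.root run3.root run4.root   # -j without a count: one process per logical core

In both cases multiproc is switched on; when no count follows -j, nProcesses keeps the default taken from SysInfo_t::fCpus.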
@@ -308,11 +341,11 @@ int main( int argc, char **argv )
       std::cout << "hadd Target file: " << targetname << std::endl;
    }

-   TFileMerger merger(kFALSE,kFALSE);
-   merger.SetMsgPrefix("hadd");
-   merger.SetPrintLevel(verbosity - 1);
+   TFileMerger fileMerger(kFALSE,kFALSE);
+   fileMerger.SetMsgPrefix("hadd");
+   fileMerger.SetPrintLevel(verbosity - 1);
    if (maxopenedfiles > 0) {
-      merger.SetMaxOpenedFiles(maxopenedfiles);
+      fileMerger.SetMaxOpenedFiles(maxopenedfiles);
    }
    if (newcomp == -1) {
       if (useFirstInputCompression || keepCompressionAsIs) {
@@ -348,62 +381,127 @@
       std::cout << "hadd compression setting for all ouput: " << newcomp << '\n';
    }
    if (append) {
-      if (!merger.OutputFile(targetname,"UPDATE",newcomp)) {
+      if (!fileMerger.OutputFile(targetname,"UPDATE",newcomp)) {
          std::cerr << "hadd error opening target file for update :" << argv[ffirst-1] << "." << std::endl;
          exit(2);
       }
-   } else if (!merger.OutputFile(targetname,force,newcomp) ) {
+   } else if (!fileMerger.OutputFile(targetname,force,newcomp) ) {
       std::cerr << "hadd error opening target file (does " << argv[ffirst-1] << " exist?)." << std::endl;
       if (!force) std::cerr << "Pass \"-f\" argument to force re-creation of output file." << std::endl;
       exit(1);
    }

+   auto filesToProcess = argc - ffirst;
+   auto step = (filesToProcess+nProcesses-1)/nProcesses;
+   if(step < 3){
+      //At least 3 files per process
+      step = 3;
+      nProcesses = (filesToProcess+step-1)/step;
+      std::cout<<"Each process should handle at least 3 files for efficiency.";
+      std::cout<<" Setting the number of processes to: "<<nProcesses<<std::endl;
+   }
+   std::vector<std::string> partialFiles;

+   if(multiproc){
+      for(auto i = 0; (i*step)<filesToProcess; i++) {
+         std::stringstream buffer;
+         buffer <<"partial"<<i<<".root";
+         partialFiles.emplace_back(buffer.str());
+      }
+   }

-   for ( int i = ffirst; i < argc; i++ ) {
-      if (argv[i] && argv[i][0]=='@') {
-         std::ifstream indirect_file(argv[i]+1);
-         if( ! indirect_file.is_open() ) {
-            std::cerr<< "hadd could not open indirect file " << (argv[i]+1) << std::endl;
-            return 1;
+   auto mergeFiles = [&] (TFileMerger &merger){
+      if (reoptimize) {
+         merger.SetFastMethod(kFALSE);
+      } else {
+         if (!keepCompressionAsIs && merger.HasCompressionChange()) {
+            // Don't warn if the user any request re-optimization.
+            std::cout <<"hadd Sources and Target have different compression levels"<<std::endl;
+            std::cout <<"hadd merging will be slower"<<std::endl;
          }
-         while( indirect_file ){
-            std::string line;
-            if( std::getline(indirect_file, line) && line.length() && !merger.AddFile(line.c_str()) ) {
-               return 1;
-            }
      }
+      merger.SetNotrees(noTrees);
+      merger.SetMergeOptions(cacheSize);
+      Bool_t status;
+      if (append) status = merger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kAll);
+      else status = merger.Merge();
+      return status;
+   };

+   auto sequentialMerge = [&](TFileMerger &merger, int start, int nFiles){
+
+      for ( auto i = start; i < (start + nFiles) && i<argc; i++ ) {
+         if (argv[i] && argv[i][0]=='@') {
+            std::ifstream indirect_file(argv[i]+1);
+            if( ! indirect_file.is_open() ) {
+               std::cerr<< "hadd could not open indirect file " << (argv[i]+1) << std::endl;
+               return kFALSE;
+            }
            }
         }
-      } else if( ! merger.AddFile(argv[i]) ) {
-         if ( skip_errors ) {
-            std::cerr << "hadd skipping file with error: " << argv[i] << std::endl;
-         } else {
-            std::cerr << "hadd exiting due to error in " << argv[i] << std::endl;
-            return 1;
+            while( indirect_file ){
+               std::string line;
+               if( std::getline(indirect_file, line) && line.length() && !merger.AddFile(line.c_str()) ) {
+                  return kFALSE;
+               }
            }
            }
+         } else if( ! merger.AddFile(argv[i]) ) {
+            if ( skip_errors ) {
+               std::cerr << "hadd skipping file with error: " << argv[i] << std::endl;
+            } else {
+               std::cerr << "hadd exiting due to error in " << argv[i] << std::endl;
            }
+            return kFALSE;
         }
      }
   }
-   if (reoptimize) {
-      merger.SetFastMethod(kFALSE);
-   } else {
-      if (!keepCompressionAsIs && merger.HasCompressionChange()) {
-         // Don't warn if the user any request re-optimization.
-         std::cout <<"hadd Sources and Target have different compression levels"<<std::endl;
-         std::cout <<"hadd merging will be slower"<<std::endl;
+      return mergeFiles(merger);
+   };

+   auto parallelMerge = [&](int start){
+      TFileMerger mergerP(kFALSE,kFALSE);
+      mergerP.SetMsgPrefix("hadd");
+      mergerP.SetPrintLevel(verbosity - 1);
+      if (maxopenedfiles > 0) {
+         mergerP.SetMaxOpenedFiles(maxopenedfiles/nProcesses);
+      }
      }
-   merger.SetNotrees(noTrees);
-   merger.SetMergeOptions(cacheSize);
+      if (!mergerP.OutputFile(partialFiles[(start-ffirst)/step].c_str(), newcomp)) {
+         std::cerr << "hadd error opening target partial file"<<std::endl;
+         exit(1);
+      }
+      return sequentialMerge(mergerP, start, step);
+   };

+   auto reductionFunc = [&](){
+      for(auto pf:partialFiles){
+         fileMerger.AddFile(pf.c_str());
+      }
+      return mergeFiles(fileMerger);
+   };

    Bool_t status;
-   if (append) status = merger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kAll);
-   else status = merger.Merge();

+   if(multiproc) {
+      ROOT::TProcessExecutor p(nProcesses);
+      auto res = p.Map(parallelMerge, ROOT::TSeqI(ffirst, argc, step));
+      status = std::accumulate(res.begin(), res.end(), 0U) == partialFiles.size();
+      if(status){
+         status = reductionFunc();
+      } else{
+         std::cout<<"hadd failed at the parallel stage"<<std::endl;
+      }
+      for(auto pf:partialFiles){
+         gSystem->Unlink(pf.c_str());
+      }
+   } else{
+      status = sequentialMerge(fileMerger, ffirst, filesToProcess);
+   }

    if (status) {
       if (verbosity == 1) {
-         std::cout << "hadd merged " << merger.GetMergeList()->GetEntries() << " input files in " << targetname << ".\n";
+         std::cout << "hadd merged " << fileMerger.GetMergeList()->GetEntries() << " input files in " << targetname << ".\n";
       }
       return 0;
    } else {
       if (verbosity == 1) {
-         std::cout << "hadd failure during the merge of " << merger.GetMergeList()->GetEntries() << " input files in " << targetname << ".\n";
+         std::cout << "hadd failure during the merge of " << fileMerger.GetMergeList()->GetEntries() << " input files in " << targetname << ".\n";
       }
       return 1;
    }
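To make the splitting above concrete, here is the same arithmetic applied to made-up numbers (10 input files on a machine with 8 logical cores); the snippet only mirrors the computation in the patch:

   #include <iostream>

   int main()
   {
      int filesToProcess = 10, nProcesses = 8;                     // illustrative values
      int step = (filesToProcess + nProcesses - 1) / nProcesses;   // ceil(10/8) = 2
      if (step < 3) {                                              // enforce at least 3 files per worker
         step = 3;
         nProcesses = (filesToProcess + step - 1) / step;          // ceil(10/3) = 4
      }
      std::cout << nProcesses << " workers, up to " << step << " files each\n"; // 4 workers, up to 3 files each
      return 0;
   }

With those numbers the workers would write partial0.root through partial3.root, TSeqI(ffirst, argc, 3) hands each worker the index of its first input file, and the final fileMerger combines the four partial files into the target before they are unlinked.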
Given that request is a long, isn't the first part (request < kMaxLong) always true? If I remember correctly, one has to check errno to figure out whether something went wrong in strtol.
Not necessarily.
From cppreference: if no conversion can be performed, strtol returns zero.
Which means that this check may be incorrect when no conversion can be performed. There are a couple more places where hadd was already doing that. I'll check.
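For reference, a minimal sketch of the errno-based check being discussed; ParseProcessCount is a hypothetical helper, not part of the patch:

   // Hypothetical helper illustrating explicit strtol error handling.
   // errno must be cleared before the call; afterwards ERANGE signals overflow,
   // and end == arg means no conversion was performed at all.
   #include <cerrno>
   #include <cstdlib>

   bool ParseProcessCount(const char *arg, long &out)
   {
      char *end = nullptr;
      errno = 0;
      long value = std::strtol(arg, &end, 10);
      if (end == arg || *end != '\0') return false;   // no digits, or trailing garbage
      if (errno == ERANGE || value < 0) return false; // out of range or negative
      out = value;
      return true;
   }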