-
Notifications
You must be signed in to change notification settings - Fork 1.4k
First-pass IMT implementation of FlushBaskets. #277
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
c247dcc
0be50e3
88def72
9235428
249cb14
e14e674
71b2161
cca9e30
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -131,9 +131,12 @@ int main(int argc, char **argv) | |
| Int_t branchStyle = 1; //new style by default | ||
| if (split < 0) {branchStyle = 0; split = -1-split;} | ||
|
|
||
| ROOT::EnableImplicitMT(4); | ||
|
||
|
|
||
| TFile *hfile; | ||
| TTree *tree; | ||
| Event *event = 0; | ||
| Event *event2 = 0; | ||
|
|
||
| // Fill event, header and tracks with some random numbers | ||
| // Create a timer object to benchmark this loop | ||
|
|
@@ -195,6 +198,7 @@ int main(int argc, char **argv) | |
| } else | ||
| hfile = new TFile("Event.root","RECREATE","TTree benchmark ROOT file"); | ||
| hfile->SetCompressionLevel(comp); | ||
| //hfile->SetCompressionAlgorithm(2); | ||
|
|
||
| // Create histogram to show write_time in function of time | ||
| Float_t curtime = -0.5; | ||
|
|
@@ -213,9 +217,16 @@ int main(int argc, char **argv) | |
| bufsize = 64000; | ||
| if (split) bufsize /= 4; | ||
| event = new Event(); // By setting the value, we own the pointer and must delete it. | ||
| event2 = new Event(); // By setting the value, we own the pointer and must delete it. | ||
| TTree::SetBranchStyle(branchStyle); | ||
| TBranch *branch = tree->Branch("event", &event, bufsize,split); | ||
| TBranch *branch2 = tree->Branch("event2", &event, bufsize,split); | ||
| TBranch *branch3 = tree->Branch("event3", &event2, bufsize,split); | ||
| TBranch *branch4 = tree->Branch("event4", &event2, bufsize,split); | ||
| branch->SetAutoDelete(kFALSE); | ||
| branch2->SetAutoDelete(kFALSE); | ||
| branch3->SetAutoDelete(kFALSE); | ||
| branch4->SetAutoDelete(kFALSE); | ||
| if(split >= 0 && branchStyle) tree->BranchRef(); | ||
| Float_t ptmin = 1; | ||
|
|
||
|
|
@@ -230,6 +241,7 @@ int main(int argc, char **argv) | |
| } | ||
|
|
||
| event->Build(ev, arg5, ptmin); | ||
| event2->Build(ev, arg5, ptmin); | ||
|
|
||
| if (write) nb += tree->Fill(); //fill the tree | ||
|
|
||
|
|
@@ -242,7 +254,8 @@ int main(int argc, char **argv) | |
| } | ||
| } | ||
| // We own the event (since we set the branch address explicitly), we need to delete it. | ||
| delete event; event = 0; | ||
| // delete event; event = 0; | ||
| // delete event2; event2 = 0; | ||
|
|
||
| // Stop timer and print results | ||
| timer.Stop(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -70,6 +70,8 @@ | |
| #include "TVirtualTreePlayer.h" | ||
| #endif | ||
|
|
||
| #include <atomic> | ||
|
|
||
| class TBranch; | ||
| class TBrowser; | ||
| class TFile; | ||
|
|
@@ -99,6 +101,7 @@ class TTree : public TNamed, public TAttLine, public TAttFill, public TAttMarker | |
|
|
||
| protected: | ||
| Long64_t fEntries; ///< Number of entries | ||
| // NOTE: cannot use std::atomic for these counters as it cannot be serialized. | ||
| Long64_t fTotBytes; ///< Total number of bytes in all branches before compression | ||
| Long64_t fZipBytes; ///< Total number of bytes in all branches after compression | ||
| Long64_t fSavedBytes; ///< Number of autosaved bytes | ||
|
|
@@ -160,6 +163,11 @@ class TTree : public TNamed, public TAttLine, public TAttFill, public TAttMarker | |
| TTree(const TTree& tt); // not implemented | ||
| TTree& operator=(const TTree& tt); // not implemented | ||
|
|
||
| // For simplicity, although fIMTFlush is always disabled in non-IMT builds, we don't #ifdef it out. | ||
| mutable Bool_t fIMTFlush{false}; ///<! True if we are doing a multithreaded flush. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Odd to see a mutable non-atomic. Is that really the intention? (i.e. is it really never possible to modify/read it in parallel?)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, that's the intention. Well, the real problem here is that The I wanted to avoid making it atomic to fulfill the request that there is no performance penalty when ROOT is used in non-IMT mode.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fair enough, a comment here or at the use place along the line of this answer ought to be added. |
||
| mutable std::atomic<Long64_t> fIMTTotBytes; ///<! Total bytes for the IMT flush baskets | ||
| mutable std::atomic<Long64_t> fIMTZipBytes; ///<! Zip bytes for the IMT flush baskets. | ||
|
|
||
| void InitializeSortedBranches(); | ||
| void SortBranchesByTime(); | ||
|
|
||
|
|
@@ -303,8 +311,10 @@ class TTree : public TNamed, public TAttLine, public TAttFill, public TAttMarker | |
| virtual TFriendElement *AddFriend(const char* treename, const char* filename = ""); | ||
| virtual TFriendElement *AddFriend(const char* treename, TFile* file); | ||
| virtual TFriendElement *AddFriend(TTree* tree, const char* alias = "", Bool_t warn = kFALSE); | ||
| virtual void AddTotBytes(Int_t tot) { fTotBytes += tot; } | ||
| virtual void AddZipBytes(Int_t zip) { fZipBytes += zip; } | ||
| // As the TBasket invokes Add{Tot,Zip}Bytes on its parent tree, we must do these updates in a thread-safe | ||
| // manner only when we are flushing multiple baskets in parallel. | ||
| virtual void AddTotBytes(Int_t tot) { if (fIMTFlush) { fIMTTotBytes += tot; } else { fTotBytes += tot; } } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will fail to compile if R__USE_IMT is off. |
||
| virtual void AddZipBytes(Int_t zip) { if (fIMTFlush) { fIMTZipBytes += zip; } else { fZipBytes += zip; } } | ||
| virtual Long64_t AutoSave(Option_t* option = ""); | ||
| virtual Int_t Branch(TCollection* list, Int_t bufsize = 32000, Int_t splitlevel = 99, const char* name = ""); | ||
| virtual Int_t Branch(TList* list, Int_t bufsize = 32000, Int_t splitlevel = 99); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -933,6 +933,15 @@ Int_t TBasket::WriteBuffer() | |
| } | ||
| fMotherDir = file; // fBranch->GetDirectory(); | ||
|
|
||
| // This mutex prevents multiple TBasket::WriteBuffer invocations from interacting | ||
| // with the underlying TFile at once - TFile is assumed to *not* be thread-safe. | ||
| // | ||
| // The only parallelism we'd like to exploit (right now!) is the compression | ||
| // step - everything else should be serialized at the TFile level. | ||
| #ifdef R__USE_IMT | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Given the slightly unusual use pattern (take the lock then explicit unlock then lock). Can you add comment on what this is propecting (and why it will not provoke a dead lock). |
||
| std::unique_lock<std::mutex> sentry(file->fWriteMutex); | ||
| #endif // R__USE_IMT | ||
|
|
||
| if (R__unlikely(fBufferRef->TestBit(TBufferFile::kNotDecompressed))) { | ||
| // Read the basket information that was saved inside the buffer. | ||
| Bool_t writing = fBufferRef->IsWriting(); | ||
|
|
@@ -997,8 +1006,19 @@ Int_t TBasket::WriteBuffer() | |
| for (Int_t i = 0; i < nbuffers; ++i) { | ||
| if (i == nbuffers - 1) bufmax = fObjlen - nzip; | ||
| else bufmax = kMAXZIPBUF; | ||
| //compress the buffer | ||
| // Compress the buffer. Note that we allow multiple TBasket compressions to occur at once | ||
| // for a given TFile: that's because the compression buffer when we use IMT is no longer | ||
| // shared amongst several threads. | ||
| #ifdef R__USE_IMT | ||
| sentry.unlock(); | ||
| #endif // R__USE_IMT | ||
| // NOTE this is declared with C linkage, so it shouldn't except. Also, when | ||
| // USE_IMT is defined, we are guaranteed that the compression buffer is unique per-branch. | ||
| // (see fCompressedBufferRef in constructor). | ||
| R__zipMultipleAlgorithm(cxlevel, &bufmax, objbuf, &bufmax, bufcur, &nout, cxAlgorithm); | ||
| #ifdef R__USE_IMT | ||
| sentry.lock(); | ||
| #endif // R__USE_IMT | ||
|
|
||
| // test if buffer has really been compressed. In case of small buffers | ||
| // when the buffer contains random data, it may happen that the compressed | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see
#include <mutex>?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.