Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion io/io/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ set(libname RIO)

include_directories(${CMAKE_SOURCE_DIR}/core/clib/res)

ROOT_GENERATE_DICTIONARY(G__IO *.h STAGE1 MODULE ${libname} LINKDEF LinkDef.h)
ROOT_GENERATE_DICTIONARY(G__IO *.h ROOT/*.hxx STAGE1 MODULE ${libname} LINKDEF LinkDef.h)

if(root7)
ROOT_GLOB_SOURCES(root7src RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} v7/src/*.cxx)
Expand All @@ -18,4 +18,8 @@ ROOT_LINKER_LIBRARY(${libname} $<TARGET_OBJECTS:RIOObjs> $<TARGET_OBJECTS:RootPc
DEPENDENCIES Core Thread)
ROOT_INSTALL_HEADERS()

if(testing)
add_subdirectory(test)
endif()

#--- Extra rules ----------------------------------------------------------
2 changes: 2 additions & 0 deletions io/io/inc/LinkDef.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,7 @@
#pragma link C++ class TStreamerInfoActions::TConfiguredAction+;
#pragma link C++ class TStreamerInfoActions::TActionSequence+;
#pragma link C++ class TStreamerInfoActions::TConfiguration-;
#pragma link C++ class ROOT::Experimental::TBufferMerger;
#pragma link C++ class ROOT::Experimental::TBufferMergerFile;

#endif
142 changes: 142 additions & 0 deletions io/io/inc/ROOT/TBufferMerger.hxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// @(#)root/io:$Id$
// Author: Philippe Canal, Witold Pokorski, and Guilherme Amadio

/*************************************************************************
* Copyright (C) 1995-2017, Rene Brun and Fons Rademakers. *
* All rights reserved. *
* *
* For the licensing terms see $ROOTSYS/LICENSE. *
* For the list of contributors see $ROOTSYS/README/CREDITS. *
*************************************************************************/

#ifndef ROOT_TBufferMerger
#define ROOT_TBufferMerger

#include "TMemFile.h"

#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

class TArrayC;
class TBufferFile;

namespace ROOT {
namespace Experimental {

class TBufferMergerFile;

/**
* \class TBufferMerger TBufferMerger.hxx
* \ingroup IO
*
* TBufferMerger is a class to facilitate writing data in
* parallel from multiple threads, while writing to a single
* output file. Its purpose is similar to TParallelMergingFile,
* but instead of using processes that connect to a network
* socket, TBufferMerger uses threads that each write to a
* TBufferMergerFile, which in turn push data into a queue
* managed by the TBufferMerger.
*/

class TBufferMerger {
public:
/** Constructor
* @param name Output file name
* @param option Output file creation options
* @param ftitle Output file title
* @param compression Output file compression level
*/
TBufferMerger(const char *name, Option_t *option = "RECREATE", const char *ftitle = "", Int_t compress = 1);

/** Destructor */
virtual ~TBufferMerger();

/** Returns a TBufferMergerFile to which data can be written.
* At the end, all TBufferMergerFiles get merged into the output file.
*/
std::shared_ptr<TBufferMergerFile> GetFile();

friend class TBufferMergerFile;

private:
/** TBufferMerger has no default constructor */
TBufferMerger();

/** TBufferMerger has no copy constructor */
TBufferMerger(const TBufferMerger &);

/** TBufferMerger has no copy operator */
TBufferMerger& operator=(const TBufferMerger&);

void Push(TBufferFile *buffer);
void Listen();

std::mutex fFilesMutex; //< Mutex used to lock fAttachedFiles
std::mutex fQueueMutex; //< Mutex used to lock fQueue
std::mutex fWriteMutex; //< Mutex used for the condition variable
std::condition_variable fCV; //< Condition variable used to wait for data
std::queue<TBufferFile *> fQueue; //< Queue to which data is pushed and merged
std::unique_ptr<TFile> fFile; //< Output file, owned by the user
std::unique_ptr<std::thread> fMergingThread; //< Worker thread that writes to disk
std::vector<std::weak_ptr<TBufferMergerFile>> fAttachedFiles; //< Attached files

ClassDef(TBufferMerger, 0);
};

/**
* \class TBufferMerger TBufferMerger.hxx
* \ingroup IO
*
* A TBufferMergerFile is similar to a TMemFile, but when data
* is written to it, it is appended to the TBufferMerger queue.
* The TBufferMerger merges all data into the output file on disk.
*/

class TBufferMergerFile : public TMemFile {
private:
TBufferMerger &fMerger; //< TBufferMerger this file is attached to
std::unique_ptr<TArrayC> fClassSent; //< StreamerInfo this file has already written
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably can probably be replaced by a TBits (more compact).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to touch the imported code, because I wasn't sure what exactly was stored in it, but an array of characters seems wierd as replacement for a vector of bools, if that's what it really is for here. I would prefer using std::bitset if that's ok in that case. What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not need to be addressed before merging.

The main difference between std::bitset and TBits is that bitset has a fixed size at compile time while TBits is expandable (and some operation in TBits are (or at least used to be) faster). Here you can not predict the maximum number of TStreamerInfo involved in storing data in this file.

The reason it is a TArrayC is that at the time the original version of this code was written (circa 2000), TBits did not exist yet ... i.e. this code is copy/paste per se from TFile::WriteStreamerInfo.

The purpose is solely to keep track of whether a StreamerInfo, refer to by its index has been sent over ...

... right but that does not make any sense when the client and the server are in the same process ....

So actually WriteStreamerInfo and any use of fClassSent can go away ....

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I'm not acquainted yet with the internals of the I/O system, so I cannot make such judgments. If it is indeed unnecessary, then we should remove it.


/** Constructor. Can only be called by TBufferMerger.
* @param m Merger this file is attached to. */
TBufferMergerFile(TBufferMerger &m);

/** TBufferMergerFile has no default constructor. */
TBufferMergerFile();

/** TBufferMergerFile has no copy constructor. */
TBufferMergerFile(const TBufferMergerFile &);

/** TBufferMergerFile has no copy operator */
TBufferMergerFile& operator=(const TBufferMergerFile&);

friend class TBufferMerger;

public:
/** Destructor */
~TBufferMergerFile();

using TMemFile::Write;

/** Write data into a TBufferFile and append it to TBufferMerger.
* @param name Name
* @param opt Options
* @param bufsize Buffer size
* This function must be called before the TBufferMergerFile gets destroyed,
* or no data is appended to the TBufferMerger.
*/
virtual Int_t Write(const char *name = nullptr, Int_t opt = 0, Int_t bufsize = 0) override;

/** Write StreamerInfo for objects that have not already been written. */
virtual void WriteStreamerInfo() override;

ClassDefOverride(TBufferMergerFile, 0);
};

} // namespace Experimental
} // namespace ROOT

#endif
Loading