CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EvFDaqDirector.h
Go to the documentation of this file.
1 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
2 #define EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
3 
7 
10 
11 //std headers
12 #include <string>
13 #include <sstream>
14 #include <iomanip>
15 #include <vector>
16 #include <list>
17 #include <mutex>
18 
19 //system headers
20 //#include <sys/types.h>
21 #include <sys/stat.h>
22 #include <fcntl.h>
23 #include <errno.h>
24 #include <string.h>
25 #include <stdio.h>
26 
27 class SystemBounds;
28 class GlobalContext;
29 class StreamID;
30 
31 class InputFile;
32 class InputChunk;
33 
34 namespace evf{
35 
37 
39  {
40  public:
41 
43 
44  explicit EvFDaqDirector( const edm::ParameterSet &pset, edm::ActivityRegistry& reg );
46  void preallocate(edm::service::SystemBounds const& bounds);
47  void preBeginRun(edm::GlobalContext const& globalContext);
48  void postEndRun(edm::GlobalContext const& globalContext);
49  void preGlobalEndLumi(edm::GlobalContext const& globalContext);
50  void preSourceEvent(edm::StreamID const& streamID);
51  //std::string &baseDir(){return base_dir_;}
55 
57  std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
58  std::string getRawFilePath(const unsigned int ls, const unsigned int index) const;
59  std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const;
60  std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
61  std::string getOpenDatFilePath(const unsigned int ls, std::string const& stream) const;
62  std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
63  std::string getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
64  std::string getMergedDatFilePath(const unsigned int ls, std::string const& stream) const;
67  std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
69  std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
70  std::string getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
71  std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
72  std::string getEoLSFilePathOnBU(const unsigned int ls) const;
73  std::string getEoLSFilePathOnFU(const unsigned int ls) const;
76  std::string getRunOpenDirPath() const {return run_dir_ +"/open";}
77  void removeFile(unsigned int ls, unsigned int index);
78  void removeFile(std::string );
79 
80  FileStatus updateFuLock(unsigned int& ls, std::string& nextFile, uint32_t& fsize);
82  unsigned int getRunNumber() const { return run_; }
83  unsigned int getJumpLS() const { return jumpLS_; }
84  unsigned int getJumpIndex() const { return jumpIndex_; }
89  void lockInitLock();
90  void unlockInitLock();
92  void updateFileIndex(int const& fileIndex) {currentFileIndex_=fileIndex;}
93  std::vector<int>* getStreamFileTracker() {return &streamFileTracker_;}
94  bool isSingleStreamThread() {return nStreams_==1 && nThreads_==1;}
95  void lockFULocal();
96  void unlockFULocal();
97  void lockFULocal2();
98  void unlockFULocal2();
99  void createRunOpendirMaybe();
100  void setDeleteTracking( std::mutex* fileDeleteLock,std::list<std::pair<int,InputFile*>> *filesToDelete) {
101  fileDeleteLockPtr_=fileDeleteLock;
102  filesToDeletePtr_ = filesToDelete;
103  }
104 
105 
106  private:
107  //bool bulock();
108  //bool fulock();
109  bool bumpFile(unsigned int& ls, unsigned int& index, std::string& nextFile, uint32_t& fsize);
110  void openFULockfileStream(std::string& fuLockFilePath, bool create);
111  std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const;
112  std::string outputFileNameStem(const unsigned int ls, std::string const& stream) const;
113  std::string mergedFileNameStem(const unsigned int ls, std::string const& stream) const;
115  std::string eolsFileName(const unsigned int ls) const;
116  std::string eorFileName() const;
117 
122  unsigned int run_;
123 
129 
136 
143 
145 
146  unsigned long previousFileSize_;
147  unsigned int jumpLS_, jumpIndex_;
148 
149  struct flock bu_w_flk;
150  struct flock bu_r_flk;
151  struct flock bu_w_fulk;
152  struct flock bu_r_fulk;
153  struct flock fu_rw_flk;
154  struct flock fu_rw_fulk;
155  struct flock data_rw_flk;
156  struct flock data_rw_fulk;
157  //struct flock fulocal_rw_flk;
158  //struct flock fulocal_rw_fulk;
159  //struct flock fulocal_rw_flk2;
160  //struct flock fulocal_rw_fulk2;
161 
163  std::vector<int> streamFileTracker_;
165 
167  std::list<std::pair<int,InputFile*>> *filesToDeletePtr_ = nullptr;
168 
169  pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER;
170 
171  unsigned int nStreams_=0;
172  unsigned int nThreads_=0;
173 
174  };
175 }
176 
177 #endif
178 
std::string & buBaseRunDir()
struct flock bu_w_fulk
unsigned int getJumpIndex() const
unsigned int nThreads_
struct flock fu_rw_flk
std::string run_string_
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::vector< int > streamFileTracker_
std::list< std::pair< int, InputFile * > > * filesToDeletePtr_
std::vector< int > * getStreamFileTracker()
std::string outputFileNameStem(const unsigned int ls, std::string const &stream) const
static boost::mutex mutex
Definition: LHEProxy.cc:11
void openFULockfileStream(std::string &fuLockFilePath, bool create)
std::string getInitFilePath(std::string const &stream) const
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
pthread_mutex_t init_lock_
struct flock bu_r_fulk
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
struct flock data_rw_flk
std::string eolsFileName(const unsigned int ls) const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
std::string getEoRFilePath() const
unsigned long previousFileSize_
unsigned int jumpIndex_
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize)
struct flock fu_rw_fulk
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::mutex * fileDeleteLockPtr_
void updateFileIndex(int const &fileIndex)
EvFDaqDirector(const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const
FILE * maybeCreateAndLockFileHeadForStream(unsigned int ls, std::string &stream)
std::string bu_base_dir_
std::string findCurrentRunDir()
void removeFile(unsigned int ls, unsigned int index)
std::string & buBaseRunOpenDir()
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
std::string & baseRunDir()
bool getTestModeNoBuilderUnit()
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize)
struct flock bu_w_flk
void preBeginRun(edm::GlobalContext const &globalContext)
void preSourceEvent(edm::StreamID const &streamID)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
evf::FastMonitoringService * fms_
std::string bu_run_dir_
std::string eorFileName() const
std::string findRunDir(unsigned int)
Definition: DirManager.cc:37
void postEndRun(edm::GlobalContext const &globalContext)
unsigned int getRunNumber() const
unsigned int jumpLS_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::string getJumpFilePath() const
void preallocate(edm::service::SystemBounds const &bounds)
struct flock data_rw_fulk
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
unsigned int getJumpLS() const
std::string getRunOpenDirPath() const
void setFMS(evf::FastMonitoringService *fms)
SurfaceDeformation * create(int type, const std::vector< double > &params)
struct flock bu_r_flk
std::string mergedFileNameStem(const unsigned int ls, std::string const &stream) const
unsigned int nStreams_
std::string getEoRFilePathOnFU() const
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*","!HLTx*"if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL.It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of"!*"before the partial wildcard feature was incorporated).Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run
std::string bu_run_open_dir_
std::string initFileName(std::string const &stream) const