CMS 3D CMS Logo

EvFDaqDirector.h
Go to the documentation of this file.
1 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
2 #define EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
3 
7 
10 
11 //std headers
12 #include <string>
13 #include <sstream>
14 #include <iomanip>
15 #include <vector>
16 #include <map>
17 #include <list>
18 #include <mutex>
19 
20 //system headers
21 //#include <sys/types.h>
22 #include <sys/stat.h>
23 #include <fcntl.h>
24 #include <errno.h>
25 #include <string.h>
26 #include <stdio.h>
27 
// Forward declarations at global scope.
// NOTE(review): the class below names these types as edm::service::SystemBounds
// and edm::GlobalContext, which are different entities from the global-scope
// names declared here; these declarations may be unused - confirm before relying on them.
28 class SystemBounds;
29 class GlobalContext;
30 class StreamID;
31 
32 struct InputFile;
33 struct InputChunk;
34 
35 namespace edm {
36  class PathsAndConsumesOfModulesBase;
37  class ProcessContext;
38 }
39 
40 namespace Json{
41  class Value;
42 }
43 
44 namespace edm {
46 }
47 
48 namespace evf{
49 
51 
53 
55  {
56  public:
57 
// Result codes; FileStatus is the return type of updateFuLock() declared below.
// NOTE(review): the meaning of each enumerator is implied by its name only;
// confirm against the implementation file.
58  enum FileStatus { noFile, sameFile, newFile, newLumi, runEnded, runAbort };
59 
// Destructor, run initialisation, and job/run/lumisection hooks.
// Only declarations are visible in this listing; the bodies live elsewhere.
// NOTE(review): the pre*/post* names suggest framework callbacks, but the
// registration code is not shown here - confirm in the implementation file.
61  ~EvFDaqDirector();
62  void initRun();
// Static: callable without an instance; fills the descriptions object passed by reference.
63  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
64  void preallocate(edm::service::SystemBounds const& bounds);
65  void preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const&);
66  void preBeginRun(edm::GlobalContext const& globalContext);
67  void postEndRun(edm::GlobalContext const& globalContext);
68  void preGlobalEndLumi(edm::GlobalContext const& globalContext);
69  //std::string &baseDir(){return base_dir_;}
// Overwrites the stored run number (run_) with the caller-supplied value.
70  void overrideRunNumber(unsigned int run) {run_=run;}
// The three accessors below return non-const references to run_dir_,
// bu_run_dir_ and bu_run_open_dir_ respectively, so callers can modify
// the stored strings in place.
71  std::string &baseRunDir(){return run_dir_;}
72  std::string &buBaseRunDir(){return bu_run_dir_;}
73  std::string &buBaseRunOpenDir(){return bu_run_open_dir_;}
74 
// Delegates to dirManager_.findRunDir() with the current run_ and returns
// the result as a std::string.
75  std::string findCurrentRunDir(){ return dirManager_.findRunDir(run_);}
// File-path getters. Each is a const member returning a std::string by value.
// They are keyed by (ls, index), by (ls, stream), by stream alone, by ls alone,
// or take no argument. Only declarations are visible here.
// NOTE(review): the actual directory layout and file-name format are defined
// in the implementation file - confirm there.
76  std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
77  std::string getRawFilePath(const unsigned int ls, const unsigned int index) const;
78  std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const;
79  std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
80  std::string getDatFilePath(const unsigned int ls, std::string const& stream) const;
81  std::string getOpenDatFilePath(const unsigned int ls, std::string const& stream) const;
82  std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
83  std::string getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
84  std::string getMergedDatFilePath(const unsigned int ls, std::string const& stream) const;
85  std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const;
86  std::string getOpenInitFilePath(std::string const& stream) const;
87  std::string getInitFilePath(std::string const& stream) const;
88  std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
89  std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
90  std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
91  std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
92  std::string getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
93  std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
94  std::string getEoLSFilePathOnBU(const unsigned int ls) const;
95  std::string getEoLSFilePathOnFU(const unsigned int ls) const;
96  std::string getBoLSFilePathOnFU(const unsigned int ls) const;
97  std::string getEoRFilePath() const;
98  std::string getEoRFilePathOnFU() const;
// Returns run_dir_ with the literal suffix "/open" appended.
99  std::string getRunOpenDirPath() const {return run_dir_ +"/open";}
// Returns the stored outputAdler32Recheck_ flag.
100  bool outputAdler32Recheck() const {return outputAdler32Recheck_;}
// Two overloads: one keyed by (ls, index), one taking a string by value.
// NOTE(review): presumably the string is a file path - confirm in the implementation.
101  void removeFile(unsigned int ls, unsigned int index);
102  void removeFile(std::string );
103 
// Returns a FileStatus code. All four parameters are non-const references.
// NOTE(review): which of them are inputs and which are outputs is not visible
// from the declaration - confirm in the implementation file.
104  FileStatus updateFuLock(unsigned int& ls, std::string& nextFile, uint32_t& fsize, uint64_t& lockWaitTime);
105  void tryInitializeFuLockFile();
// Returns the stored run number (run_).
106  unsigned int getRunNumber() const { return run_; }
// Returns a C stdio FILE pointer.
// NOTE(review): ownership and the failure value (e.g. nullptr) are not visible here - confirm.
107  FILE * maybeCreateAndLockFileHeadForStream(unsigned int ls, std::string &stream);
108  void unlockAndCloseMergeStream();
109  void lockInitLock();
110  void unlockInitLock();
// Stores the supplied pointer in fms_; no null check or copy is made here.
111  void setFMS(evf::FastMonitoringService* fms) {fms_=fms;}
// True only when both nStreams_ and nThreads_ equal 1.
112  bool isSingleStreamThread() {return nStreams_==1 && nThreads_==1;}
// Paired lock/unlock declarations; what they lock is defined in the implementation file.
113  void lockFULocal();
114  void unlockFULocal();
115  void lockFULocal2();
116  void unlockFULocal2();
117  void createRunOpendirMaybe();
118  void createProcessingNotificationMaybe() const;
// Takes a file name by const reference and returns an int.
// NOTE(review): the meaning of the return value (and any error sentinel) is not visible here - confirm.
119  int readLastLSEntry(std::string const& file);
// Records the two caller-supplied pointers in fileDeleteLockPtr_ and
// filesToDeletePtr_. Nothing is dereferenced or copied here, so the pointed-to
// mutex and list must outlive any later use through these members.
// NOTE(review): presumably the mutex guards access to the list - confirm against the caller.
120  void setDeleteTracking( std::mutex* fileDeleteLock,std::list<std::pair<int,InputFile*>> *filesToDelete) {
121  fileDeleteLockPtr_=fileDeleteLock;
122  filesToDeletePtr_ = filesToDelete;
123  }
// Both check* functions take the process context by const reference; bodies are not in this listing.
124  void checkTransferSystemPSet(edm::ProcessContext const& pc);
125  void checkMergeTypePSet(edm::ProcessContext const& pc);
// Per-stream lookups returning a std::string by value.
126  std::string getStreamDestinations(std::string const& stream) const;
// NOTE(review): MergeType is not declared in the visible part of this listing;
// unlike getStreamDestinations(), this member is not const - confirm whether that is intentional.
127  std::string getStreamMergeType(std::string const& stream, MergeType defaultType);
// Return the stored emptyLumisectionMode_ and microMergeDisabled_ flags respectively.
128  bool emptyLumisectionMode() const {return emptyLumisectionMode_;}
129  bool microMergeDisabled() const {return microMergeDisabled_;}
130 
131 
132  private:
133  //bool bulock();
134  //bool fulock();
// Private helpers; only declarations are visible in this listing.
// bumpFile takes ls, index, nextFile and fsize by non-const reference.
// NOTE(review): presumably these are updated in place - confirm in the implementation file.
135  bool bumpFile(unsigned int& ls, unsigned int& index, std::string& nextFile, uint32_t& fsize, int maxLS);
136  void openFULockfileStream(std::string& fuLockFilePath, bool create);
// Name-building helpers: const members returning std::string by value.
137  std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const;
138  std::string outputFileNameStem(const unsigned int ls, std::string const& stream) const;
139  std::string mergedFileNameStem(const unsigned int ls, std::string const& stream) const;
140  std::string initFileName(std::string const& stream) const;
141  std::string eolsFileName(const unsigned int ls) const;
142  std::string eorFileName() const;
// Takes the file name by value and returns an int.
// NOTE(review): the error return convention is not visible here - confirm.
143  int getNFilesFromEoLS(std::string BUEoLSFile);
144 
// Run number: returned by getRunNumber(), overwritten by overrideRunNumber(),
// and passed to dirManager_.findRunDir() in findCurrentRunDir().
148  unsigned int run_;
// NOTE(review): the unit of this interval is not visible in this listing - confirm in the implementation file.
153  unsigned int fuLockPollInterval_;
157 
163 
170 
177 
179 
180  unsigned long previousFileSize_;
181 
// fcntl-style lock descriptors (struct flock). They have no in-class
// initialisers, so their fields are set elsewhere.
// NOTE(review): the names suggest lock (*_flk) / unlock (*_fulk) pairs for the
// BU, FU and data files - confirm how they are filled in the implementation file.
182  struct flock bu_w_flk;
183  struct flock bu_r_flk;
184  struct flock bu_w_fulk;
185  struct flock bu_r_fulk;
186  struct flock fu_rw_flk;
187  struct flock fu_rw_fulk;
188  struct flock data_rw_flk;
189  struct flock data_rw_fulk;
190  //struct flock fulocal_rw_flk;
191  //struct flock fulocal_rw_fulk;
192  //struct flock fulocal_rw_flk2;
193  //struct flock fulocal_rw_fulk2;
194 
195  evf::FastMonitoringService * fms_ = nullptr;
196 
// Pointers supplied through setDeleteTracking(); both are nullptr until that call is made.
197  std::mutex *fileDeleteLockPtr_ = nullptr;
198  std::list<std::pair<int,InputFile*>> *filesToDeletePtr_ = nullptr;
199 
// pthread mutex initialised with the static initialiser macro.
// NOTE(review): presumably the mutex used by lockInitLock()/unlockInitLock() - confirm in the implementation file.
200  pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER;
201 
// Both default to 0; isSingleStreamThread() returns true only when each equals 1.
202  unsigned int nStreams_=0;
203  unsigned int nThreads_=0;
204 
// In-class defaults: true, 1 and 0 respectively.
// NOTE(review): how these are used is not visible in this listing; the names
// suggest they relate to parsing end-of-lumisection files and to a
// stop-lumisection override - confirm in the implementation file.
205  bool readEolsDefinition_ = true;
206  unsigned int eolsNFilesIndex_ = 1;
209  unsigned int stop_ls_override_ = 0;
210 
211  std::shared_ptr<Json::Value> transferSystemJson_;
212  std::map<std::string,std::string> mergeTypeMap_;
213 
214  //values initialized in .cc file
215  static const std::vector<std::string> MergeTypeNames_;
216  };
217 }
218 
219 #endif
220 
std::string & buBaseRunDir()
std::string run_string_
def create(alignables, pedeDump, additionalData, outputFile, config)
static boost::mutex mutex
Definition: LHEProxy.cc:11
std::shared_ptr< Json::Value > transferSystemJson_
std::map< std::string, std::string > mergeTypeMap_
Represents a JSON value.
Definition: value.h:111
unsigned long previousFileSize_
bool microMergeDisabled() const
void overrideRunNumber(unsigned int run)
std::string hltSourceDirectory_
std::string mergeTypePset_
bool outputAdler32Recheck() const
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
std::string stopFilePath_
std::string bu_base_dir_
std::string findCurrentRunDir()
std::string selectedTransferMode_
std::string & buBaseRunOpenDir()
JSON (JavaScript Object Notation).
std::string eorFileName(const unsigned int run)
std::string stopFilePathPid_
std::string & baseRunDir()
def ls(path, rec=False)
Definition: eostools.py:348
unsigned long long uint64_t
Definition: Time.h:15
std::string bu_run_dir_
HLT enums.
static const std::vector< std::string > MergeTypeNames_
unsigned int getRunNumber() const
bool emptyLumisectionMode() const
std::string getRunOpenDirPath() const
void setFMS(evf::FastMonitoringService *fms)
unsigned int fuLockPollInterval_
std::string eolsFileName(const unsigned int run, const unsigned int ls)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*","!HLTx*"if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL.It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of"!*"before the partial wildcard feature was incorporated).Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run
std::string bu_run_open_dir_