test
CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EvFDaqDirector.h
Go to the documentation of this file.
1 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
2 #define EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
3 
7 
10 
11 //std headers
12 #include <string>
13 #include <sstream>
14 #include <iomanip>
15 #include <vector>
16 #include <map>
17 #include <list>
18 #include <mutex>
19 
20 //system headers
21 //#include <sys/types.h>
22 #include <sys/stat.h>
23 #include <fcntl.h>
24 #include <errno.h>
25 #include <string.h>
26 #include <stdio.h>
27 
28 class SystemBounds;
29 class GlobalContext;
30 class StreamID;
31 
32 class InputFile;
33 class InputChunk;
34 
35 namespace edm {
36  class PathsAndConsumesOfModulesBase;
37  class ProcessContext;
38 }
39 
40 namespace Json{
41  class Value;
42 }
43 
44 namespace edm {
46 }
47 
48 namespace evf{
49 
51 
52  class FastMonitoringService;
53 
55  {
56  public:
57 
59 
62  void initRun();
63  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
64  void preallocate(edm::service::SystemBounds const& bounds);
66  void preBeginRun(edm::GlobalContext const& globalContext);
67  void postEndRun(edm::GlobalContext const& globalContext);
68  void preGlobalEndLumi(edm::GlobalContext const& globalContext);
69  void preSourceEvent(edm::StreamID const& streamID);
70  //std::string &baseDir(){return base_dir_;}
// Replaces the stored run number: assigns `run` to the run_ member.
// No validation or other side effect is performed in this inline body.
71  void overrideRunNumber(unsigned int run) {run_=run;}
75 
77  std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
78  std::string getRawFilePath(const unsigned int ls, const unsigned int index) const;
79  std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const;
80  std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
81  std::string getDatFilePath(const unsigned int ls, std::string const& stream) const;
82  std::string getOpenDatFilePath(const unsigned int ls, std::string const& stream) const;
83  std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
84  std::string getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
85  std::string getMergedDatFilePath(const unsigned int ls, std::string const& stream) const;
86  std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const;
87  std::string getOpenInitFilePath(std::string const& stream) const;
88  std::string getInitFilePath(std::string const& stream) const;
89  std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
90  std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
91  std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
92  std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
93  std::string getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
94  std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
95  std::string getEoLSFilePathOnBU(const unsigned int ls) const;
96  std::string getEoLSFilePathOnFU(const unsigned int ls) const;
97  std::string getBoLSFilePathOnFU(const unsigned int ls) const;
// Returns the path formed by appending "/open" to run_dir_.
// The string is built on every call and returned by value.
// NOTE(review): run_dir_ is not declared in the visible part of this listing; presumably the run directory path - confirm.
100  std::string getRunOpenDirPath() const {return run_dir_ +"/open";}
102  void removeFile(unsigned int ls, unsigned int index);
103  void removeFile(std::string );
104 
105  FileStatus updateFuLock(unsigned int& ls, std::string& nextFile, uint32_t& fsize, uint64_t& lockWaitTime);
// Returns the current value of run_ (the member that overrideRunNumber() assigns).
107  unsigned int getRunNumber() const { return run_; }
108  FILE * maybeCreateAndLockFileHeadForStream(unsigned int ls, std::string &stream);
110  void lockInitLock();
111  void unlockInitLock();
// Stores `fileIndex` into currentFileIndex_.
// NOTE(review): currentFileIndex_ is not declared in the visible part of this listing; its type and readers cannot be confirmed from here.
113  void updateFileIndex(int const& fileIndex) {currentFileIndex_=fileIndex;}
// Returns a non-const pointer to the streamFileTracker_ member (a std::vector<int>),
// so the caller can read and modify the vector in place. The pointer refers to a
// member of this object, not to a copy.
// NOTE(review): the meaning of the vector's entries is not visible in this header.
114  std::vector<int>* getStreamFileTracker() {return &streamFileTracker_;}
// True only when both nStreams_ and nThreads_ equal 1.
// Both members are default-initialized to 0 in this class, so this returns false
// until they are assigned elsewhere.
// NOTE(review): presumably they are set in preallocate() - confirm in the .cc file.
115  bool isSingleStreamThread() {return nStreams_==1 && nThreads_==1;}
116  void lockFULocal();
117  void unlockFULocal();
118  void lockFULocal2();
119  void unlockFULocal2();
120  void createRunOpendirMaybe();
122  int readLastLSEntry(std::string const& file);
// Records the two pointers supplied by the caller:
//   fileDeleteLock -> fileDeleteLockPtr_
//   filesToDelete  -> filesToDeletePtr_ (a list of <int, InputFile*> pairs)
// Only the pointer values are stored; nothing is copied or locked here.
// NOTE(review): presumably the mutex guards the list and both must outlive
// their use by this object - confirm against the caller.
123  void setDeleteTracking( std::mutex* fileDeleteLock,std::list<std::pair<int,InputFile*>> *filesToDelete) {
124  fileDeleteLockPtr_=fileDeleteLock;
125  filesToDeletePtr_ = filesToDelete;
126  }
128  void checkMergeTypePSet(edm::ProcessContext const& pc);
129  std::string getStreamDestinations(std::string const& stream) const;
130  std::string getStreamMergeType(std::string const& stream, MergeType defaultType);
133 
134 
135  private:
136  //bool bulock();
137  //bool fulock();
138  bool bumpFile(unsigned int& ls, unsigned int& index, std::string& nextFile, uint32_t& fsize, int maxLS);
139  void openFULockfileStream(std::string& fuLockFilePath, bool create);
140  std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const;
141  std::string outputFileNameStem(const unsigned int ls, std::string const& stream) const;
142  std::string mergedFileNameStem(const unsigned int ls, std::string const& stream) const;
143  std::string initFileName(std::string const& stream) const;
144  std::string eolsFileName(const unsigned int ls) const;
145  std::string eorFileName() const;
146  int getNFilesFromEoLS(std::string BUEoLSFile);
147 
151  unsigned int run_;
156  unsigned int fuLockPollInterval_;
160 
166 
173 
180 
182 
183  unsigned long previousFileSize_;
184 
185  struct flock bu_w_flk;
186  struct flock bu_r_flk;
187  struct flock bu_w_fulk;
188  struct flock bu_r_fulk;
189  struct flock fu_rw_flk;
190  struct flock fu_rw_fulk;
191  struct flock data_rw_flk;
192  struct flock data_rw_fulk;
193  //struct flock fulocal_rw_flk;
194  //struct flock fulocal_rw_fulk;
195  //struct flock fulocal_rw_flk2;
196  //struct flock fulocal_rw_fulk2;
197 
199  std::vector<int> streamFileTracker_;
201 
203  std::list<std::pair<int,InputFile*>> *filesToDeletePtr_ = nullptr;
204 
205  pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER;
206 
207  unsigned int nStreams_=0;
208  unsigned int nThreads_=0;
209 
210  bool readEolsDefinition_ = true;
211  unsigned int eolsNFilesIndex_ = 1;
214  unsigned int stop_ls_override_ = 0;
215 
216  std::shared_ptr<Json::Value> transferSystemJson_;
217  std::map<std::string,std::string> mergeTypeMap_;
218 
219  //values initialized in .cc file
220  static const std::vector<std::string> MergeTypeNames_;
221  };
222 }
223 
224 #endif
225 
std::string & buBaseRunDir()
struct flock bu_w_fulk
unsigned int nThreads_
std::string getStreamDestinations(std::string const &stream) const
struct flock fu_rw_flk
std::string run_string_
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::vector< int > streamFileTracker_
std::list< std::pair< int, InputFile * > > * filesToDeletePtr_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::vector< int > * getStreamFileTracker()
std::string outputFileNameStem(const unsigned int ls, std::string const &stream) const
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
static boost::mutex mutex
Definition: LHEProxy.cc:11
std::shared_ptr< Json::Value > transferSystemJson_
void openFULockfileStream(std::string &fuLockFilePath, bool create)
std::string getInitFilePath(std::string const &stream) const
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
pthread_mutex_t init_lock_
struct flock bu_r_fulk
def ls
Definition: eostools.py:348
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
std::string getOpenInitFilePath(std::string const &stream) const
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
struct flock data_rw_flk
void createProcessingNotificationMaybe() const
std::map< std::string, std::string > mergeTypeMap_
std::string eolsFileName(const unsigned int ls) const
Represents a JSON value.
Definition: value.h:111
std::string getEoLSFilePathOnBU(const unsigned int ls) const
std::string getEoRFilePath() const
unsigned long previousFileSize_
bool microMergeDisabled() const
void overrideRunNumber(unsigned int run)
struct flock fu_rw_fulk
std::string hltSourceDirectory_
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::mutex * fileDeleteLockPtr_
std::string mergeTypePset_
void updateFileIndex(int const &fileIndex)
EvFDaqDirector(const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
std::string getStreamMergeType(std::string const &stream, MergeType defaultType)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
bool outputAdler32Recheck() const
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
std::string stopFilePath_
std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const
FILE * maybeCreateAndLockFileHeadForStream(unsigned int ls, std::string &stream)
std::string bu_base_dir_
std::string findCurrentRunDir()
void removeFile(unsigned int ls, unsigned int index)
unsigned int stop_ls_override_
std::string selectedTransferMode_
std::string & buBaseRunOpenDir()
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
int readLastLSEntry(std::string const &file)
std::string stopFilePathPid_
unsigned int eolsNFilesIndex_
std::string & baseRunDir()
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
int getNFilesFromEoLS(std::string BUEoLSFile)
struct flock bu_w_flk
void preBeginRun(edm::GlobalContext const &globalContext)
void preSourceEvent(edm::StreamID const &streamID)
unsigned long long uint64_t
Definition: Time.h:15
void checkMergeTypePSet(edm::ProcessContext const &pc)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
evf::FastMonitoringService * fms_
std::string bu_run_dir_
std::string eorFileName() const
std::string getBoLSFilePathOnFU(const unsigned int ls) const
std::string findRunDir(unsigned int)
Definition: DirManager.cc:37
void postEndRun(edm::GlobalContext const &globalContext)
static const std::vector< std::string > MergeTypeNames_
void checkTransferSystemPSet(edm::ProcessContext const &pc)
unsigned int getRunNumber() const
bool emptyLumisectionMode() const
std::string getEoLSFilePathOnFU(const unsigned int ls) const
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, int maxLS)
void preallocate(edm::service::SystemBounds const &bounds)
struct flock data_rw_fulk
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
std::string getRunOpenDirPath() const
void setFMS(evf::FastMonitoringService *fms)
unsigned int fuLockPollInterval_
struct flock bu_r_flk
std::string mergedFileNameStem(const unsigned int ls, std::string const &stream) const
unsigned int nStreams_
std::string getEoRFilePathOnFU() const
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION. A wildcarded negative criterion that matches more than one trigger in the trigger list ("!*", "!HLTx*" if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL. It will reject the event if any of the triggers are PASS or EXCEPTION (this matches the behavior of "!*" before the partial wildcard feature was incorporated). Triggers which are in the READY state are completely ignored. (READY should never be returned since the trigger paths have been run
std::string bu_run_open_dir_
std::string initFileName(std::string const &stream) const