CMS 3D CMS Logo

EvFDaqDirector.h
Go to the documentation of this file.
1 #ifndef EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
2 #define EVENTFILTER_UTILTIES_PLUGINS_EVFDAQDIRECTOR
3 
7 
10 
11 //std headers
12 #include <string>
13 #include <sstream>
14 #include <iomanip>
15 #include <vector>
16 #include <map>
17 #include <list>
18 #include <mutex>
19 
20 //system headers
21 #include <sys/stat.h>
22 #include <sys/file.h>
23 #include <fcntl.h>
24 #include <cerrno>
25 #include <cstring>
26 #include <cstdio>
27 
28 #include <tbb/concurrent_hash_map.h>
29 #include <boost/filesystem.hpp>
30 #include <boost/asio.hpp>
31 
32 class SystemBounds;
33 class GlobalContext;
34 class StreamID;
35 
36 struct InputFile;
37 struct InputChunk;
38 
39 namespace edm {
40  class PathsAndConsumesOfModulesBase;
41  class ProcessContext;
42 }
43 
44 namespace Json{
45  class Value;
46 }
47 
48 namespace jsoncollector {
49 class DataPointDefinition;
50 }
51 
52 namespace edm {
54 }
55 
56 namespace evf{
57 
59 
61 
63  {
64  public:
65 
66  enum FileStatus { noFile, sameFile, newFile, newLumi, runEnded, runAbort };
67 
69  ~EvFDaqDirector();
70  void initRun();
71  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
72  void preallocate(edm::service::SystemBounds const& bounds);
73  void preBeginJob(edm::PathsAndConsumesOfModulesBase const&, edm::ProcessContext const&);
74  void preBeginRun(edm::GlobalContext const& globalContext);
75  void postEndRun(edm::GlobalContext const& globalContext);
76  void preGlobalEndLumi(edm::GlobalContext const& globalContext);
77  void overrideRunNumber(unsigned int run) {run_=run;}
78  std::string &baseRunDir(){return run_dir_;}
79  std::string &buBaseRunDir(){return bu_run_dir_;}
80  std::string &buBaseRunOpenDir(){return bu_run_open_dir_;}
81  bool useFileBroker() const {return useFileBroker_;}
82 
83  std::string findCurrentRunDir(){ return dirManager_.findRunDir(run_);}
84  std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
85  std::string getRawFilePath(const unsigned int ls, const unsigned int index) const;
86  std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const;
87  std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const;
88  std::string getDatFilePath(const unsigned int ls, std::string const& stream) const;
89  std::string getOpenDatFilePath(const unsigned int ls, std::string const& stream) const;
90  std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
91  std::string getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const;
92  std::string getMergedDatFilePath(const unsigned int ls, std::string const& stream) const;
93  std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const;
94  std::string getOpenInitFilePath(std::string const& stream) const;
95  std::string getInitFilePath(std::string const& stream) const;
96  std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
97  std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
98  std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const& stream) const;
99  std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
100  std::string getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
101  std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const& stream) const;
102  std::string getEoLSFilePathOnBU(const unsigned int ls) const;
103  std::string getEoLSFilePathOnFU(const unsigned int ls) const;
104  std::string getBoLSFilePathOnFU(const unsigned int ls) const;
105  std::string getEoRFilePath() const;
106  std::string getEoRFilePathOnFU() const;
107  std::string getFFFParamsFilePathOnBU() const;
108  std::string getRunOpenDirPath() const {return run_dir_ +"/open";}
109  bool outputAdler32Recheck() const {return outputAdler32Recheck_;}
110  void removeFile(unsigned int ls, unsigned int index);
111  void removeFile(std::string );
112 
113  FileStatus updateFuLock(unsigned int& ls, std::string& nextFile, uint32_t& fsize, uint64_t& lockWaitTime);
114  void tryInitializeFuLockFile();
115  unsigned int getRunNumber() const { return run_; }
116  void lockInitLock();
117  void unlockInitLock();
118  void setFMS(evf::FastMonitoringService* fms) {fms_=fms;}
119  bool isSingleStreamThread() {return nStreams_==1 && nThreads_==1;}
120  void lockFULocal();
121  void unlockFULocal();
122  void lockFULocal2();
123  void unlockFULocal2();
124  void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const;
125  void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS = true);
126  int grabNextJsonFile(std::string const& jsonSourcePath, std::string const& rawSourcePath, int64_t& fileSizeFromJson, bool& fileFound);
127  int grabNextJsonFileAndUnlock(boost::filesystem::path const& jsonSourcePath);
128 
129  EvFDaqDirector::FileStatus contactFileBroker(unsigned int& serverHttpStatus, bool& serverState,
130  uint32_t& serverLS, uint32_t& closedServerLS,
131  std::string& nextFileJson, std::string& nextFileRaw, int maxLS);
132 
133  FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int& ls, std::string& nextFile,
134  int& serverEventsInNewFile_, int64_t& fileSize, uint64_t& thisLockWaitTimeUs);
135  void createRunOpendirMaybe();
136  void createProcessingNotificationMaybe() const;
137  int readLastLSEntry(std::string const& file);
138  unsigned int getLumisectionToStart() const;
139  void setDeleteTracking( std::mutex* fileDeleteLock,std::list<std::pair<int,InputFile*>> *filesToDelete) {
140  fileDeleteLockPtr_=fileDeleteLock;
141  filesToDeletePtr_ = filesToDelete;
142  }
143  void checkTransferSystemPSet(edm::ProcessContext const& pc);
144  void checkMergeTypePSet(edm::ProcessContext const& pc);
145  std::string getStreamDestinations(std::string const& stream) const;
146  std::string getStreamMergeType(std::string const& stream, MergeType defaultType);
147  static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid);
148 
149  private:
150  bool bumpFile(unsigned int& ls, unsigned int& index, std::string& nextFile, uint32_t& fsize, int maxLS);
151  void openFULockfileStream(bool create);
152  std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const;
153  std::string outputFileNameStem(const unsigned int ls, std::string const& stream) const;
154  std::string mergedFileNameStem(const unsigned int ls, std::string const& stream) const;
155  std::string initFileName(std::string const& stream) const;
156  std::string eolsFileName(const unsigned int ls) const;
157  std::string eorFileName() const;
158  int getNFilesFromEoLS(std::string BUEoLSFile);
159 
163  unsigned int run_;
169  unsigned int startFromLS_ = 1;
174  unsigned int fuLockPollInterval_;
176 
185 
191 
197 
199 
200  unsigned long previousFileSize_;
201 
202  struct flock bu_w_flk;
203  struct flock bu_r_flk;
204  struct flock bu_w_fulk;
205  struct flock bu_r_fulk;
206  struct flock fu_rw_flk;
207  struct flock fu_rw_fulk;
208 
209  evf::FastMonitoringService * fms_ = nullptr;
210 
211  std::mutex *fileDeleteLockPtr_ = nullptr;
212  std::list<std::pair<int,InputFile*>> *filesToDeletePtr_ = nullptr;
213 
214  pthread_mutex_t init_lock_ = PTHREAD_MUTEX_INITIALIZER;
215 
216  unsigned int nStreams_=0;
217  unsigned int nThreads_=0;
218 
219  bool readEolsDefinition_ = true;
220  unsigned int eolsNFilesIndex_ = 1;
223  unsigned int stop_ls_override_ = 0;
224 
225  std::shared_ptr<Json::Value> transferSystemJson_;
226  tbb::concurrent_hash_map<std::string,std::string> mergeTypeMap_;
227 
228  //values initialized in .cc file
229  static const std::vector<std::string> MergeTypeNames_;
230 
231 
232  //json parser
234 
235  boost::asio::io_service io_service_;
236  std::unique_ptr<boost::asio::ip::tcp::resolver> resolver_;
237  std::unique_ptr<boost::asio::ip::tcp::resolver::query> query_;
238  std::unique_ptr<boost::asio::ip::tcp::resolver::iterator> endpoint_iterator_;
239  std::unique_ptr<boost::asio::ip::tcp::socket> socket_;
240  //boost::asio::io_context io_context_;
241  //tcp::resolver resolver_;
242  //tcp::resolver::results_type endpoints_;
243 
244  };
245 }
246 
247 #endif
248 
std::string & buBaseRunDir()
static boost::mutex mutex
Definition: Proxy.cc:11
Definition: fillJson.h:27
std::string run_string_
boost::asio::io_service io_service_
std::string fulockfile_
def create(alignables, pedeDump, additionalData, outputFile, config)
std::shared_ptr< Json::Value > transferSystemJson_
jsoncollector::DataPointDefinition * dpd_
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
Represents a JSON value.
Definition: value.h:111
unsigned long previousFileSize_
bool useFileBroker() const
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
void overrideRunNumber(unsigned int run)
std::string hltSourceDirectory_
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
std::string mergeTypePset_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
bool outputAdler32Recheck() const
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
std::string stopFilePath_
std::string bu_base_dir_
std::string findCurrentRunDir()
std::string selectedTransferMode_
std::string & buBaseRunOpenDir()
JSON (JavaScript Object Notation).
std::string eorFileName(const unsigned int run)
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string stopFilePathPid_
std::string & baseRunDir()
std::string run_nstring_
def ls(path, rec=False)
Definition: eostools.py:349
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
unsigned long long uint64_t
Definition: Time.h:15
std::string bu_run_dir_
HLT enums.
static const std::vector< std::string > MergeTypeNames_
unsigned int getRunNumber() const
std::string fileBrokerPort_
std::string getRunOpenDirPath() const
void setFMS(evf::FastMonitoringService *fms)
std::string fileBrokerHost_
unsigned int fuLockPollInterval_
std::string eolsFileName(const unsigned int run, const unsigned int ls)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*","!HLTx*"if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL.It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of"!*"before the partial wildcard feature was incorporated).Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run
std::string bu_run_open_dir_