CMS 3D CMS Logo

EvFDaqDirector.cc
Go to the documentation of this file.
10 
18 
19 #include <iostream>
20 #include <fstream>
21 #include <sstream>
22 #include <sys/time.h>
23 #include <unistd.h>
24 #include <cstdio>
25 #include <boost/lexical_cast.hpp>
26 #include <boost/algorithm/string.hpp>
27 
28 //using boost::asio::ip::tcp;
29 
30 //#define DEBUG
31 
32 using namespace jsoncollector;
33 
34 namespace evf {
35 
36  //for enum MergeType
37  const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"};
38 
40  : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
41  bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
42  run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
43  useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
44  fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", true)),
45  fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
46  fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
47  fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
48  fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
49  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
50  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
51  requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet", false)),
52  selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode", "")),
53  mergeTypePset_(pset.getUntrackedParameter<std::string>("mergingPset", "")),
54  directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
55  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
56  hostname_(""),
57  bu_readlock_fd_(-1),
58  bu_writelock_fd_(-1),
59  fu_readwritelock_fd_(-1),
60  fulocal_rwlock_fd_(-1),
61  fulocal_rwlock_fd2_(-1),
62  bu_w_lock_stream(nullptr),
63  bu_r_lock_stream(nullptr),
64  fu_rw_lock_stream(nullptr),
65  dirManager_(base_dir_),
66  previousFileSize_(0),
67  bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
68  bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
69  bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
70  bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
71  fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
72  fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
78 
79  //save hostname for later
80  char hostname[33];
81  gethostname(hostname, 32);
82  hostname_ = hostname;
83 
84  char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
85  if (fuLockPollIntervalPtr) {
86  try {
87  fuLockPollInterval_ = boost::lexical_cast<unsigned int>(std::string(fuLockPollIntervalPtr));
88  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
89  << " us";
90  } catch (boost::bad_lexical_cast const&) {
91  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
92  }
93  }
94 
95  //override file service parameter if specified by environment
96  char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
97  if (fileBrokerParamPtr) {
98  try {
99  useFileBroker_ = (boost::lexical_cast<unsigned int>(std::string(fileBrokerParamPtr))) > 0;
100  edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
101  } catch (boost::bad_lexical_cast const&) {
102  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
103  }
104  }
105  if (useFileBroker_) {
107  //find BU data address from hltd configuration
109  struct stat buf;
110  if (stat("/etc/appliance/bus.config", &buf) == 0) {
111  std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
112  std::getline(busconfig, fileBrokerHost_);
113  }
114  if (fileBrokerHost_.empty())
115  throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
116  } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
117  throw cms::Exception("EvFDaqDirector")
118  << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
119 
120  resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
121  query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
122  endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
123  socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
124  }
125 
126  char* startFromLSPtr = std::getenv("FFF_STARTFROMLS");
127  if (startFromLSPtr) {
128  try {
129  startFromLS_ = boost::lexical_cast<unsigned int>(std::string(startFromLSPtr));
130  edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
131  } catch (boost::bad_lexical_cast const&) {
132  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
133  }
134  }
135 
136  //override file service parameter if specified by environment
137  char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
138  if (fileBrokerUseLockParamPtr) {
139  try {
140  fileBrokerUseLocalLock_ = (boost::lexical_cast<unsigned int>(std::string(fileBrokerUseLockParamPtr))) > 0;
141  edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
143  } catch (boost::bad_lexical_cast const&) {
144  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
145  }
146  }
147  }
148 
150  std::stringstream ss;
151  ss << "run" << std::setfill('0') << std::setw(6) << run_;
152  run_string_ = ss.str();
153  ss = std::stringstream();
154  ss << run_;
155  run_nstring_ = ss.str();
156  run_dir_ = base_dir_ + "/" + run_string_;
157  ss = std::stringstream();
158  ss << getpid();
159  pid_ = ss.str();
160 
161  // check if base dir exists or create it accordingly
162  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
163  if (retval != 0 && errno != EEXIST) {
164  throw cms::Exception("DaqDirector")
165  << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
166  }
167 
168  //create run dir in base dir
169  umask(0);
170  retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
171  if (retval != 0 && errno != EEXIST) {
172  throw cms::Exception("DaqDirector")
173  << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
174  }
175 
176  //create fu-local.lock in run open dir
177  if (!directorBU_) {
179  std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
181  open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
182  if (fulocal_rwlock_fd_ == -1)
183  throw cms::Exception("DaqDirector")
184  << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
185  chmod(fulocal_lock_.c_str(), 0777);
186  fsync(fulocal_rwlock_fd_);
187  //open second fd for another input source thread
189  open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
190  if (fulocal_rwlock_fd2_ == -1)
191  throw cms::Exception("DaqDirector")
192  << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
193  }
194 
195  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
196  //for BU, it is created at this point
197  if (directorBU_) {
199  std::string bulockfile = bu_run_dir_ + "/bu.lock";
200  fulockfile_ = bu_run_dir_ + "/fu.lock";
201 
202  //make or find bu run dir
203  retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
204  if (retval != 0 && errno != EEXIST) {
205  throw cms::Exception("DaqDirector")
206  << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno) << "\n";
207  }
208  bu_run_open_dir_ = bu_run_dir_ + "/open";
209  retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
210  if (retval != 0 && errno != EEXIST) {
211  throw cms::Exception("DaqDirector")
212  << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno) << "\n";
213  }
214 
215  // the BU director does not need to know about the fu lock
216  bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
217  if (bu_writelock_fd_ == -1)
218  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
219  else
220  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
221  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
222  if (bu_w_lock_stream == nullptr)
223  edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
224 
225  // BU INITIALIZES LOCK FILE
226  // FU LOCK FILE OPEN
227  openFULockfileStream(true);
229  fflush(fu_rw_lock_stream);
230  close(fu_readwritelock_fd_);
231 
232  if (!hltSourceDirectory_.empty()) {
233  struct stat buf;
234  if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
235  std::string hltdir = bu_run_dir_ + "/hlt";
236  std::string tmphltdir = bu_run_open_dir_ + "/hlt";
237  retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
238  if (retval != 0 && errno != EEXIST)
239  throw cms::Exception("DaqDirector")
240  << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno) << "\n";
241 
242  std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
243  std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
244 
245  std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
246  for (auto& optfile : optfiles) {
247  try {
248  std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
249  } catch (...) {
250  }
251  }
252 
253  std::filesystem::rename(tmphltdir, hltdir);
254  } else
255  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
256  }
257  //else{}//no configuration specified
258  } else {
259  // for FU, check if bu base dir exists
260 
261  retval = mkdir(bu_base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
262  if (retval != 0 && errno != EEXIST) {
263  throw cms::Exception("DaqDirector")
264  << " Error checking for bu base dir -: " << bu_base_dir_ << " mkdir error:" << strerror(errno) << "\n";
265  }
266 
268  fulockfile_ = bu_run_dir_ + "/fu.lock";
269  openFULockfileStream(false);
270  }
271 
272  pthread_mutex_init(&init_lock_, nullptr);
273 
274  stopFilePath_ = run_dir_ + "/CMSSW_STOP";
275  std::stringstream sstp;
276  sstp << stopFilePath_ << "_pid" << pid_;
277  stopFilePathPid_ = sstp.str();
278 
279  if (!directorBU_) {
280  std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
281  struct stat statbuf;
282  if (!stat(defPath.c_str(), &statbuf))
283  edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
284  else {
285  //look in source directory if not present in ramdisk
286  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
287  defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
288  if (stat(defPath.c_str(), &statbuf)) {
289  defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
290  if (stat(defPath.c_str(), &statbuf)) {
291  defPath = defPathSuffix;
292  }
293  }
294  }
295  dpd_ = new DataPointDefinition();
296  std::string defLabel = "data";
297  DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
298  }
299  }
300 
302  //close server connection
303  if (socket_.get() && socket_->is_open()) {
304  boost::system::error_code ec;
305  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
306  socket_->close(ec);
307  }
308 
309  if (fulocal_rwlock_fd_ != -1) {
310  unlockFULocal();
311  close(fulocal_rwlock_fd_);
312  }
313 
314  if (fulocal_rwlock_fd2_ != -1) {
315  unlockFULocal2();
316  close(fulocal_rwlock_fd2_);
317  }
318  }
319 
321  initRun();
322 
323  nThreads_ = bounds.maxNumberOfStreams();
324  nStreams_ = bounds.maxNumberOfThreads();
325  }
326 
329  desc.setComment(
330  "Service used for file locking arbitration and for propagating information between other EvF components");
331  desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
332  desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
333  desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
334  desc.addUntracked<bool>("useFileBroker", false)
335  ->setComment("Use BU file service to grab input data instead of NFS file locking");
336  desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
337  ->setComment("Allow service to discover BU address from hltd configuration");
338  desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
339  desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
340  desc.addUntracked<bool>("fileBrokerKeepAlive", true)
341  ->setComment("Use keep alive to avoid using large number of sockets");
342  desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
343  ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
344  desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
345  ->setComment("Lock polling interval in microseconds for the input directory file lock");
346  desc.addUntracked<bool>("outputAdler32Recheck", false)
347  ->setComment("Check Adler32 of per-process output files while micro-merging");
348  desc.addUntracked<bool>("requireTransfersPSet", false)
349  ->setComment("Require complete transferSystem PSet in the process configuration");
350  desc.addUntracked<std::string>("selectedTransferMode", "")
351  ->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
352  desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
353  desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
354  desc.addUntracked<std::string>("mergingPset", "")
355  ->setComment("Name of merging PSet to look for merging type definitions for streams");
356  descriptions.add("EvFDaqDirector", desc);
357  }
358 
361  checkMergeTypePSet(pc);
362  }
363 
364  void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) {
365  //assert(run_ == id.run());
366 
367  // check if the requested run is the latest one - issue a warning if it isn't
369  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
370  << ". This is not the highest run " << dirManager_.findHighestRunDir();
371  }
372  }
373 
374  void EvFDaqDirector::postEndRun(edm::GlobalContext const& globalContext) {
375  close(bu_readlock_fd_);
376  close(bu_writelock_fd_);
377  if (directorBU_) {
378  std::string filename = bu_run_dir_ + "/bu.lock";
380  }
381  }
382 
384  //delete all files belonging to just closed lumi
385  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
387  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
388  return;
389  }
390 
391  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
392  auto it = filesToDeletePtr_->begin();
393  while (it != filesToDeletePtr_->end()) {
394  if (it->second->lumi_ == ls) {
395  it = filesToDeletePtr_->erase(it);
396  } else
397  it++;
398  }
399  }
400 
401  std::string EvFDaqDirector::getInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
403  }
404 
405  std::string EvFDaqDirector::getRawFilePath(const unsigned int ls, const unsigned int index) const {
407  }
408 
409  std::string EvFDaqDirector::getOpenRawFilePath(const unsigned int ls, const unsigned int index) const {
410  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
411  }
412 
413  std::string EvFDaqDirector::getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
414  return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
415  }
416 
417  std::string EvFDaqDirector::getDatFilePath(const unsigned int ls, std::string const& stream) const {
419  }
420 
423  }
424 
427  }
428 
431  }
432 
435  }
436 
439  }
440 
442  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
443  }
444 
447  }
448 
450  std::string const& stream) const {
452  }
453 
455  std::string const& stream) const {
457  }
458 
460  std::string const& stream) const {
462  }
463 
466  }
467 
470  }
471 
474  }
475 
477  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
478  }
479 
481  return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
482  }
483 
485  return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
486  }
487 
489 
491 
492  std::string EvFDaqDirector::getFFFParamsFilePathOnBU() const { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
493 
495  int retval = remove(filename.c_str());
496  if (retval != 0)
497  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
498  << ". error = " << strerror(errno);
499  }
500 
501  void EvFDaqDirector::removeFile(unsigned int ls, unsigned int index) { removeFile(getRawFilePath(ls, index)); }
502 
504  std::string& nextFile,
505  uint32_t& fsize,
506  uint16_t& rawHeaderSize,
507  uint64_t& lockWaitTime,
508  bool& setExceptionState) {
509  EvFDaqDirector::FileStatus fileStatus = noFile;
510  rawHeaderSize = 0;
511 
512  int retval = -1;
513  int lock_attempts = 0;
514  long total_lock_attempts = 0;
515 
516  struct stat buf;
517  int stopFileLS = -1;
518  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
519  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
520  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
521  if (stopFileCheck == 0)
522  stopFileLS = readLastLSEntry(stopFilePath_);
523  else
524  stopFileLS = 1; //stop without drain if only pid is stopped
525  if (!stop_ls_override_) {
526  //if lumisection is higher than in stop file, should quit at next from current
527  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
528  stopFileLS = stop_ls_override_ = ls;
529  } else
530  stopFileLS = stop_ls_override_;
531  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
532  << stopFileLS;
533  //return runEnded;
534  } else //if file was removed before reaching stop condition, reset this
535  stop_ls_override_ = 0;
536 
537  timeval ts_lockbegin;
538  gettimeofday(&ts_lockbegin, nullptr);
539 
540  while (retval == -1) {
541  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
542  if (retval == -1)
543  usleep(fuLockPollInterval_);
544  else
545  continue;
546 
547  lock_attempts += fuLockPollInterval_;
548  total_lock_attempts += fuLockPollInterval_;
549  if (lock_attempts > 5000000 || errno == 116) {
550  if (errno == 116)
551  edm::LogWarning("EvFDaqDirector")
552  << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
553  else
554  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
555  "fu.lock file are present -: errno "
556  << errno << ":" << strerror(errno) << std::endl;
557 
558  if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
559  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
560  ls++;
561  return noFile;
562  }
563 
564  if (stat(bu_run_dir_.c_str(), &buf) != 0)
565  return runEnded;
566  if (stat(fulockfile_.c_str(), &buf) != 0)
567  return runEnded;
568 
569  lock_attempts = 0;
570  }
571  if (total_lock_attempts > 5 * 60000000) {
572  edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
573  return runAbort;
574  }
575  }
576 
577  timeval ts_lockend;
578  gettimeofday(&ts_lockend, nullptr);
579  long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
580  if (deltat > 0.)
581  lockWaitTime = deltat;
582 
583  if (retval != 0)
584  return fileStatus;
585 
586 #ifdef DEBUG
587  timeval ts_lockend;
588  gettimeofday(&ts_lockend, 0);
589 #endif
590 
591  //open another lock file FD after the lock using main fd has been acquired
592  int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
593  if (fu_readwritelock_fd2 == -1)
594  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
595  << " create. error:" << strerror(errno);
596 
597  FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
598 
599  // if the stream is readable
600  if (fu_rw_lock_stream2 != nullptr) {
601  unsigned int readLs, readIndex;
602  int check = 0;
603  // rewind the stream
604  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
605  // if rewinded ok
606  if (check == 0) {
607  // read its' values
608  fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
609  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
610 
611  unsigned int currentLs = readLs;
612  bool bumpedOk = false;
613  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
614  //no lock file write in this case
615  if (ls && ls + 1 < currentLs)
616  ls++;
617  else {
618  // try to bump (look for new index or EoLS file)
619  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
620  //avoid 2 lumisections jump
621  if (ls && readLs > currentLs && currentLs > ls) {
622  ls++;
623  readLs = currentLs = ls;
624  readIndex = 0;
625  bumpedOk = false;
626  //no write to lock file
627  } else {
628  if (ls == 0 && readLs > currentLs) {
629  //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
630  //in this case there is no new file in the same LS
631  //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
632  readLs = currentLs;
633  readIndex = 0;
634  bumpedOk = false;
635  //no write to lock file
636  }
637  //update return LS value
638  ls = readLs;
639  }
640  }
641  if (bumpedOk) {
642  // there is a new index file to grab, lock file needs to be updated
643  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
644  if (check == 0) {
645  ftruncate(fu_readwritelock_fd2, 0);
646  // write next index in the file, which is the file the next process should take
647  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
648  fflush(fu_rw_lock_stream2);
649  fsync(fu_readwritelock_fd2);
650  fileStatus = newFile;
651  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
652  } else {
653  edm::LogError("EvFDaqDirector")
654  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
655  setExceptionState = true;
656  return noFile;
657  }
658  } else if (currentLs < readLs) {
659  //there is no new file in next LS (yet), but lock file can be updated to the next LS
660  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
661  if (check == 0) {
662  ftruncate(fu_readwritelock_fd2, 0);
663  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
664  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
665  fflush(fu_rw_lock_stream2);
666  fsync(fu_readwritelock_fd2);
667  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
668  } else {
669  edm::LogError("EvFDaqDirector")
670  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
671  setExceptionState = true;
672  return noFile;
673  }
674  }
675  } else {
676  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
677  << strerror(errno);
678  }
679  } else {
680  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
681  }
682  fclose(fu_rw_lock_stream2); // = fdopen(fu_readwritelock_fd2, "r+");
683 
684 #ifdef DEBUG
685  timeval ts_preunlock;
686  gettimeofday(&ts_preunlock, 0);
687  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
688  double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
689 #endif
690 
691  //if new json is present, lock file which FedRawDataInputSource will later unlock
692  if (fileStatus == newFile)
693  lockFULocal();
694 
695  //release lock at this point
696  int retvalu = -1;
697  retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
698  if (retvalu == -1)
699  edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
700 
701 #ifdef DEBUG
702  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
703 #endif
704 
705  if (fileStatus == noFile) {
706  struct stat buf;
707  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
708  if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
709  fileStatus = runEnded;
710  if (stopFileLS >= 0 && (int)ls > stopFileLS) {
711  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
712  fileStatus = runEnded;
713  }
714  }
715  return fileStatus;
716  }
717 
719  std::ifstream ij(BUEoLSFile);
720  Json::Value deserializeRoot;
722 
723  if (!reader.parse(ij, deserializeRoot)) {
724  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
725  return -1;
726  }
727 
729  DataPoint dp;
730  dp.deserialize(deserializeRoot);
731 
732  //read definition
733  if (readEolsDefinition_) {
734  //std::string def = boost::algorithm::trim(dp.getDefinition());
735  std::string def = dp.getDefinition();
736  if (def.empty())
737  readEolsDefinition_ = false;
738  while (!def.empty()) {
740  if (def.find('/') == 0)
741  fullpath = def;
742  else
743  fullpath = bu_run_dir_ + '/' + def;
744  struct stat buf;
745  if (stat(fullpath.c_str(), &buf) == 0) {
746  DataPointDefinition eolsDpd;
747  std::string defLabel = "legend";
748  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
749  if (eolsDpd.getNames().empty()) {
750  //try with "data" label if "legend" format is not used
751  eolsDpd = DataPointDefinition();
752  defLabel = "data";
753  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
754  }
755  for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
756  if (eolsDpd.getNames().at(i) == "NFiles")
758  readEolsDefinition_ = false;
759  break;
760  }
761  //check if we can still find definition
762  if (def.size() <= 1 || def.find('/') == std::string::npos) {
763  readEolsDefinition_ = false;
764  break;
765  }
766  def = def.substr(def.find('/') + 1);
767  }
768  }
769 
770  if (dp.getData().size() > eolsNFilesIndex_)
771  data = dp.getData()[eolsNFilesIndex_];
772  else {
773  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
774  return -1;
775  }
776  return boost::lexical_cast<int>(data);
777  }
778 
779  bool EvFDaqDirector::bumpFile(unsigned int& ls,
780  unsigned int& index,
781  std::string& nextFile,
782  uint32_t& fsize,
783  uint16_t& rawHeaderSize,
784  int maxLS,
785  bool& setExceptionState) {
786  if (previousFileSize_ != 0) {
787  if (!fms_) {
789  }
790  if (fms_)
792  previousFileSize_ = 0;
793  }
794  nextFile = "";
795 
796  //reached limit
797  if (maxLS >= 0 && ls > (unsigned int)maxLS)
798  return false;
799 
800  struct stat buf;
801  std::stringstream ss;
802  unsigned int nextIndex = index;
803  nextIndex++;
804 
805  // 1. Check suggested file
806  std::string nextFileJson = getInputJsonFilePath(ls, index);
807  if (stat(nextFileJson.c_str(), &buf) == 0) {
808  fsize = previousFileSize_ = buf.st_size;
809  nextFile = nextFileJson;
810  return true;
811  }
812  // 2. No file -> lumi ended? (and how many?)
813  else {
814  // 3. No file -> check for standalone raw file
815  std::string nextFileRaw = getRawFilePath(ls, index);
816  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
817  fsize = previousFileSize_ = buf.st_size;
818  nextFile = nextFileRaw;
819  return true;
820  }
821 
822  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
823 
824  if (stat(BUEoLSFile.c_str(), &buf) == 0) {
825  // recheck that no raw file appeared in the meantime
826  if (stat(nextFileJson.c_str(), &buf) == 0) {
827  fsize = previousFileSize_ = buf.st_size;
828  nextFile = nextFileJson;
829  return true;
830  }
831  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
832  fsize = previousFileSize_ = buf.st_size;
833  nextFile = nextFileRaw;
834  return true;
835  }
836 
837  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
838  if (indexFilesInLS < 0)
839  //parsing failed
840  return false;
841  else {
842  //check index
843  if ((int)index < indexFilesInLS) {
844  //we have 2 files, and check for 1 failed... retry (2 will never be here)
845  edm::LogError("EvFDaqDirector")
846  << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
847  << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
848  setExceptionState = true;
849  return false;
850  }
851  }
852  // this lumi ended, check for files
853  ++ls;
854  index = 0;
855 
856  //reached limit
857  if (maxLS >= 0 && ls > (unsigned int)maxLS)
858  return false;
859 
860  nextFileJson = getInputJsonFilePath(ls, 0);
861  nextFileRaw = getRawFilePath(ls, 0);
862  if (stat(nextFileJson.c_str(), &buf) == 0) {
863  // a new file was found at new lumisection, index 0
864  fsize = previousFileSize_ = buf.st_size;
865  nextFile = nextFileJson;
866  return true;
867  }
868  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
869  fsize = previousFileSize_ = buf.st_size;
870  nextFile = nextFileRaw;
871  return true;
872  }
873  return false;
874  }
875  }
876  // no new file found
877  return false;
878  }
879 
881  if (fu_rw_lock_stream == nullptr)
882  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
883  else {
884  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
885  unsigned int readLs = 1, readIndex = 0;
886  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
887  }
888  }
889 
891  if (create) {
893  open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
894  chmod(fulockfile_.c_str(), 0766);
895  } else {
896  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
897  }
898  if (fu_readwritelock_fd_ == -1)
899  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
900  << " create:" << create << " error:" << strerror(errno);
901  else
902  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
903 
904  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
905  if (fu_rw_lock_stream == nullptr)
906  edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
907  }
908 
909  void EvFDaqDirector::lockInitLock() { pthread_mutex_lock(&init_lock_); }
910 
911  void EvFDaqDirector::unlockInitLock() { pthread_mutex_unlock(&init_lock_); }
912 
914  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
915  flock(fulocal_rwlock_fd_, LOCK_SH);
916  }
917 
919  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
920  flock(fulocal_rwlock_fd_, LOCK_UN);
921  }
922 
924  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
925  flock(fulocal_rwlock_fd2_, LOCK_EX);
926  }
927 
929  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
930  flock(fulocal_rwlock_fd2_, LOCK_UN);
931  }
932 
933  void EvFDaqDirector::createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const {
934  //used for backpressure mechanisms and monitoring
935  const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
936  struct stat buf;
937  if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
938  int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
939  close(bol_fd);
940  }
941  }
942 
943  void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection,
944  const uint32_t currentLumiSection,
945  bool doCreateBoLS) {
946  if (currentLumiSection > 0) {
947  const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
948  struct stat buf;
949  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
950  if (!found) {
951  int eol_fd = open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
952  close(eol_fd);
953  if (doCreateBoLS)
954  createBoLSFile(lumiSection, false);
955  }
956  } else if (doCreateBoLS) {
957  createBoLSFile(lumiSection, true); //needed for initial lumisection
958  }
959  }
960 
962  int& rawFd,
963  uint16_t& rawHeaderSize,
964  uint32_t& lsFromHeader,
965  int32_t& eventsFromHeader,
966  int64_t& fileSizeFromHeader,
967  bool requireHeader,
968  bool retry,
969  bool closeFile) {
970  int infile;
971 
972  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
973  if (retry) {
974  edm::LogWarning("EvFDaqDirector")
975  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
976  return parseFRDFileHeader(rawSourcePath,
977  rawFd,
978  rawHeaderSize,
979  lsFromHeader,
980  eventsFromHeader,
981  fileSizeFromHeader,
982  requireHeader,
983  false,
984  closeFile);
985  } else {
986  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
987  edm::LogError("EvFDaqDirector")
988  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
989  if (errno == ENOENT)
990  return 1; // error && file not found
991  else
992  return -1;
993  }
994  }
995  }
996 
997  constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1); //try to read v1 FRD header size
998  FRDFileHeader_v1 fileHead;
999 
1000  ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
1001  if (closeFile) {
1002  close(infile);
1003  infile = -1;
1004  }
1005 
1006  if (sz_read < 0) {
1007  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - unable to read " << rawSourcePath << " : "
1008  << strerror(errno);
1009  if (infile != -1)
1010  close(infile);
1011  return -1;
1012  }
1013  if ((size_t)sz_read < buf_sz) {
1014  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - file smaller than header: " << rawSourcePath;
1015  if (infile != -1)
1016  close(infile);
1017  return -1;
1018  }
1019 
1020  uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1021 
1022  if (frd_version == 0) {
1023  //no header (specific sequence not detected)
1024  if (requireHeader) {
1025  edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1026  if (infile != -1)
1027  close(infile);
1028  return -1;
1029  } else {
1030  //no header, but valid file
1031  lseek(infile, 0, SEEK_SET);
1032  rawHeaderSize = 0;
1033  lsFromHeader = 0;
1034  eventsFromHeader = -1;
1035  fileSizeFromHeader = -1;
1036  }
1037  } else {
1038  //version 1 header
1039  uint32_t headerSizeRaw = fileHead.headerSize_;
1040  if (headerSizeRaw < buf_sz) {
1041  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1042  << " v:" << frd_version;
1043  if (infile != -1)
1044  close(infile);
1045  return -1;
1046  }
1047  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1048  lsFromHeader = fileHead.lumiSection_;
1049  eventsFromHeader = (int32_t)fileHead.eventCount_;
1050  fileSizeFromHeader = (int64_t)fileHead.fileSize_;
1051  rawHeaderSize = fileHead.headerSize_;
1052  }
1053  rawFd = infile;
1054  return 0; //OK
1055  }
1056 
1057  bool EvFDaqDirector::rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize) {
1058  int infile;
1059  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1060  edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1061  << strerror(errno);
1062  return false;
1063  }
1064  constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1); //try to read v1 FRD header size
1065  FRDFileHeader_v1 fileHead;
1066 
1067  ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
1068 
1069  if (sz_read < 0) {
1070  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << rawSourcePath << " : "
1071  << strerror(errno);
1072  if (infile != -1)
1073  close(infile);
1074  return false;
1075  }
1076  if ((size_t)sz_read < buf_sz) {
1077  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << rawSourcePath;
1078  if (infile != -1)
1079  close(infile);
1080  return false;
1081  }
1082 
1083  uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1084 
1085  close(infile);
1086 
1087  if (frd_version > 0) {
1088  rawHeaderSize = fileHead.headerSize_;
1089  return true;
1090  }
1091 
1092  rawHeaderSize = 0;
1093  return false;
1094  }
1095 
1097  int& rawFd,
1098  uint16_t& rawHeaderSize,
1099  int64_t& fileSizeFromHeader,
1100  bool& fileFound,
1101  uint32_t serverLS,
1102  bool closeFile) {
1103  fileFound = true;
1104 
1105  //take only first three tokens delimited by "_" in the renamed raw file name
1106  std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1107  size_t pos = 0, n_tokens = 0;
1108  while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1109  }
1110  std::string reducedJsonStem = jsonStem.substr(0, pos);
1111 
1112  std::ostringstream fileNameWithPID;
1113  //should be ported to use fffnaming
1114  fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1115 
1116  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1117 
1118  LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1119 
1120  //parse RAW file header if it exists
1121  uint32_t lsFromRaw;
1122  int32_t nbEventsWrittenRaw;
1123  int64_t fileSizeFromRaw;
1124  auto ret = parseFRDFileHeader(
1125  rawSourcePath, rawFd, rawHeaderSize, lsFromRaw, nbEventsWrittenRaw, fileSizeFromRaw, true, true, closeFile);
1126  if (ret != 0) {
1127  if (ret == 1)
1128  fileFound = false;
1129  return -1;
1130  }
1131 
1132  int outfile;
1133  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1134  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1135  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1136  if (errno == EEXIST) {
1137  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1138  << " : ";
1139  return -1;
1140  }
1141  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1142  << strerror(errno);
1143  struct stat out_stat;
1144  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1145  edm::LogWarning("EvFDaqDirector")
1146  << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1147  << jsonDestPath;
1148  if (unlink(jsonDestPath.c_str()) == -1) {
1149  edm::LogWarning("EvFDaqDirector")
1150  << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1151  }
1152  }
1153  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1154  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1155  << jsonDestPath << " : " << strerror(errno);
1156  return -1;
1157  }
1158  }
1159  //write JSON file (TODO: use jsoncpp)
1160  std::stringstream ss;
1161  ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1162  std::string sstr = ss.str();
1163 
1164  if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1165  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1166  << " : " << strerror(errno);
1167  return -1;
1168  }
1169  close(outfile);
1170  if (serverLS && serverLS != lsFromRaw)
1171  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1172  << " and raw file header LS " << lsFromRaw;
1173 
1174  fileSizeFromHeader = fileSizeFromRaw;
1175  return nbEventsWrittenRaw;
1176  }
1177 
1179  std::string const& rawSourcePath,
1180  int64_t& fileSizeFromJson,
1181  bool& fileFound) {
1182  fileFound = true;
1183 
1184  //should be ported to use fffnaming
1185  std::ostringstream fileNameWithPID;
1186  fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1187  << std::setw(5) << pid_ << ".jsn";
1188 
1189  // assemble json destination path
1190  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1191 
1192  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1193 
1194  int infile = -1, outfile = -1;
1195 
1196  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1197  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1198  << strerror(errno);
1199  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1200  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1201  << jsonSourcePath << " : " << strerror(errno);
1202  if (errno == ENOENT)
1203  fileFound = false;
1204  return -1;
1205  }
1206  }
1207 
1208  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1209  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1210  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1211  if (errno == EEXIST) {
1212  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1213  << " : ";
1214  ::close(infile);
1215  return -1;
1216  }
1217  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1218  << strerror(errno);
1219  struct stat out_stat;
1220  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1221  edm::LogWarning("EvFDaqDirector")
1222  << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1223  if (unlink(jsonDestPath.c_str()) == -1) {
1224  edm::LogWarning("EvFDaqDirector")
1225  << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1226  }
1227  }
1228  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1229  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1230  << jsonDestPath << " : " << strerror(errno);
1231  ::close(infile);
1232  return -1;
1233  }
1234  }
1235  //copy contents
1236  const std::size_t buf_sz = 512;
1237  std::size_t tot_written = 0;
1238  std::unique_ptr<char> buf(new char[buf_sz]);
1239 
1240  ssize_t sz, sz_read = 1, sz_write;
1241  while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1242  sz_write = 0;
1243  do {
1244  assert(sz_read - sz_write > 0);
1245  if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1246  sz_read = sz; // cause read loop termination
1247  break;
1248  }
1249  assert(sz > 0);
1250  sz_write += sz;
1251  tot_written += sz;
1252  } while (sz_write < sz_read);
1253  }
1254  close(infile);
1255  close(outfile);
1256 
1257  if (tot_written > 0) {
1258  //leave file if it was empty for diagnosis
1259  if (unlink(jsonSourcePath.c_str()) == -1) {
1260  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1261  << strerror(errno);
1262  return -1;
1263  }
1264  } else {
1265  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1266  << jsonSourcePath;
1267  return -1;
1268  }
1269 
1270  Json::Value deserializeRoot;
1272 
1273  std::string data;
1274  std::stringstream ss;
1275  bool result;
1276  try {
1277  if (tot_written <= buf_sz) {
1278  result = reader.parse(buf.get(), deserializeRoot);
1279  } else {
1280  //json will normally not be bigger than buf_sz bytes
1281  try {
1282  std::ifstream ij(jsonDestPath);
1283  ss << ij.rdbuf();
1284  } catch (std::filesystem::filesystem_error const& ex) {
1285  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1286  return -1;
1287  }
1288  result = reader.parse(ss.str(), deserializeRoot);
1289  }
1290  if (!result) {
1291  if (tot_written <= buf_sz)
1292  ss << buf.get();
1293  edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1294  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1295  << ss.str() << ".";
1296  return -1;
1297  }
1298 
1299  //read BU JSON
1300  DataPoint dp;
1301  dp.deserialize(deserializeRoot);
1302  bool success = false;
1303  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1304  if (dpd_->getNames().at(i) == "NEvents")
1305  if (i < dp.getData().size()) {
1306  data = dp.getData()[i];
1307  success = true;
1308  break;
1309  }
1310  }
1311  if (!success) {
1312  if (!dp.getData().empty())
1313  data = dp.getData()[0];
1314  else {
1315  edm::LogError("EvFDaqDirector::grabNextJsonFile")
1316  << "grabNextJsonFile - "
1317  << " error reading number of events from BU JSON; No input value. data -: " << data;
1318  return -1;
1319  }
1320  }
1321 
1322  //try to read raw file size
1323  fileSizeFromJson = -1;
1324  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1325  if (dpd_->getNames().at(i) == "NBytes") {
1326  if (i < dp.getData().size()) {
1327  std::string dataSize = dp.getData()[i];
1328  try {
1329  fileSizeFromJson = boost::lexical_cast<long>(dataSize);
1330  } catch (boost::bad_lexical_cast const&) {
1331  //non-fatal currently, processing can continue without this value
1332  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1333  << "Input value is -: " << dataSize;
1334  }
1335  break;
1336  }
1337  }
1338  }
1339  return boost::lexical_cast<int>(data);
1340  } catch (boost::bad_lexical_cast const& e) {
1341  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1342  << "Input value is -: " << data;
1343  } catch (std::runtime_error const& e) {
1344  //Can be thrown by Json parser
1345  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1346  }
1347 
1348  catch (std::exception const& e) {
1349  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1350  } catch (...) {
1351  //unknown exception
1352  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1353  }
1354 
1355  return -1;
1356  }
1357 
1359  std::string data;
1360  try {
1361  // assemble json destination path
1362  std::filesystem::path jsonDestPath(baseRunDir());
1363 
1364  //should be ported to use fffnaming
1365  std::ostringstream fileNameWithPID;
1366  fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1367  << ".jsn";
1368  jsonDestPath /= fileNameWithPID.str();
1369 
1370  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1371  try {
1372  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1373  } catch (std::filesystem::filesystem_error const& ex) {
1374  // Input dir gone?
1375  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1376  // << " Maybe the file is not yet visible by FU. Trying again in one second";
1377  sleep(1);
1378  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1379  }
1380  unlockFULocal();
1381 
1382  try {
1383  //sometimes this fails but file gets deleted
1384  std::filesystem::remove(jsonSourcePath);
1385  } catch (std::filesystem::filesystem_error const& ex) {
1386  // Input dir gone?
1387  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1388  } catch (std::exception const& ex) {
1389  // Input dir gone?
1390  edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1391  }
1392 
1393  std::ifstream ij(jsonDestPath);
1394  Json::Value deserializeRoot;
1396 
1397  std::stringstream ss;
1398  ss << ij.rdbuf();
1399  if (!reader.parse(ss.str(), deserializeRoot)) {
1400  edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1401  << "\nERROR:\n"
1402  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1403  << ss.str() << ".";
1404  throw std::runtime_error("Cannot deserialize input JSON file");
1405  }
1406 
1407  //read BU JSON
1408  std::string data;
1409  DataPoint dp;
1410  dp.deserialize(deserializeRoot);
1411  bool success = false;
1412  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1413  if (dpd_->getNames().at(i) == "NEvents")
1414  if (i < dp.getData().size()) {
1415  data = dp.getData()[i];
1416  success = true;
1417  }
1418  }
1419  if (!success) {
1420  if (!dp.getData().empty())
1421  data = dp.getData()[0];
1422  else
1423  throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1424  << " error reading number of events from BU JSON -: No input value " << data;
1425  }
1426  return boost::lexical_cast<int>(data);
1427  } catch (std::filesystem::filesystem_error const& ex) {
1428  // Input dir gone?
1429  unlockFULocal();
1430  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1431  } catch (std::runtime_error const& e) {
1432  // Another process grabbed the file and NFS did not register this
1433  unlockFULocal();
1434  edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1435  } catch (boost::bad_lexical_cast const&) {
1436  edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1437  << "Input value is -: " << data;
1438  } catch (std::exception const& e) {
1439  // BU run directory disappeared?
1440  unlockFULocal();
1441  edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1442  }
1443 
1444  return -1;
1445  }
1446 
1448  bool& serverError,
1449  uint32_t& serverLS,
1450  uint32_t& closedServerLS,
1451  std::string& nextFileJson,
1452  std::string& nextFileRaw,
1453  bool& rawHeader,
1454  int maxLS) {
1455  EvFDaqDirector::FileStatus fileStatus = noFile;
1456  serverError = false;
1457 
1458  boost::system::error_code ec;
1459  try {
1460  while (true) {
1461  //socket connect
1462  if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1464 
1465  if (ec) {
1466  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1467  serverError = true;
1468  break;
1469  }
1470  }
1471 
1472  boost::asio::streambuf request;
1473  std::ostream request_stream(&request);
1474  std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1475  if (maxLS >= 0) {
1476  std::stringstream spath;
1477  spath << path << "&stopls=" << maxLS;
1478  path = spath.str();
1479  edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1480  }
1481  request_stream << "GET " << path << " HTTP/1.1\r\n";
1482  request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1483  request_stream << "Accept: */*\r\n";
1484  request_stream << "Connection: keep-alive\r\n\r\n";
1485 
1486  boost::asio::write(*socket_, request, ec);
1487  if (ec) {
1488  if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1489  edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1490  //we got disconnected, try to reconnect to the server before writing the request
1492  if (ec) {
1493  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1494  serverError = true;
1495  break;
1496  }
1497  continue;
1498  }
1499  edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1500  serverError = true;
1501  break;
1502  }
1503 
1504  boost::asio::streambuf response;
1505  boost::asio::read_until(*socket_, response, "\r\n", ec);
1506  if (ec) {
1507  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1508  serverError = true;
1509  break;
1510  }
1511 
1512  std::istream response_stream(&response);
1513 
1514  std::string http_version;
1515  response_stream >> http_version;
1516 
1517  response_stream >> serverHttpStatus;
1518 
1519  std::string status_message;
1520  std::getline(response_stream, status_message);
1521  if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1522  edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1523  serverError = true;
1524  break;
1525  }
1526  if (serverHttpStatus != 200) {
1527  edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1528  serverError = true;
1529  break;
1530  }
1531 
1532  // Process the response headers.
1534  while (std::getline(response_stream, header) && header != "\r") {
1535  }
1536 
1537  std::string fileInfo;
1538  std::map<std::string, std::string> serverMap;
1539  while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1540  auto pos = fileInfo.find('=');
1541  if (pos == std::string::npos)
1542  continue;
1543  auto stitle = fileInfo.substr(0, pos);
1544  auto svalue = fileInfo.substr(pos + 1);
1545  serverMap[stitle] = svalue;
1546  }
1547 
1548  //check that response run number if correct
1549  auto server_version = serverMap.find("version");
1550  assert(server_version != serverMap.end());
1551 
1552  auto server_run = serverMap.find("runnumber");
1553  assert(server_run != serverMap.end());
1554  assert(run_nstring_ == server_run->second);
1555 
1556  auto server_state = serverMap.find("state");
1557  assert(server_state != serverMap.end());
1558 
1559  auto server_eols = serverMap.find("lasteols");
1560  assert(server_eols != serverMap.end());
1561 
1562  auto server_ls = serverMap.find("lumisection");
1563 
1564  int version_maj = 1;
1565  int version_min = 0;
1566  int version_rev = 0;
1567  {
1568  auto* s_ptr = server_version->second.c_str();
1569  if (!server_version->second.empty() && server_version->second[0] == '"')
1570  s_ptr++;
1571  auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1572  if (res < 3) {
1573  res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1574  if (res < 2) {
1575  res = sscanf(s_ptr, "%d", &version_maj);
1576  if (res < 1) {
1577  //expecting at least 1 number (major version)
1578  edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1579  }
1580  }
1581  }
1582  }
1583 
1584  closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1585  if (server_ls != serverMap.end())
1586  serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1587  else
1588  serverLS = closedServerLS + 1;
1589 
1590  std::string s_state = server_state->second;
1591  if (s_state == "STARTING") //initial, always empty starting with LS 1
1592  {
1593  auto server_file = serverMap.find("file");
1594  assert(server_file == serverMap.end()); //no file with starting state
1595  fileStatus = noFile;
1596  edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1597  } else if (s_state == "READY") {
1598  auto server_file = serverMap.find("file");
1599  if (server_file == serverMap.end()) {
1600  //can be returned by server if files from new LS already appeared but LS is not yet closed
1601  if (serverLS <= closedServerLS)
1602  serverLS = closedServerLS + 1;
1603  fileStatus = noFile;
1604  edm::LogInfo("EvFDaqDirector")
1605  << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1606  } else {
1607  std::string filestem;
1608  std::string fileprefix;
1609  auto server_fileprefix = serverMap.find("fileprefix");
1610 
1611  if (server_fileprefix != serverMap.end()) {
1612  auto pssize = server_fileprefix->second.size();
1613  if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1614  fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1615  else
1616  fileprefix = server_fileprefix->second;
1617  }
1618 
1619  //remove string literals
1620  auto ssize = server_file->second.size();
1621  if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1622  filestem = server_file->second.substr(1, ssize - 2);
1623  else
1624  filestem = server_file->second;
1625  assert(!filestem.empty());
1626  if (version_maj > 1) {
1627  nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw"; //filestem should be raw
1628  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1629  nextFileJson = "";
1630  rawHeader = true;
1631  } else {
1632  nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw"; //raw files are not moved
1633  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1634  nextFileJson = filestem + ".jsn";
1635  rawHeader = false;
1636  }
1637  fileStatus = newFile;
1638  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1639  << serverLS << " file:" << filestem;
1640  }
1641  } else if (s_state == "EOLS") {
1642  serverLS = closedServerLS + 1;
1643  edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1644  fileStatus = noFile;
1645  } else if (s_state == "EOR") {
1646  //server_eor = serverMap.find("iseor");
1647  edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1648  fileStatus = runEnded;
1649  } else if (s_state == "NORUN") {
1650  auto err_msg = serverMap.find("errormessage");
1651  if (err_msg != serverMap.end())
1652  edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1653  else
1654  edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1655  edm::LogWarning("EvFDaqDirector") << "executing run end";
1656  fileStatus = runEnded;
1657  } else if (s_state == "ERROR") {
1658  auto err_msg = serverMap.find("errormessage");
1659  if (err_msg != serverMap.end())
1660  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1661  else
1662  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1663  fileStatus = noFile;
1664  serverError = true;
1665  } else {
1666  edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1667  fileStatus = noFile;
1668  serverError = true;
1669  }
1670 
1671  // Read until EOF, writing data to output as we go.
1672  if (!fileBrokerKeepAlive_) {
1673  while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1674  }
1675  if (ec != boost::asio::error::eof) {
1676  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1677  serverError = true;
1678  }
1679  }
1680  break;
1681  }
1682  } catch (std::exception const& e) {
1683  edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1684  serverError = true;
1685  }
1686 
1687  if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1688  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1689  if (ec) {
1690  edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1691  }
1692  socket_->close(ec);
1693  if (ec) {
1694  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1695  }
1696  }
1697 
1698  if (serverError) {
1699  if (socket_->is_open())
1700  socket_->close(ec);
1701  if (ec) {
1702  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1703  }
1704  fileStatus = noFile;
1705  sleep(1); //back-off if error detected
1706  }
1707  return fileStatus;
1708  }
1709 
1711  unsigned int& ls,
1712  std::string& nextFileRaw,
1713  int& rawFd,
1714  uint16_t& rawHeaderSize,
1715  int32_t& serverEventsInNewFile,
1716  int64_t& fileSizeFromMetadata,
1717  uint64_t& thisLockWaitTimeUs) {
1718  EvFDaqDirector::FileStatus fileStatus = noFile;
1719 
1720  //int retval = -1;
1721  //int lock_attempts = 0;
1722  //long total_lock_attempts = 0;
1723 
1724  struct stat buf;
1725  int stopFileLS = -1;
1726  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1727  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1728  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1729  if (stopFileCheck == 0)
1730  stopFileLS = readLastLSEntry(stopFilePath_);
1731  else
1732  stopFileLS = 1; //stop without drain if only pid is stopped
1733  if (!stop_ls_override_) {
1734  //if lumisection is higher than in stop file, should quit at next from current
1735  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1736  stopFileLS = stop_ls_override_ = ls;
1737  } else
1738  stopFileLS = stop_ls_override_;
1739  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1740  << stopFileLS;
1741  //return runEnded;
1742  } else //if file was removed before reaching stop condition, reset this
1743  stop_ls_override_ = 0;
1744 
1745  /* look for EoLS
1746  if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1747  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ;
1748  ls++;
1749  return noFile;
1750  }
1751  */
1752 
1753  timeval ts_lockbegin;
1754  gettimeofday(&ts_lockbegin, nullptr);
1755 
1756  std::string nextFileJson;
1757  uint32_t serverLS, closedServerLS;
1758  unsigned int serverHttpStatus;
1759  bool serverError;
1760 
1761  //local lock to force index json and EoLS files to appear in order
1763  lockFULocal2();
1764 
1765  int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1766  bool rawHeader = false;
1767  fileStatus = contactFileBroker(
1768  serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1769 
1770  if (serverError) {
1771  //do not update anything
1773  unlockFULocal2();
1774  return noFile;
1775  }
1776 
1777  //handle creation of EoLS and BoLS files if lumisection has changed
1778  if (currentLumiSection == 0) {
1779  if (fileStatus == runEnded) {
1780  createLumiSectionFiles(closedServerLS, 0);
1781  createLumiSectionFiles(serverLS, closedServerLS, false); // +1
1782  } else
1783  createLumiSectionFiles(serverLS, 0);
1784  } else {
1785  //loop over and create any EoLS files missing
1786  if (closedServerLS >= currentLumiSection) {
1787  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1788  createLumiSectionFiles(i + 1, i);
1789  }
1790  }
1791 
1792  bool fileFound = true;
1793 
1794  if (fileStatus == newFile) {
1795  if (rawHeader > 0)
1796  serverEventsInNewFile =
1797  grabNextJsonFromRaw(nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false);
1798  else
1799  serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1800  }
1801  //closing file in case of any error
1802  if (serverEventsInNewFile < 0 && rawFd != -1) {
1803  close(rawFd);
1804  rawFd = -1;
1805  }
1806  if (!fileFound) {
1807  //catch condition where directory got deleted
1808  fileStatus = noFile;
1809  struct stat buf;
1810  if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1811  edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1812  fileStatus = runEnded;
1813  }
1814  }
1815 
1816  //can unlock because all files have been created locally
1818  unlockFULocal2();
1819 
1820  if (fileStatus == runEnded)
1821  ls = std::max(currentLumiSection, serverLS);
1822  else if (fileStatus == newFile) {
1823  assert(serverLS >= ls);
1824  ls = serverLS;
1825  } else if (fileStatus == noFile) {
1826  if (serverLS >= ls)
1827  ls = serverLS;
1828  else {
1829  edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1830  << " which is smaller than currently open LS " << ls << ". Ignoring response";
1831  sleep(1);
1832  }
1833  }
1834 
1835  return fileStatus;
1836  }
1837 
1839  // create open dir if not already there
1840 
1842  if (!std::filesystem::is_directory(openPath)) {
1843  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1844  std::filesystem::create_directories(openPath);
1845  }
1846  }
1847 
1849  std::ifstream ij(file);
1850  Json::Value deserializeRoot;
1852 
1853  if (!reader.parse(ij, deserializeRoot)) {
1854  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1855  return -1;
1856  }
1857 
1858  int ret = deserializeRoot.get("lastLS", "").asInt();
1859  return ret;
1860  }
1861 
1863  std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1865  struct stat buf;
1866  unsigned int lscount = startFromLS_;
1867  do {
1868  std::stringstream ss;
1869  ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1870  fullpath = ss.str();
1871  lscount++;
1872  } while (stat(fullpath.c_str(), &buf) == 0);
1873  return lscount - 1;
1874  }
1875 
1876  //if transferSystem PSet is present in the menu, we require it to be complete and consistent for all specified streams
1878  if (transferSystemJson_)
1879  return;
1880 
1881  transferSystemJson_.reset(new Json::Value);
1882  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1883  if (topPset.existsAs<edm::ParameterSet>("transferSystem", true)) {
1884  const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));
1885 
1886  Json::Value destinationsVal(Json::arrayValue);
1887  std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
1888  for (auto& dest : destinations)
1889  destinationsVal.append(dest);
1890  (*transferSystemJson_)["destinations"] = destinationsVal;
1891 
1892  Json::Value modesVal(Json::arrayValue);
1893  std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>("transferModes");
1894  for (auto& mode : modes)
1895  modesVal.append(mode);
1896  (*transferSystemJson_)["transferModes"] = modesVal;
1897 
1898  for (auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
1899  if (psKeyItr->first != "destinations" && psKeyItr->first != "transferModes") {
1900  const edm::ParameterSet& streamDef = tsPset.getParameterSet(psKeyItr->first);
1901  Json::Value streamVal;
1902  for (auto& mode : modes) {
1903  //validation
1904  if (!streamDef.existsAs<std::vector<std::string>>(mode, true))
1905  throw cms::Exception("EvFDaqDirector")
1906  << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode
1907  << ")";
1908  std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);
1909 
1910  Json::Value sDestsValue(Json::arrayValue);
1911 
1912  if (streamDestinations.empty())
1913  throw cms::Exception("EvFDaqDirector")
1914  << " Missing transter system destination(s) for -: " << psKeyItr->first << ", mode:" << mode;
1915 
1916  for (auto& sdest : streamDestinations) {
1917  bool sDestValid = false;
1918  sDestsValue.append(sdest);
1919  for (auto& dest : destinations) {
1920  if (dest == sdest)
1921  sDestValid = true;
1922  }
1923  if (!sDestValid)
1924  throw cms::Exception("EvFDaqDirector")
1925  << " Invalid transter system destination specified for -: " << psKeyItr->first << ", mode:" << mode
1926  << ", dest:" << sdest;
1927  }
1928  streamVal[mode] = sDestsValue;
1929  }
1930  (*transferSystemJson_)[psKeyItr->first] = streamVal;
1931  }
1932  }
1933  } else {
1934  if (requireTSPSet_)
1935  throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
1936  }
1937  }
1938 
1940  std::string streamRequestName;
1941  if (transferSystemJson_->isMember(stream.c_str()))
1942  streamRequestName = stream;
1943  else {
1944  std::stringstream msg;
1945  msg << "Transfer system mode definitions missing for -: " << stream;
1946  if (requireTSPSet_)
1947  throw cms::Exception("EvFDaqDirector") << msg.str();
1948  else {
1949  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
1950  return std::string("Failsafe");
1951  }
1952  }
1953  //return empty if strict check parameter is not on
1954  if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1955  edm::LogWarning("EvFDaqDirector")
1956  << "Selected mode string is not provided as DaqDirector parameter."
1957  << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
1958  return std::string("Failsafe");
1959  }
1960  if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1961  throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
1962  }
1963  //check if stream has properly listed transfer stream
1964  if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str())) {
1965  std::stringstream msg;
1966  msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
1967  if (requireTSPSet_)
1968  throw cms::Exception("EvFDaqDirector") << msg.str();
1969  else
1970  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
1971  return std::string("Failsafe");
1972  }
1973  Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_, "");
1974 
1975  //flatten string json::Array into CSV std::string
1976  std::string ret;
1977  for (Json::Value::iterator it = destsVec.begin(); it != destsVec.end(); it++) {
1978  if (!ret.empty())
1979  ret += ",";
1980  ret += (*it).asString();
1981  }
1982  return ret;
1983  }
1984 
1986  if (mergeTypePset_.empty())
1987  return;
1988  if (!mergeTypeMap_.empty())
1989  return;
1990  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1991  if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_, true)) {
1992  const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
1993  for (const std::string& pname : tsPset.getParameterNames()) {
1994  std::string streamType = tsPset.getParameter<std::string>(pname);
1995  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
1996  mergeTypeMap_.insert(ac, pname);
1997  ac->second = streamType;
1998  ac.release();
1999  }
2000  }
2001  }
2002 
2004  tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
2005  if (mergeTypeMap_.find(search_ac, stream))
2006  return search_ac->second;
2007 
2008  edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
2009  std::string defaultName = MergeTypeNames_[defaultType];
2010  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2011  mergeTypeMap_.insert(ac, stream);
2012  ac->second = defaultName;
2013  ac.release();
2014  return defaultName;
2015  }
2016 
2018  std::string proc_flag = run_dir_ + "/processing";
2019  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2020  close(proc_flag_fd);
2021  }
2022 
2023  struct flock EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid) {
2024 #ifdef __APPLE__
2025  return {start, len, pid, type, whence};
2026 #else
2027  return {type, whence, start, len, pid};
2028 #endif
2029  }
2030 
2031 } // namespace evf
ConfigurationDescriptions.h
runTheMatrix.ret
ret
prodAgent to be discontinued
Definition: runTheMatrix.py:542
evf::EvFDaqDirector::getOpenOutputJsonFilePath
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:425
edm::GlobalContext::luminosityBlockID
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:60
evf::EvFDaqDirector::preallocate
void preallocate(edm::service::SystemBounds const &bounds)
Definition: EvFDaqDirector.cc:320
evf::EvFDaqDirector::directorBU_
bool directorBU_
Definition: EvFDaqDirector.h:217
evf::EvFDaqDirector::previousFileSize_
unsigned long previousFileSize_
Definition: EvFDaqDirector.h:245
evf::EvFDaqDirector::bumpFile
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
Definition: EvFDaqDirector.cc:779
evf::EvFDaqDirector::requireTSPSet_
bool requireTSPSet_
Definition: EvFDaqDirector.h:214
evf::EvFDaqDirector::unlockFULocal2
void unlockFULocal2()
Definition: EvFDaqDirector.cc:928
evf::EvFDaqDirector::endpoint_iterator_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
Definition: EvFDaqDirector.h:282
eostools.ls
def ls(path, rec=False)
Definition: eostools.py:349
evf::EvFDaqDirector::dirManager_
DirManager dirManager_
Definition: EvFDaqDirector.h:243
Json::ValueIterator
Iterator for object and array value.
Definition: value.h:908
evf::EvFDaqDirector::createBoLSFile
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
Definition: EvFDaqDirector.cc:933
evf::EvFDaqDirector::fileBrokerKeepAlive_
bool fileBrokerKeepAlive_
Definition: EvFDaqDirector.h:210
evf::EvFDaqDirector::getRawFilePath
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
Definition: EvFDaqDirector.cc:405
electrons_cff.bool
bool
Definition: electrons_cff.py:366
mps_fire.i
i
Definition: mps_fire.py:428
evf::EvFDaqDirector::fulocal_rwlock_fd_
int fulocal_rwlock_fd_
Definition: EvFDaqDirector.h:234
getFRDFileHeaderVersion
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:41
DataPoint.h
evf::EvFDaqDirector::runEnded
Definition: EvFDaqDirector.h:64
MessageLogger.h
evf::EvFDaqDirector::useFileBroker_
bool useFileBroker_
Definition: EvFDaqDirector.h:206
evf::EvFDaqDirector::init_lock_
pthread_mutex_t init_lock_
Definition: EvFDaqDirector.h:259
evf::EvFDaqDirector::getEoLSFilePathOnBU
std::string getEoLSFilePathOnBU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:476
funct::false
false
Definition: Factorize.h:29
evf::EvFDaqDirector::filesToDeletePtr_
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
Definition: EvFDaqDirector.h:257
evf::EvFDaqDirector::getEoRFilePath
std::string getEoRFilePath() const
Definition: EvFDaqDirector.cc:488
evf::EvFDaqDirector::readEolsDefinition_
bool readEolsDefinition_
Definition: EvFDaqDirector.h:264
evf::EvFDaqDirector::mergeTypePset_
std::string mergeTypePset_
Definition: EvFDaqDirector.h:216
reco_skim_cfg_mod.fullpath
fullpath
Definition: reco_skim_cfg_mod.py:202
filterCSVwithJSON.copy
copy
Definition: filterCSVwithJSON.py:36
edm::ActivityRegistry::watchPostGlobalEndRun
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
Definition: ActivityRegistry.h:334
evf::EvFDaqDirector::bu_run_open_dir_
std::string bu_run_open_dir_
Definition: EvFDaqDirector.h:228
jsoncollector::DataPointDefinition::getNames
std::vector< std::string > const & getNames()
Definition: DataPointDefinition.h:44
fffnaming::streamerDataFileNameWithPid
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:72
fffnaming::streamerDataChecksumFileNameWithInstance
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:91
Json::Value::end
const_iterator end() const
evf::EvFDaqDirector::bu_readlock_fd_
int bu_readlock_fd_
Definition: EvFDaqDirector.h:231
evf::EvFDaqDirector::nThreads_
unsigned int nThreads_
Definition: EvFDaqDirector.h:262
Json::arrayValue
array value (ordered list)
Definition: value.h:30
evf::EvFDaqDirector::getStreamMergeType
std::string getStreamMergeType(std::string const &stream, MergeType defaultType)
Definition: EvFDaqDirector.cc:2003
evf::EvFDaqDirector::getLumisectionToStart
unsigned int getLumisectionToStart() const
Definition: EvFDaqDirector.cc:1862
edm::ProcessContext
Definition: ProcessContext.h:27
evf::EvFDaqDirector::checkTransferSystemPSet
void checkTransferSystemPSet(edm::ProcessContext const &pc)
Definition: EvFDaqDirector.cc:1877
cms::cuda::stream
cudaStream_t stream
Definition: HistoContainer.h:57
Json::Value::get
Value get(UInt index, const Value &defaultValue) const
pos
Definition: PixelAliasList.h:18
evf::EvFDaqDirector::getRootHistogramFilePath
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:468
ALCARECOPromptCalibProdSiPixelAli0T_cff.mode
mode
Definition: ALCARECOPromptCalibProdSiPixelAli0T_cff.py:96
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
evf::EvFDaqDirector::bu_writelock_fd_
int bu_writelock_fd_
Definition: EvFDaqDirector.h:232
evf::EvFDaqDirector::getProtocolBufferHistogramFilePath
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:454
evf::EvFDaqDirector::contactFileBroker
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
Definition: EvFDaqDirector.cc:1447
fffnaming::protocolBufferHistogramFileNameWithInstance
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:129
evf::EvFDaqDirector::getStreamDestinations
std::string getStreamDestinations(std::string const &stream) const
Definition: EvFDaqDirector.cc:1939
evf::EvFDaqDirector::tryInitializeFuLockFile
void tryInitializeFuLockFile()
Definition: EvFDaqDirector.cc:880
cms::cuda::assert
assert(be >=bs)
evf::EvFDaqDirector::io_service_
boost::asio::io_service io_service_
Definition: EvFDaqDirector.h:279
fffnaming::inputRawFileName
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
Definition: FFFNamingSchema.h:48
evf::EvFDaqDirector::lockInitLock
void lockInitLock()
Definition: EvFDaqDirector.cc:909
evf::EvFDaqDirector::stop_ls_override_
unsigned int stop_ls_override_
Definition: EvFDaqDirector.h:268
mps_check.msg
tuple msg
Definition: mps_check.py:285
FRDEventMessage.h
evf::EvFDaqDirector::fileBrokerHost_
std::string fileBrokerHost_
Definition: EvFDaqDirector.h:208
edm::ParameterSet::existsAs
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:171
evf::EvFDaqDirector::runAbort
Definition: EvFDaqDirector.h:64
edm::LogInfo
Log< level::Info, false > LogInfo
Definition: MessageLogger.h:125
beamerCreator.create
def create(alignables, pedeDump, additionalData, outputFile, config)
Definition: beamerCreator.py:44
fffnaming::eorFileName
std::string eorFileName(const unsigned int run)
Definition: FFFNamingSchema.h:34
newFWLiteAna.found
found
Definition: newFWLiteAna.py:118
evf::EvFDaqDirector::getOpenDatFilePath
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:421
evf::EvFDaqDirector::getFFFParamsFilePathOnBU
std::string getFFFParamsFilePathOnBU() const
Definition: EvFDaqDirector.cc:492
fffnaming::eolsFileName
std::string eolsFileName(const unsigned int run, const unsigned int ls)
Definition: FFFNamingSchema.h:20
evf::EvFDaqDirector::query_
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
Definition: EvFDaqDirector.h:281
edm::LogWarning
Log< level::Warning, false > LogWarning
Definition: MessageLogger.h:122
Json::Reader
Unserialize a JSON document into a Value.
Definition: reader.h:16
evf::EvFDaqDirector::baseRunDir
std::string & baseRunDir()
Definition: EvFDaqDirector.h:76
evf::EvFDaqDirector::getRunOpenDirPath
std::string getRunOpenDirPath() const
Definition: EvFDaqDirector.h:106
evf::EvFDaqDirector::getDatFilePath
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:417
evf::EvFDaqDirector::preBeginJob
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
Definition: EvFDaqDirector.cc:359
myMessageLogger_cff.destinations
destinations
Definition: myMessageLogger_cff.py:36
evf::EvFDaqDirector::getInitFilePath
std::string getInitFilePath(std::string const &stream) const
Definition: EvFDaqDirector.cc:445
getHLTprescales.readIndex
def readIndex()
Definition: getHLTprescales.py:36
evf::FastMonitoringService::accumulateFileSize
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
Definition: FastMonitoringService.cc:668
evf::EvFDaqDirector::openFULockfileStream
void openFULockfileStream(bool create)
Definition: EvFDaqDirector.cc:890
contentValuesCheck.ss
ss
Definition: contentValuesCheck.py:33
evf::FastMonitoringService
Definition: FastMonitoringService.h:153
evf::EvFDaqDirector::updateFuLock
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
Definition: EvFDaqDirector.cc:503
evf::EvFDaqDirector::getEoLSFilePathOnFU
std::string getEoLSFilePathOnFU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:480
FRDFileHeader.h
RPCNoise_example.check
check
Definition: RPCNoise_example.py:71
evf::EvFDaqDirector::stopFilePath_
std::string stopFilePath_
Definition: EvFDaqDirector.h:266
edm::ConfigurationDescriptions::add
void add(std::string const &label, ParameterSetDescription const &psetDescription)
Definition: ConfigurationDescriptions.cc:57
Calorimetry_cff.dp
dp
Definition: Calorimetry_cff.py:158
edm::ActivityRegistry::watchPreBeginJob
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
Definition: ActivityRegistry.h:151
DQM.reader
reader
Definition: DQM.py:105
Service.h
evf::EvFDaqDirector::parseFRDFileHeader
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
Definition: EvFDaqDirector.cc:961
evf::EvFDaqDirector::getOpenInputJsonFilePath
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
Definition: EvFDaqDirector.cc:413
evf::EvFDaqDirector::selectedTransferMode_
std::string selectedTransferMode_
Definition: EvFDaqDirector.h:215
evf::EvFDaqDirector::fms_
evf::FastMonitoringService * fms_
Definition: EvFDaqDirector.h:254
edm::ActivityRegistry
Definition: ActivityRegistry.h:134
summarizeEdmComparisonLogfiles.success
success
Definition: summarizeEdmComparisonLogfiles.py:115
corrVsCorr.filename
filename
Definition: corrVsCorr.py:123
Json::Value::asInt
Int asInt() const
eostools.mkdir
def mkdir(path)
Definition: eostools.py:251
fffnaming::streamerDataFileNameWithInstance
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:81
evf::EvFDaqDirector::noFile
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::getMergedProtocolBufferHistogramFilePath
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:459
evf::EvFDaqDirector::run_
unsigned int run_
Definition: EvFDaqDirector.h:205
edm::ProcessContext::parameterSetID
ParameterSetID const & parameterSetID() const
Definition: ProcessContext.h:32
ParameterSetDescription.h
evf::EvFDaqDirector::eolsNFilesIndex_
unsigned int eolsNFilesIndex_
Definition: EvFDaqDirector.h:265
unpackData-CaloStage2.pname
pname
Definition: unpackData-CaloStage2.py:76
evf::EvFDaqDirector::pid_
std::string pid_
Definition: EvFDaqDirector.h:225
evf::EvFDaqDirector::fileBrokerUseLocalLock_
bool fileBrokerUseLocalLock_
Definition: EvFDaqDirector.h:211
mitigatedMETSequence_cff.U
U
Definition: mitigatedMETSequence_cff.py:36
edm::ConfigurationDescriptions
Definition: ConfigurationDescriptions.h:28
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
evf::EvFDaqDirector::MergeTypeNames_
static const std::vector< std::string > MergeTypeNames_
Definition: EvFDaqDirector.h:274
evf::EvFDaqDirector::fileDeleteLockPtr_
std::mutex * fileDeleteLockPtr_
Definition: EvFDaqDirector.h:256
evf::EvFDaqDirector::getOutputJsonFilePath
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:429
cppFunctionSkipper.exception
exception
Definition: cppFunctionSkipper.py:10
funct::true
true
Definition: Factorize.h:173
evf::EvFDaqDirector::fulocal_rwlock_fd2_
int fulocal_rwlock_fd2_
Definition: EvFDaqDirector.h:235
fffnaming::rootHistogramFileNameWithPid
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:139
edm::GlobalContext
Definition: GlobalContext.h:29
edm::service::SystemBounds
Definition: SystemBounds.h:29
LogDebug
#define LogDebug(id)
Definition: MessageLogger.h:233
fffnaming::initFileNameWithPid
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:55
edm::ParameterSet
Definition: ParameterSet.h:47
edm::service::SystemBounds::maxNumberOfThreads
unsigned int maxNumberOfThreads() const
Definition: SystemBounds.h:38
Json::Value::append
Value & append(const Value &value)
Append value to array at the end.
evf::EvFDaqDirector::preBeginRun
void preBeginRun(edm::GlobalContext const &globalContext)
Definition: EvFDaqDirector.cc:364
evf::EvFDaqDirector::fileBrokerPort_
std::string fileBrokerPort_
Definition: EvFDaqDirector.h:209
GlobalContext.h
evf::EvFDaqDirector::getMergedDatChecksumFilePath
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:437
evf::EvFDaqDirector::getNextFromFileBroker
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
Definition: EvFDaqDirector.cc:1710
SiStripPI::max
Definition: SiStripPayloadInspectorHelper.h:169
evf::EvFDaqDirector::~EvFDaqDirector
~EvFDaqDirector()
Definition: EvFDaqDirector.cc:301
eostools.chmod
def chmod(path, mode)
Definition: eostools.py:294
SiStripCommissioningSource_FromEDM_cfg.EvFDaqDirector
EvFDaqDirector
Definition: SiStripCommissioningSource_FromEDM_cfg.py:57
evf::EvFDaqDirector::mergeTypeMap_
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
Definition: EvFDaqDirector.h:271
evf::EvFDaqDirector::unlockFULocal
void unlockFULocal()
Definition: EvFDaqDirector.cc:918
fffnaming::streamerJsonFileNameWithPid
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:101
recoMuon::in
Definition: RecoMuonEnumerators.h:6
gainCalibHelper::gainCalibPI::type
type
Definition: SiPixelGainCalibHelper.h:40
evf::EvFDaqDirector::getOpenInitFilePath
std::string getOpenInitFilePath(std::string const &stream) const
Definition: EvFDaqDirector.cc:441
evf::EvFDaqDirector::run_dir_
std::string run_dir_
Definition: EvFDaqDirector.h:226
edm::Service
Definition: Service.h:30
createfilelist.int
int
Definition: createfilelist.py:10
EvFDaqDirector.h
FrontierConditions_GlobalTag_cff.file
file
Definition: FrontierConditions_GlobalTag_cff.py:13
FastMonitoringService.h
evf::EvFDaqDirector::getMergedRootHistogramFilePath
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:472
edm::LuminosityBlockID::luminosityBlock
LuminosityBlockNumber_t luminosityBlock() const
Definition: LuminosityBlockID.h:42
evf::DirManager::findHighestRunDir
std::string findHighestRunDir()
Definition: DirManager.cc:23
evf::EvFDaqDirector::fuLockPollInterval_
unsigned int fuLockPollInterval_
Definition: EvFDaqDirector.h:212
evf::EvFDaqDirector::lockFULocal
void lockFULocal()
Definition: EvFDaqDirector.cc:913
evf::EvFDaqDirector::getOpenProtocolBufferHistogramFilePath
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:449
evf::EvFDaqDirector
Definition: EvFDaqDirector.h:62
evf::EvFDaqDirector::bu_base_dir_
std::string bu_base_dir_
Definition: EvFDaqDirector.h:204
edm::service::SystemBounds::maxNumberOfStreams
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:35
jsoncollector::DataPoint
Definition: DataPoint.h:36
DBConfiguration_cff.connect
connect
Definition: DBConfiguration_cff.py:18
evf::EvFDaqDirector::createRunOpendirMaybe
void createRunOpendirMaybe()
Definition: EvFDaqDirector.cc:1838
Json::Value::begin
const_iterator begin() const
evf::EvFDaqDirector::getEoRFilePathOnFU
std::string getEoRFilePathOnFU() const
Definition: EvFDaqDirector.cc:490
edm::LogError
Log< level::Error, false > LogError
Definition: MessageLogger.h:123
evf::EvFDaqDirector::fu_rw_fulk
struct flock fu_rw_fulk
Definition: EvFDaqDirector.h:252
res
Definition: Electron.h:6
evf::EvFDaqDirector::createLumiSectionFiles
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS=true)
Definition: EvFDaqDirector.cc:943
evf::EvFDaqDirector::hltSourceDirectory_
std::string hltSourceDirectory_
Definition: EvFDaqDirector.h:218
evf::EvFDaqDirector::startFromLS_
unsigned int startFromLS_
Definition: EvFDaqDirector.h:220
visDQMUpload.buf
buf
Definition: visDQMUpload.py:154
FedRawDataInputSource.h
jsoncollector
Definition: DataPoint.h:26
evf::EvFDaqDirector::readLastLSEntry
int readLastLSEntry(std::string const &file)
Definition: EvFDaqDirector.cc:1848
jsoncollector::DataPointDefinition
Definition: DataPointDefinition.h:20
readEcalDQMStatus.read
read
Definition: readEcalDQMStatus.py:38
evf::EvFDaqDirector::getMergedDatFilePath
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:433
writeEcalDQMStatus.write
write
Definition: writeEcalDQMStatus.py:48
edm::ActivityRegistry::watchPreGlobalEndLumi
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:405
edm::ActivityRegistry::watchPreallocate
void watchPreallocate(Preallocate::slot_type const &iSlot)
Definition: ActivityRegistry.h:144
FRDFileHeader_v1
Definition: FRDFileHeader.h:22
evf::EvFDaqDirector::fu_rw_flk
struct flock fu_rw_flk
Definition: EvFDaqDirector.h:251
DataPointDefinition.h
evf::EvFDaqDirector::run_string_
std::string run_string_
Definition: EvFDaqDirector.h:223
edm::getParameterSet
ParameterSet const & getParameterSet(ParameterSetID const &id)
Definition: ParameterSet.cc:862
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
evf::EvFDaqDirector::socket_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
Definition: EvFDaqDirector.h:283
std
Definition: JetResolutionObject.h:76
evf::EvFDaqDirector::createProcessingNotificationMaybe
void createProcessingNotificationMaybe() const
Definition: EvFDaqDirector.cc:2017
evf::EvFDaqDirector::initRun
void initRun()
Definition: EvFDaqDirector.cc:149
evf::MergeType
MergeType
Definition: EvFDaqDirector.h:58
evf::EvFDaqDirector::rawFileHasHeader
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
Definition: EvFDaqDirector.cc:1057
edm::PathsAndConsumesOfModulesBase
Definition: PathsAndConsumesOfModulesBase.h:35
fffnaming::rootHistogramFileNameWithInstance
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:148
fffnaming::inputJsonFileName
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
Definition: FFFNamingSchema.h:41
Exception
Definition: hltDiff.cc:245
evf::EvFDaqDirector::fulockfile_
std::string fulockfile_
Definition: EvFDaqDirector.h:229
MatrixUtil.remove
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:219
evf
Definition: fillJson.h:27
evf::EvFDaqDirector::preGlobalEndLumi
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
Definition: EvFDaqDirector.cc:383
evf::EvFDaqDirector::transferSystemJson_
std::shared_ptr< Json::Value > transferSystemJson_
Definition: EvFDaqDirector.h:270
evf::EvFDaqDirector::getInputJsonFilePath
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
Definition: EvFDaqDirector.cc:401
evf::EvFDaqDirector::hostname_
std::string hostname_
Definition: EvFDaqDirector.h:222
evf::EvFDaqDirector::getOpenRootHistogramFilePath
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:464
evf::EvFDaqDirector::dpd_
jsoncollector::DataPointDefinition * dpd_
Definition: EvFDaqDirector.h:277
edm::ParameterSet::getParameter
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
Exception.h
evf::EvFDaqDirector::resolver_
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
Definition: EvFDaqDirector.h:280
evf::EvFDaqDirector::removeFile
void removeFile(unsigned int ls, unsigned int index)
Definition: EvFDaqDirector.cc:501
fffnaming::bolsFileName
std::string bolsFileName(const unsigned int run, const unsigned int ls)
Definition: FFFNamingSchema.h:27
evf::EvFDaqDirector::nStreams_
unsigned int nStreams_
Definition: EvFDaqDirector.h:261
data
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
evf::EvFDaqDirector::grabNextJsonFileAndUnlock
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
Definition: EvFDaqDirector.cc:1358
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
timingPdfMaker.infile
infile
Definition: timingPdfMaker.py:350
evf::EvFDaqDirector::fu_readwritelock_fd_
int fu_readwritelock_fd_
Definition: EvFDaqDirector.h:233
RecoTauValidation_cfi.header
header
Definition: RecoTauValidation_cfi.py:292
cond::uint64_t
unsigned long long uint64_t
Definition: Time.h:13
evf::EvFDaqDirector::bu_run_dir_
std::string bu_run_dir_
Definition: EvFDaqDirector.h:227
evf::EvFDaqDirector::getBoLSFilePathOnFU
std::string getBoLSFilePathOnFU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:484
spu::def
int def(FILE *, FILE *, int)
Definition: SherpackUtilities.cc:14
mps_fire.result
result
Definition: mps_fire.py:311
evf::EvFDaqDirector::base_dir_
std::string base_dir_
Definition: EvFDaqDirector.h:203
cms::Exception
Definition: Exception.h:70
timingPdfMaker.outfile
outfile
Definition: timingPdfMaker.py:351
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
edm::ActivityRegistry::watchPreGlobalBeginRun
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
Definition: ActivityRegistry.h:317
evf::EvFDaqDirector::fu_rw_lock_stream
FILE * fu_rw_lock_stream
Definition: EvFDaqDirector.h:239
command_line.start
start
Definition: command_line.py:167
fffnaming::protocolBufferHistogramFileNameWithPid
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:120
evf::EvFDaqDirector::getOpenRawFilePath
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
Definition: EvFDaqDirector.cc:409
evf::EvFDaqDirector::stopFilePathPid_
std::string stopFilePathPid_
Definition: EvFDaqDirector.h:267
edm_modernize_messagelogger.stat
stat
Definition: edm_modernize_messagelogger.py:27
evf::EvFDaqDirector::unlockInitLock
void unlockInitLock()
Definition: EvFDaqDirector.cc:911
evf::EvFDaqDirector::run_nstring_
std::string run_nstring_
Definition: EvFDaqDirector.h:224
ProcessContext.h
evf::EvFDaqDirector::postEndRun
void postEndRun(edm::GlobalContext const &globalContext)
Definition: EvFDaqDirector.cc:374
edm::Log
Definition: MessageLogger.h:70
StreamID.h
evf::EvFDaqDirector::FileStatus
FileStatus
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::lockFULocal2
void lockFULocal2()
Definition: EvFDaqDirector.cc:923
SystemBounds.h
evf::EvFDaqDirector::grabNextJsonFile
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
Definition: EvFDaqDirector.cc:1178
evf::EvFDaqDirector::getNFilesFromEoLS
int getNFilesFromEoLS(std::string BUEoLSFile)
Definition: EvFDaqDirector.cc:718
edm::ParameterSet::getParameterSet
ParameterSet const & getParameterSet(std::string const &) const
Definition: ParameterSet.cc:2128
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
Json::Value
Represents a JSON value.
Definition: value.h:99
evf::EvFDaqDirector::fillDescriptions
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Definition: EvFDaqDirector.cc:327
evf::EvFDaqDirector::bu_w_lock_stream
FILE * bu_w_lock_stream
Definition: EvFDaqDirector.h:237
evf::EvFDaqDirector::fileBrokerHostFromCfg_
bool fileBrokerHostFromCfg_
Definition: EvFDaqDirector.h:207
mps_fire.dest
dest
Definition: mps_fire.py:179
evf::EvFDaqDirector::newFile
Definition: EvFDaqDirector.h:64
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
evf::EvFDaqDirector::grabNextJsonFromRaw
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
Definition: EvFDaqDirector.cc:1096
evf::EvFDaqDirector::checkMergeTypePSet
void checkMergeTypePSet(edm::ProcessContext const &pc)
Definition: EvFDaqDirector.cc:1985