CMS 3D CMS Logo

EvFDaqDirector.cc
Go to the documentation of this file.
10 
18 
19 #include <iostream>
20 #include <fstream>
21 #include <sstream>
22 #include <sys/time.h>
23 #include <unistd.h>
24 #include <cstdio>
25 #include <boost/lexical_cast.hpp>
26 #include <boost/algorithm/string.hpp>
27 
28 //using boost::asio::ip::tcp;
29 
30 //#define DEBUG
31 
32 using namespace jsoncollector;
33 
34 namespace evf {
35 
36  //for enum MergeType
37  const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"};
38 
40  : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
41  bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
42  run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
43  useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
44  fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", true)),
45  fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
46  fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
47  fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
48  fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
49  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
50  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
51  requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet", false)),
52  selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode", "")),
53  mergeTypePset_(pset.getUntrackedParameter<std::string>("mergingPset", "")),
54  directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
55  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
56  hostname_(""),
57  bu_readlock_fd_(-1),
58  bu_writelock_fd_(-1),
59  fu_readwritelock_fd_(-1),
60  fulocal_rwlock_fd_(-1),
61  fulocal_rwlock_fd2_(-1),
62  bu_w_lock_stream(nullptr),
63  bu_r_lock_stream(nullptr),
64  fu_rw_lock_stream(nullptr),
65  dirManager_(base_dir_),
66  previousFileSize_(0),
67  bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
68  bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
69  bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
70  bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
71  fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
72  fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
78 
79  //save hostname for later
80  char hostname[33];
81  gethostname(hostname, 32);
82  hostname_ = hostname;
83 
84  char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
85  if (fuLockPollIntervalPtr) {
86  try {
87  fuLockPollInterval_ = boost::lexical_cast<unsigned int>(std::string(fuLockPollIntervalPtr));
88  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
89  << " us";
90  } catch (boost::bad_lexical_cast const&) {
91  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
92  }
93  }
94 
95  //override file service parameter if specified by environment
96  char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
97  if (fileBrokerParamPtr) {
98  try {
99  useFileBroker_ = (boost::lexical_cast<unsigned int>(std::string(fileBrokerParamPtr))) > 0;
100  edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
101  } catch (boost::bad_lexical_cast const&) {
102  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
103  }
104  }
105  if (useFileBroker_) {
107  //find BU data address from hltd configuration
109  struct stat buf;
110  if (stat("/etc/appliance/bus.config", &buf) == 0) {
111  std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
112  std::getline(busconfig, fileBrokerHost_);
113  }
114  if (fileBrokerHost_.empty())
115  throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
116  } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
117  throw cms::Exception("EvFDaqDirector")
118  << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
119 
120  resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
121  query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
122  endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
123  socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
124  }
125 
126  char* startFromLSPtr = std::getenv("FFF_STARTFROMLS");
127  if (startFromLSPtr) {
128  try {
129  startFromLS_ = boost::lexical_cast<unsigned int>(std::string(startFromLSPtr));
130  edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
131  } catch (boost::bad_lexical_cast const&) {
132  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
133  }
134  }
135 
136  //override file service parameter if specified by environment
137  char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
138  if (fileBrokerUseLockParamPtr) {
139  try {
140  fileBrokerUseLocalLock_ = (boost::lexical_cast<unsigned int>(std::string(fileBrokerUseLockParamPtr))) > 0;
141  edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
143  } catch (boost::bad_lexical_cast const&) {
144  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
145  }
146  }
147  }
148 
150  std::stringstream ss;
151  ss << "run" << std::setfill('0') << std::setw(6) << run_;
152  run_string_ = ss.str();
153  ss = std::stringstream();
154  ss << run_;
155  run_nstring_ = ss.str();
156  run_dir_ = base_dir_ + "/" + run_string_;
157  ss = std::stringstream();
158  ss << getpid();
159  pid_ = ss.str();
160 
161  // check if base dir exists or create it accordingly
162  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
163  if (retval != 0 && errno != EEXIST) {
164  throw cms::Exception("DaqDirector")
165  << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
166  }
167 
168  //create run dir in base dir
169  umask(0);
170  retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
171  if (retval != 0 && errno != EEXIST) {
172  throw cms::Exception("DaqDirector")
173  << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
174  }
175 
176  //create fu-local.lock in run open dir
177  if (!directorBU_) {
179  std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
181  open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
182  if (fulocal_rwlock_fd_ == -1)
183  throw cms::Exception("DaqDirector")
184  << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
185  chmod(fulocal_lock_.c_str(), 0777);
186  fsync(fulocal_rwlock_fd_);
187  //open second fd for another input source thread
189  open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
190  if (fulocal_rwlock_fd2_ == -1)
191  throw cms::Exception("DaqDirector")
192  << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
193  }
194 
195  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
196  //for BU, it is created at this point
197  if (directorBU_) {
199  std::string bulockfile = bu_run_dir_ + "/bu.lock";
200  fulockfile_ = bu_run_dir_ + "/fu.lock";
201 
202  //make or find bu run dir
203  retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
204  if (retval != 0 && errno != EEXIST) {
205  throw cms::Exception("DaqDirector")
206  << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno) << "\n";
207  }
208  bu_run_open_dir_ = bu_run_dir_ + "/open";
209  retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
210  if (retval != 0 && errno != EEXIST) {
211  throw cms::Exception("DaqDirector")
212  << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno) << "\n";
213  }
214 
215  // the BU director does not need to know about the fu lock
216  bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
217  if (bu_writelock_fd_ == -1)
218  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
219  else
220  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
221  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
222  if (bu_w_lock_stream == nullptr)
223  edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
224 
225  // BU INITIALIZES LOCK FILE
226  // FU LOCK FILE OPEN
227  openFULockfileStream(true);
229  fflush(fu_rw_lock_stream);
230  close(fu_readwritelock_fd_);
231 
232  if (!hltSourceDirectory_.empty()) {
233  struct stat buf;
234  if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
235  std::string hltdir = bu_run_dir_ + "/hlt";
236  std::string tmphltdir = bu_run_open_dir_ + "/hlt";
237  retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
238  if (retval != 0 && errno != EEXIST)
239  throw cms::Exception("DaqDirector")
240  << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno) << "\n";
241 
242  std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
243  std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
244 
245  std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
246  for (auto& optfile : optfiles) {
247  try {
248  std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
249  } catch (...) {
250  }
251  }
252 
253  std::filesystem::rename(tmphltdir, hltdir);
254  } else
255  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
256  }
257  //else{}//no configuration specified
258  } else {
259  // for FU, check if bu base dir exists
260 
261  retval = mkdir(bu_base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
262  if (retval != 0 && errno != EEXIST) {
263  throw cms::Exception("DaqDirector")
264  << " Error checking for bu base dir -: " << bu_base_dir_ << " mkdir error:" << strerror(errno) << "\n";
265  }
266 
268  fulockfile_ = bu_run_dir_ + "/fu.lock";
269  openFULockfileStream(false);
270  }
271 
272  pthread_mutex_init(&init_lock_, nullptr);
273 
274  stopFilePath_ = run_dir_ + "/CMSSW_STOP";
275  std::stringstream sstp;
276  sstp << stopFilePath_ << "_pid" << pid_;
277  stopFilePathPid_ = sstp.str();
278 
279  if (!directorBU_) {
280  std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
281  struct stat statbuf;
282  if (!stat(defPath.c_str(), &statbuf))
283  edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
284  else {
285  //look in source directory if not present in ramdisk
286  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
287  defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
288  if (stat(defPath.c_str(), &statbuf)) {
289  defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
290  if (stat(defPath.c_str(), &statbuf)) {
291  defPath = defPathSuffix;
292  }
293  }
294  }
295  dpd_ = new DataPointDefinition();
296  std::string defLabel = "data";
297  DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
298  }
299  }
300 
302  //close server connection
303  if (socket_.get() && socket_->is_open()) {
304  boost::system::error_code ec;
305  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
306  socket_->close(ec);
307  }
308 
309  if (fulocal_rwlock_fd_ != -1) {
310  unlockFULocal();
311  close(fulocal_rwlock_fd_);
312  }
313 
314  if (fulocal_rwlock_fd2_ != -1) {
315  unlockFULocal2();
316  close(fulocal_rwlock_fd2_);
317  }
318  }
319 
321  initRun();
322 
323  nThreads_ = bounds.maxNumberOfStreams();
324  nStreams_ = bounds.maxNumberOfThreads();
325  }
326 
329  desc.setComment(
330  "Service used for file locking arbitration and for propagating information between other EvF components");
331  desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
332  desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
333  desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
334  desc.addUntracked<bool>("useFileBroker", false)
335  ->setComment("Use BU file service to grab input data instead of NFS file locking");
336  desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
337  ->setComment("Allow service to discover BU address from hltd configuration");
338  desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
339  desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
340  desc.addUntracked<bool>("fileBrokerKeepAlive", true)
341  ->setComment("Use keep alive to avoid using large number of sockets");
342  desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
343  ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
344  desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
345  ->setComment("Lock polling interval in microseconds for the input directory file lock");
346  desc.addUntracked<bool>("outputAdler32Recheck", false)
347  ->setComment("Check Adler32 of per-process output files while micro-merging");
348  desc.addUntracked<bool>("requireTransfersPSet", false)
349  ->setComment("Require complete transferSystem PSet in the process configuration");
350  desc.addUntracked<std::string>("selectedTransferMode", "")
351  ->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
352  desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
353  desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
354  desc.addUntracked<std::string>("mergingPset", "")
355  ->setComment("Name of merging PSet to look for merging type definitions for streams");
356  descriptions.add("EvFDaqDirector", desc);
357  }
358 
361  checkMergeTypePSet(pc);
362  }
363 
// Pre-begin-run hook. The visible statements emit a LogWarning that names run_dir_ and
// the value returned by dirManager_.findHighestRunDir(); globalContext is not used in
// the visible lines.
// NOTE(review): listing line 368 is absent from this excerpt; presumably it holds the
// condition that opens the brace closed at listing line 371 - confirm against the full file.
364  void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) {
365  //assert(run_ == id.run());
366 
367  // check if the requested run is the latest one - issue a warning if it isn't
369  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
370  << ". This is not the highest run " << dirManager_.findHighestRunDir();
371  }
372  }
373 
// Post-end-run hook. Closes the BU read-lock and write-lock descriptors unconditionally
// (no check for the -1 "not opened" value is made here), and in BU-director mode builds
// the path of the bu.lock file inside bu_run_dir_.
// NOTE(review): listing line 379 is absent from this excerpt, so what is done with
// `filename` cannot be confirmed from the visible lines.
374  void EvFDaqDirector::postEndRun(edm::GlobalContext const& globalContext) {
375  close(bu_readlock_fd_);
376  close(bu_writelock_fd_);
377  if (directorBU_) {
378  std::string filename = bu_run_dir_ + "/bu.lock";
380  }
381  }
382 
384  //delete all files belonging to just closed lumi
385  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
387  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
388  return;
389  }
390 
391  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
392  auto it = filesToDeletePtr_->begin();
393  while (it != filesToDeletePtr_->end()) {
394  if (it->second->lumi_ == ls) {
395  it = filesToDeletePtr_->erase(it);
396  } else
397  it++;
398  }
399  }
400 
// Const accessor returning a std::string for lumisection `ls` and file `index`.
// NOTE(review): listing line 402 (the function body) is absent from this excerpt, so how
// the returned path is composed cannot be confirmed here.
401  std::string EvFDaqDirector::getInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
403  }
404 
// Const accessor returning a std::string for lumisection `ls` and file `index`.
// NOTE(review): listing line 406 (the function body) is absent from this excerpt, so how
// the returned path is composed cannot be confirmed here.
405  std::string EvFDaqDirector::getRawFilePath(const unsigned int ls, const unsigned int index) const {
407  }
408 
409  std::string EvFDaqDirector::getOpenRawFilePath(const unsigned int ls, const unsigned int index) const {
410  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
411  }
412 
413  std::string EvFDaqDirector::getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
414  return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
415  }
416 
// Const accessor returning a std::string for lumisection `ls` and the named `stream`.
// NOTE(review): listing line 418 (the function body) is absent from this excerpt, so how
// the returned path is composed cannot be confirmed here.
417  std::string EvFDaqDirector::getDatFilePath(const unsigned int ls, std::string const& stream) const {
419  }
420 
423  }
424 
427  }
428 
431  }
432 
435  }
436 
439  }
440 
442  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
443  }
444 
447  }
448 
450  std::string const& stream) const {
452  }
453 
455  std::string const& stream) const {
457  }
458 
460  std::string const& stream) const {
462  }
463 
466  }
467 
470  }
471 
474  }
475 
477  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
478  }
479 
481  return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
482  }
483 
485  return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
486  }
487 
489 
491 
492  std::string EvFDaqDirector::getFFFParamsFilePathOnBU() const { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
493 
495  int retval = remove(filename.c_str());
496  if (retval != 0)
497  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
498  << ". error = " << strerror(errno);
499  }
500 
501  void EvFDaqDirector::removeFile(unsigned int ls, unsigned int index) { removeFile(getRawFilePath(ls, index)); }
502 
504  std::string& nextFile,
505  uint32_t& fsize,
506  uint16_t& rawHeaderSize,
507  uint64_t& lockWaitTime,
508  bool& setExceptionState) {
509  EvFDaqDirector::FileStatus fileStatus = noFile;
510  rawHeaderSize = 0;
511 
512  int retval = -1;
513  int lock_attempts = 0;
514  long total_lock_attempts = 0;
515 
516  struct stat buf;
517  int stopFileLS = -1;
518  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
519  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
520  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
521  if (stopFileCheck == 0)
522  stopFileLS = readLastLSEntry(stopFilePath_);
523  else
524  stopFileLS = 1; //stop without drain if only pid is stopped
525  if (!stop_ls_override_) {
526  //if lumisection is higher than in stop file, should quit at next from current
527  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
528  stopFileLS = stop_ls_override_ = ls;
529  } else
530  stopFileLS = stop_ls_override_;
531  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
532  << stopFileLS;
533  //return runEnded;
534  } else //if file was removed before reaching stop condition, reset this
535  stop_ls_override_ = 0;
536 
537  timeval ts_lockbegin;
538  gettimeofday(&ts_lockbegin, nullptr);
539 
540  while (retval == -1) {
541  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
542  if (retval == -1)
543  usleep(fuLockPollInterval_);
544  else
545  continue;
546 
547  lock_attempts += fuLockPollInterval_;
548  total_lock_attempts += fuLockPollInterval_;
549  if (lock_attempts > 5000000 || errno == 116) {
550  if (errno == 116)
551  edm::LogWarning("EvFDaqDirector")
552  << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
553  else
554  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
555  "fu.lock file are present -: errno "
556  << errno << ":" << strerror(errno) << std::endl;
557 
558  if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
559  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
560  ls++;
561  return noFile;
562  }
563 
564  if (stat(bu_run_dir_.c_str(), &buf) != 0)
565  return runEnded;
566  if (stat(fulockfile_.c_str(), &buf) != 0)
567  return runEnded;
568 
569  lock_attempts = 0;
570  }
571  if (total_lock_attempts > 5 * 60000000) {
572  edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
573  return runAbort;
574  }
575  }
576 
577  timeval ts_lockend;
578  gettimeofday(&ts_lockend, nullptr);
579  long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
580  if (deltat > 0.)
581  lockWaitTime = deltat;
582 
583  if (retval != 0)
584  return fileStatus;
585 
586 #ifdef DEBUG
587  timeval ts_lockend;
588  gettimeofday(&ts_lockend, 0);
589 #endif
590 
591  //open another lock file FD after the lock using main fd has been acquired
592  int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
593  if (fu_readwritelock_fd2 == -1)
594  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
595  << " create. error:" << strerror(errno);
596 
597  FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
598 
599  // if the stream is readable
600  if (fu_rw_lock_stream2 != nullptr) {
601  unsigned int readLs, readIndex;
602  int check = 0;
603  // rewind the stream
604  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
605  // if rewinded ok
606  if (check == 0) {
607  // read its' values
608  fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
609  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
610 
611  unsigned int currentLs = readLs;
612  bool bumpedOk = false;
613  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
614  //no lock file write in this case
615  if (ls && ls + 1 < currentLs)
616  ls++;
617  else {
618  // try to bump (look for new index or EoLS file)
619  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
620  //avoid 2 lumisections jump
621  if (ls && readLs > currentLs && currentLs > ls) {
622  ls++;
623  readLs = currentLs = ls;
624  readIndex = 0;
625  bumpedOk = false;
626  //no write to lock file
627  } else {
628  if (ls == 0 && readLs > currentLs) {
629  //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
630  //in this case there is no new file in the same LS
631  //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
632  readLs = currentLs;
633  readIndex = 0;
634  bumpedOk = false;
635  //no write to lock file
636  }
637  //update return LS value
638  ls = readLs;
639  }
640  }
641  if (bumpedOk) {
642  // there is a new index file to grab, lock file needs to be updated
643  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
644  if (check == 0) {
645  ftruncate(fu_readwritelock_fd2, 0);
646  // write next index in the file, which is the file the next process should take
647  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
648  fflush(fu_rw_lock_stream2);
649  fsync(fu_readwritelock_fd2);
650  fileStatus = newFile;
651  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
652  } else {
653  edm::LogError("EvFDaqDirector")
654  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
655  setExceptionState = true;
656  return noFile;
657  }
658  } else if (currentLs < readLs) {
659  //there is no new file in next LS (yet), but lock file can be updated to the next LS
660  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
661  if (check == 0) {
662  ftruncate(fu_readwritelock_fd2, 0);
663  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
664  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
665  fflush(fu_rw_lock_stream2);
666  fsync(fu_readwritelock_fd2);
667  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
668  } else {
669  edm::LogError("EvFDaqDirector")
670  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
671  setExceptionState = true;
672  return noFile;
673  }
674  }
675  } else {
676  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
677  << strerror(errno);
678  }
679  } else {
680  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
681  }
682  fclose(fu_rw_lock_stream2); // = fdopen(fu_readwritelock_fd2, "r+");
683 
684 #ifdef DEBUG
685  timeval ts_preunlock;
686  gettimeofday(&ts_preunlock, 0);
687  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
688  double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
689 #endif
690 
691  //if new json is present, lock file which FedRawDataInputSource will later unlock
692  if (fileStatus == newFile)
693  lockFULocal();
694 
695  //release lock at this point
696  int retvalu = -1;
697  retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
698  if (retvalu == -1)
699  edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
700 
701 #ifdef DEBUG
702  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
703 #endif
704 
705  if (fileStatus == noFile) {
706  struct stat buf;
707  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
708  if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
709  fileStatus = runEnded;
710  if (stopFileLS >= 0 && (int)ls > stopFileLS) {
711  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
712  fileStatus = runEnded;
713  }
714  }
715  return fileStatus;
716  }
717 
719  std::ifstream ij(BUEoLSFile);
720  Json::Value deserializeRoot;
722 
723  if (!reader.parse(ij, deserializeRoot)) {
724  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
725  return -1;
726  }
727 
729  DataPoint dp;
730  dp.deserialize(deserializeRoot);
731 
732  //read definition
733  if (readEolsDefinition_) {
734  //std::string def = boost::algorithm::trim(dp.getDefinition());
735  std::string def = dp.getDefinition();
736  if (def.empty())
737  readEolsDefinition_ = false;
738  while (!def.empty()) {
740  if (def.find('/') == 0)
741  fullpath = def;
742  else
743  fullpath = bu_run_dir_ + '/' + def;
744  struct stat buf;
745  if (stat(fullpath.c_str(), &buf) == 0) {
746  DataPointDefinition eolsDpd;
747  std::string defLabel = "legend";
748  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
749  if (eolsDpd.getNames().empty()) {
750  //try with "data" label if "legend" format is not used
751  eolsDpd = DataPointDefinition();
752  defLabel = "data";
753  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
754  }
755  for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
756  if (eolsDpd.getNames().at(i) == "NFiles")
758  readEolsDefinition_ = false;
759  break;
760  }
761  //check if we can still find definition
762  if (def.size() <= 1 || def.find('/') == std::string::npos) {
763  readEolsDefinition_ = false;
764  break;
765  }
766  def = def.substr(def.find('/') + 1);
767  }
768  }
769 
770  if (dp.getData().size() > eolsNFilesIndex_)
771  data = dp.getData()[eolsNFilesIndex_];
772  else {
773  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
774  return -1;
775  }
776  return boost::lexical_cast<int>(data);
777  }
778 
// Looks for the next input file to hand out, starting from lumisection `ls` and file `index`.
//
// Search order, as implemented below:
//   1. the path from getInputJsonFilePath(ls, index);
//   2. the path from getRawFilePath(ls, index), accepted only if rawFileHasHeader() also succeeds;
//   3. if the file from getEoLSFilePathOnBU(ls) exists: recheck 1 and 2, read the file count
//      with getNFilesFromEoLS(), then advance `ls`, reset `index` to 0 and check the first
//      JSON / raw file of the new lumisection.
//
// Parameters:
//   ls, index          in/out; `ls` is incremented and `index` reset to 0 when the
//                      lumisection is found to be closed.
//   nextFile           out; cleared on entry, set to the path of the file that was found.
//   fsize              out; st_size of the file that was found (also stored in previousFileSize_).
//   rawHeaderSize      passed by reference to rawFileHasHeader().
//   maxLS              when >= 0, no file is returned for a lumisection greater than this value.
//   setExceptionState  set to true when `index` is below the file count read from the EoLS file.
//
// Returns true when a file was found, false otherwise (including EoLS parse failure).
//
// NOTE(review): listing lines 788 and 791 are absent from this excerpt, so the statements
// guarded by `if (!fms_)` and `if (fms_)` below are not visible here.
// NOTE(review): `ss` and `nextIndex` are not used by any visible statement.
779  bool EvFDaqDirector::bumpFile(unsigned int& ls,
780  unsigned int& index,
781  std::string& nextFile,
782  uint32_t& fsize,
783  uint16_t& rawHeaderSize,
784  int maxLS,
785  bool& setExceptionState) {
786  if (previousFileSize_ != 0) {
787  if (!fms_) {
789  }
790  if (fms_)
792  previousFileSize_ = 0;
793  }
794  nextFile = "";
795 
796  //reached limit
797  if (maxLS >= 0 && ls > (unsigned int)maxLS)
798  return false;
799 
800  struct stat buf;
801  std::stringstream ss;
802  unsigned int nextIndex = index;
803  nextIndex++;
804 
805  // 1. Check suggested file
806  std::string nextFileJson = getInputJsonFilePath(ls, index);
807  if (stat(nextFileJson.c_str(), &buf) == 0) {
808  fsize = previousFileSize_ = buf.st_size;
809  nextFile = nextFileJson;
810  return true;
811  }
812  // 2. No file -> lumi ended? (and how many?)
813  else {
814  // 3. No file -> check for standalone raw file
815  std::string nextFileRaw = getRawFilePath(ls, index);
816  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
817  fsize = previousFileSize_ = buf.st_size;
818  nextFile = nextFileRaw;
819  return true;
820  }
821 
822  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
823 
824  if (stat(BUEoLSFile.c_str(), &buf) == 0) {
825  // recheck that no raw file appeared in the meantime
826  if (stat(nextFileJson.c_str(), &buf) == 0) {
827  fsize = previousFileSize_ = buf.st_size;
828  nextFile = nextFileJson;
829  return true;
830  }
831  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
832  fsize = previousFileSize_ = buf.st_size;
833  nextFile = nextFileRaw;
834  return true;
835  }
836 
837  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
838  if (indexFilesInLS < 0)
839  //parsing failed
840  return false;
841  else {
842  //check index
843  if ((int)index < indexFilesInLS) {
844  //we have 2 files, and check for 1 failed... retry (2 will never be here)
// NOTE(review): nextFile was cleared on entry and is still empty on this path, so the
// message below prints no file name after "Missing".
845  edm::LogError("EvFDaqDirector")
846  << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
847  << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
848  setExceptionState = true;
849  return false;
850  }
851  }
852  // this lumi ended, check for files
853  ++ls;
854  index = 0;
855 
856  //reached limit
857  if (maxLS >= 0 && ls > (unsigned int)maxLS)
858  return false;
859 
860  nextFileJson = getInputJsonFilePath(ls, 0);
861  nextFileRaw = getRawFilePath(ls, 0);
862  if (stat(nextFileJson.c_str(), &buf) == 0) {
863  // a new file was found at new lumisection, index 0
864  fsize = previousFileSize_ = buf.st_size;
865  nextFile = nextFileJson;
866  return true;
867  }
868  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
869  fsize = previousFileSize_ = buf.st_size;
870  nextFile = nextFileRaw;
871  return true;
872  }
873  return false;
874  }
875  }
876  // no new file found
877  return false;
878  }
879 
881  if (fu_rw_lock_stream == nullptr)
882  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
883  else {
884  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
885  unsigned int readLs = 1, readIndex = 0;
886  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
887  }
888  }
889 
// NOTE(review): the opening line of this definition (listing line 890) and listing line 892 are
// missing here. `create` is presumably a bool parameter, and the open() call below presumably
// has its result assigned to fu_readwritelock_fd_ as in the else branch - confirm against the
// real source.
// Visible behaviour: opens fulockfile_ (creating it and forcing mode 0766 when `create` is set),
// logs on failure, then wraps the descriptor in a "r+" stdio stream stored in fu_rw_lock_stream.
891  if (create) {
893  open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
// chmod is applied explicitly after open(); presumably because the creation mode is subject to umask - confirm
894  chmod(fulockfile_.c_str(), 0766);
895  } else {
896  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
897  }
898  if (fu_readwritelock_fd_ == -1)
899  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
900  << " create:" << create << " error:" << strerror(errno);
901  else
902  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
903 
// NOTE(review): fdopen() is reached even when the descriptor is -1 (the branch above only logs);
// in that case the second error message below is emitted as well.
904  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
905  if (fu_rw_lock_stream == nullptr)
906  edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
907  }
908 
909  void EvFDaqDirector::lockInitLock() { pthread_mutex_lock(&init_lock_); }
910 
911  void EvFDaqDirector::unlockInitLock() { pthread_mutex_unlock(&init_lock_); }
912 
// NOTE(review): the opening line of this definition is missing from this listing.
// Takes a shared (LOCK_SH) flock on fulocal_rwlock_fd_; the flock() result is not checked.
914  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
915  flock(fulocal_rwlock_fd_, LOCK_SH);
916  }
917 
// NOTE(review): the opening line of this definition is missing from this listing.
// Releases (LOCK_UN) the flock held on fulocal_rwlock_fd_; the flock() result is not checked.
919  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
920  flock(fulocal_rwlock_fd_, LOCK_UN);
921  }
922 
// NOTE(review): the opening line of this definition is missing from this listing.
// Takes an exclusive (LOCK_EX) flock on the second descriptor, fulocal_rwlock_fd2_;
// the flock() result is not checked.
924  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
925  flock(fulocal_rwlock_fd2_, LOCK_EX);
926  }
927 
// NOTE(review): the opening line of this definition is missing from this listing.
// Releases (LOCK_UN) the flock held on fulocal_rwlock_fd2_; the flock() result is not checked.
929  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
930  flock(fulocal_rwlock_fd2_, LOCK_UN);
931  }
932 
933  void EvFDaqDirector::createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const {
934  //used for backpressure mechanisms and monitoring
935  const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
936  struct stat buf;
937  if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
938  int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
939  close(bol_fd);
940  }
941  }
942 
943  void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection,
944  const uint32_t currentLumiSection,
945  bool doCreateBoLS,
946  bool doCreateEoLS) {
947  if (currentLumiSection > 0) {
948  const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
949  struct stat buf;
950  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
951  if (!found) {
952  if (doCreateEoLS) {
953  int eol_fd =
954  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
955  close(eol_fd);
956  }
957  if (doCreateBoLS)
958  createBoLSFile(lumiSection, false);
959  }
960  } else if (doCreateBoLS) {
961  createBoLSFile(lumiSection, true); //needed for initial lumisection
962  }
963  }
964 
// NOTE(review): the first line of this signature (listing line 965) is missing from this listing.
// From the recursive call below, this is parseFRDFileHeader and its first parameter is rawSourcePath.
//
// Visible behaviour: opens rawSourcePath read-only, reads sizeof(FRDFileHeader_v1) bytes and
// classifies them with getFRDFileHeaderVersion().
// Return values: 0 on success, 1 when the file could not be opened and errno is ENOENT,
// -1 for every other failure.
// On success rawFd receives the descriptor (or -1 when closeFile is set) and the out-parameters
// are filled from the header, or with 0 / -1 placeholders when no header was detected and
// requireHeader is false.
966  int& rawFd,
967  uint16_t& rawHeaderSize,
968  uint32_t& lsFromHeader,
969  int32_t& eventsFromHeader,
970  int64_t& fileSizeFromHeader,
971  bool requireHeader,
972  bool retry,
973  bool closeFile) {
974  int infile;
975 
976  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
977  if (retry) {
// one retry: warn and call ourselves again with retry = false
978  edm::LogWarning("EvFDaqDirector")
979  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
980  return parseFRDFileHeader(rawSourcePath,
981  rawFd,
982  rawHeaderSize,
983  lsFromHeader,
984  eventsFromHeader,
985  fileSizeFromHeader,
986  requireHeader,
987  false,
988  closeFile);
989  } else {
// the open is attempted a second time here before giving up
990  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
991  edm::LogError("EvFDaqDirector")
992  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
// NOTE(review): errno is inspected after the log statement; this relies on the logger not
// modifying errno - confirm.
993  if (errno == ENOENT)
994  return 1; // error && file not found
995  else
996  return -1;
997  }
998  }
999  }
1000 
1001  constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1); //try to read v1 FRD header size
1002  FRDFileHeader_v1 fileHead;
1003 
1004  ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
// with closeFile set the descriptor is released right after the read and infile becomes -1,
// which makes the "infile != -1" guards below skip a second close
1005  if (closeFile) {
1006  close(infile);
1007  infile = -1;
1008  }
1009 
1010  if (sz_read < 0) {
1011  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - unable to read " << rawSourcePath << " : "
1012  << strerror(errno);
1013  if (infile != -1)
1014  close(infile);
1015  return -1;
1016  }
1017  if ((size_t)sz_read < buf_sz) {
1018  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - file smaller than header: " << rawSourcePath;
1019  if (infile != -1)
1020  close(infile);
1021  return -1;
1022  }
1023 
1024  uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1025 
1026  if (frd_version == 0) {
1027  //no header (specific sequence not detected)
1028  if (requireHeader) {
1029  edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1030  if (infile != -1)
1031  close(infile);
1032  return -1;
1033  } else {
1034  //no header, but valid file
// rewind so that a caller reading from rawFd starts at the beginning of the file
// NOTE(review): when closeFile is set, infile is -1 here and this lseek() acts on an invalid descriptor.
1035  lseek(infile, 0, SEEK_SET);
1036  rawHeaderSize = 0;
1037  lsFromHeader = 0;
1038  eventsFromHeader = -1;
1039  fileSizeFromHeader = -1;
1040  }
1041  } else {
1042  //version 1 header
1043  uint32_t headerSizeRaw = fileHead.headerSize_;
// a declared header size smaller than the v1 struct is treated as corruption
1044  if (headerSizeRaw < buf_sz) {
1045  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1046  << " v:" << frd_version;
1047  if (infile != -1)
1048  close(infile);
1049  return -1;
1050  }
1051  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1052  lsFromHeader = fileHead.lumiSection_;
1053  eventsFromHeader = (int32_t)fileHead.eventCount_;
1054  fileSizeFromHeader = (int64_t)fileHead.fileSize_;
1055  rawHeaderSize = fileHead.headerSize_;
1056  }
// rawFd is only written on the success path; it is -1 when closeFile was requested
1057  rawFd = infile;
1058  return 0; //OK
1059  }
1060 
1061  bool EvFDaqDirector::rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize) {
1062  int infile;
1063  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1064  edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1065  << strerror(errno);
1066  return false;
1067  }
1068  constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1); //try to read v1 FRD header size
1069  FRDFileHeader_v1 fileHead;
1070 
1071  ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
1072 
1073  if (sz_read < 0) {
1074  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << rawSourcePath << " : "
1075  << strerror(errno);
1076  if (infile != -1)
1077  close(infile);
1078  return false;
1079  }
1080  if ((size_t)sz_read < buf_sz) {
1081  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << rawSourcePath;
1082  if (infile != -1)
1083  close(infile);
1084  return false;
1085  }
1086 
1087  uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1088 
1089  close(infile);
1090 
1091  if (frd_version > 0) {
1092  rawHeaderSize = fileHead.headerSize_;
1093  return true;
1094  }
1095 
1096  rawHeaderSize = 0;
1097  return false;
1098  }
1099 
// NOTE(review): the first line of this signature (listing line 1100) is missing from this listing.
// The log messages below name the function grabNextJsonFromRaw; the body uses a rawSourcePath
// argument that is presumably the missing first parameter - confirm.
//
// Visible behaviour: parses the FRD header of the raw file (header required, one open retry),
// then writes a small JSON file {"data":[events,fileSize,"rawSourcePath"]} into baseRunDir().
// Returns the event count from the header, or -1 on any failure. fileFound is cleared only when
// the header parser reports 1 (file not found).
1101  int& rawFd,
1102  uint16_t& rawHeaderSize,
1103  int64_t& fileSizeFromHeader,
1104  bool& fileFound,
1105  uint32_t serverLS,
1106  bool closeFile) {
1107  fileFound = true;
1108 
1109  //take only first three tokens delimited by "_" in the renamed raw file name
1110  std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1111  size_t pos = 0, n_tokens = 0;
// after the loop, pos is the index of the third '_' (searching from index 1), or npos when the
// stem has fewer than three; substr(0, npos) then keeps the whole stem
1112  while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1113  }
1114  std::string reducedJsonStem = jsonStem.substr(0, pos);
1115 
1116  std::ostringstream fileNameWithPID;
1117  //should be ported to use fffnaming
1118  fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1119 
1120  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1121 
1122  LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1123 
1124  //parse RAW file header if it exists
1125  uint32_t lsFromRaw;
1126  int32_t nbEventsWrittenRaw;
1127  int64_t fileSizeFromRaw;
// arguments: requireHeader = true, retry = true
1128  auto ret = parseFRDFileHeader(
1129  rawSourcePath, rawFd, rawHeaderSize, lsFromRaw, nbEventsWrittenRaw, fileSizeFromRaw, true, true, closeFile);
1130  if (ret != 0) {
1131  if (ret == 1)
1132  fileFound = false;
1133  return -1;
1134  }
1135 
1136  int outfile;
1137  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1138  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1139  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1140  if (errno == EEXIST) {
1141  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1142  << " : ";
1143  return -1;
1144  }
1145  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1146  << strerror(errno);
// other open failure: if a file nevertheless exists at the destination, remove it and try once more
1147  struct stat out_stat;
1148  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1149  edm::LogWarning("EvFDaqDirector")
1150  << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1151  << jsonDestPath;
1152  if (unlink(jsonDestPath.c_str()) == -1) {
1153  edm::LogWarning("EvFDaqDirector")
1154  << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1155  }
1156  }
1157  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1158  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1159  << jsonDestPath << " : " << strerror(errno);
1160  return -1;
1161  }
1162  }
1163  //write JSON file (TODO: use jsoncpp)
1164  std::stringstream ss;
1165  ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1166  std::string sstr = ss.str();
1167 
// NOTE(review): only a negative return is treated as failure (a short write is not detected), and
// the failure path returns without close(outfile), leaving the descriptor open.
1168  if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1169  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1170  << " : " << strerror(errno);
1171  return -1;
1172  }
1173  close(outfile);
// an LS mismatch with the server is only reported; serverLS == 0 disables the comparison
1174  if (serverLS && serverLS != lsFromRaw)
1175  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1176  << " and raw file header LS " << lsFromRaw;
1177 
1178  fileSizeFromHeader = fileSizeFromRaw;
1179  return nbEventsWrittenRaw;
1180  }
1181 
// NOTE(review): the first line of this signature (listing line 1182) and listing line 1275 are
// missing from this listing. The body uses `jsonSourcePath` (presumably the missing first
// parameter) and `reader` (presumably a JSON reader declared on the missing line) - confirm.
//
// Visible behaviour: copies the JSON file at jsonSourcePath to
// baseRunDir()/<stem of rawSourcePath>_pid<pid_>.jsn, removes the source, parses the JSON and
// returns the "NEvents" value as int. fileSizeFromJson receives the "NBytes" value or -1.
// Returns -1 on any failure; fileFound is cleared only when the source cannot be opened with ENOENT.
1183  std::string const& rawSourcePath,
1184  int64_t& fileSizeFromJson,
1185  bool& fileFound) {
1186  fileFound = true;
1187 
1188  //should be ported to use fffnaming
1189  std::ostringstream fileNameWithPID;
1190  fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1191  << std::setw(5) << pid_ << ".jsn";
1192 
1193  // assemble json destination path
1194  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1195 
1196  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1197 
1198  int infile = -1, outfile = -1;
1199 
// the input open is attempted twice before giving up
1200  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1201  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1202  << strerror(errno);
1203  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1204  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1205  << jsonSourcePath << " : " << strerror(errno);
1206  if (errno == ENOENT)
1207  fileFound = false;
1208  return -1;
1209  }
1210  }
1211 
1212  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1213  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1214  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1215  if (errno == EEXIST) {
1216  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1217  << " : ";
1218  ::close(infile);
1219  return -1;
1220  }
1221  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1222  << strerror(errno);
// other open failure: if a file nevertheless exists at the destination, remove it and try once more
1223  struct stat out_stat;
1224  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1225  edm::LogWarning("EvFDaqDirector")
1226  << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1227  if (unlink(jsonDestPath.c_str()) == -1) {
1228  edm::LogWarning("EvFDaqDirector")
1229  << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1230  }
1231  }
1232  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1233  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1234  << jsonDestPath << " : " << strerror(errno);
1235  ::close(infile);
1236  return -1;
1237  }
1238  }
1239  //copy contents
1240  const std::size_t buf_sz = 512;
1241  std::size_t tot_written = 0;
// NOTE(review): the buffer is allocated with new[] but owned by unique_ptr<char>, whose deleter
// uses scalar delete; unique_ptr<char[]> would match the allocation.
1242  std::unique_ptr<char> buf(new char[buf_sz]);
1243 
1244  ssize_t sz, sz_read = 1, sz_write;
// outer loop: read chunks until EOF or error; inner loop: keep writing until the chunk is flushed.
// A write error stores the negative result in sz_read, which ends the outer loop too.
1245  while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1246  sz_write = 0;
1247  do {
1248  assert(sz_read - sz_write > 0);
1249  if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1250  sz_read = sz; // cause read loop termination
1251  break;
1252  }
1253  assert(sz > 0);
1254  sz_write += sz;
1255  tot_written += sz;
1256  } while (sz_write < sz_read);
1257  }
1258  close(infile);
1259  close(outfile);
1260 
1261  if (tot_written > 0) {
1262  //leave file if it was empty for diagnosis
1263  if (unlink(jsonSourcePath.c_str()) == -1) {
1264  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1265  << strerror(errno);
1266  return -1;
1267  }
1268  } else {
1269  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1270  << jsonSourcePath;
1271  return -1;
1272  }
1273 
1274  Json::Value deserializeRoot;
1276 
1277  std::string data;
1278  std::stringstream ss;
1279  bool result;
1280  try {
// small files are parsed straight from the copy buffer; larger ones are re-read from the copy
1281  if (tot_written <= buf_sz) {
// NOTE(review): no terminator is written after the bytes read into buf in the visible code. If
// parse() treats buf.get() as a C string it can read past the copied data (and past the
// allocation when exactly buf_sz bytes were read) - confirm. The same applies to "ss << buf.get()" below.
1282  result = reader.parse(buf.get(), deserializeRoot);
1283  } else {
1284  //json will normally not be bigger than buf_sz bytes
1285  try {
1286  std::ifstream ij(jsonDestPath);
1287  ss << ij.rdbuf();
1288  } catch (std::filesystem::filesystem_error const& ex) {
1289  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1290  return -1;
1291  }
1292  result = reader.parse(ss.str(), deserializeRoot);
1293  }
1294  if (!result) {
1295  if (tot_written <= buf_sz)
1296  ss << buf.get();
1297  edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1298  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1299  << ss.str() << ".";
1300  return -1;
1301  }
1302 
1303  //read BU JSON
1304  DataPoint dp;
1305  dp.deserialize(deserializeRoot);
1306  bool success = false;
// look up the value whose position matches the "NEvents" entry in dpd_'s name list
1307  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1308  if (dpd_->getNames().at(i) == "NEvents")
1309  if (i < dp.getData().size()) {
1310  data = dp.getData()[i];
1311  success = true;
1312  break;
1313  }
1314  }
// fallback: use the first data entry when "NEvents" could not be matched
1315  if (!success) {
1316  if (!dp.getData().empty())
1317  data = dp.getData()[0];
1318  else {
1319  edm::LogError("EvFDaqDirector::grabNextJsonFile")
1320  << "grabNextJsonFile - "
1321  << " error reading number of events from BU JSON; No input value. data -: " << data;
1322  return -1;
1323  }
1324  }
1325 
1326  //try to read raw file size
1327  fileSizeFromJson = -1;
1328  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1329  if (dpd_->getNames().at(i) == "NBytes") {
1330  if (i < dp.getData().size()) {
1331  std::string dataSize = dp.getData()[i];
1332  try {
1333  fileSizeFromJson = boost::lexical_cast<long>(dataSize);
1334  } catch (boost::bad_lexical_cast const&) {
1335  //non-fatal currently, processing can continue without this value
1336  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1337  << "Input value is -: " << dataSize;
1338  }
1339  break;
1340  }
1341  }
1342  }
// a non-numeric event count throws bad_lexical_cast, handled below (logged, -1 returned)
1343  return boost::lexical_cast<int>(data);
1344  } catch (boost::bad_lexical_cast const& e) {
1345  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1346  << "Input value is -: " << data;
1347  } catch (std::runtime_error const& e) {
1348  //Can be thrown by Json parser
1349  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1350  }
1351 
1352  catch (std::exception const& e) {
1353  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1354  } catch (...) {
1355  //unknown exception
1356  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1357  }
1358 
// reached only after one of the handlers above
1359  return -1;
1360  }
1361 
// NOTE(review): the opening line of this definition (listing line 1362) and listing line 1399 are
// missing from this listing. The body uses `jsonSourcePath` (used like a std::filesystem::path)
// and `reader` (presumably a JSON reader declared on the missing line) - confirm.
//
// Visible behaviour: copies the JSON file to baseRunDir()/<stem>_pid<getpid()>.jsn, calls
// unlockFULocal(), removes the source, parses the copy and returns the "NEvents" value as int.
// Returns -1 after any caught exception.
1363  std::string data;
1364  try {
1365  // assemble json destination path
1366  std::filesystem::path jsonDestPath(baseRunDir());
1367 
1368  //should be ported to use fffnaming
1369  std::ostringstream fileNameWithPID;
1370  fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1371  << ".jsn";
1372  jsonDestPath /= fileNameWithPID.str();
1373 
1374  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
// the copy is attempted twice, one second apart; a failure of the second attempt propagates to
// the outer filesystem_error handler
1375  try {
1376  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1377  } catch (std::filesystem::filesystem_error const& ex) {
1378  // Input dir gone?
1379  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1380  // << " Maybe the file is not yet visible by FU. Trying again in one second";
1381  sleep(1);
1382  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1383  }
// NOTE(review): the lock is released here, but the filesystem_error / runtime_error /
// std::exception handlers at the end call unlockFULocal() again when something thrown after
// this point reaches them (e.g. the runtime_error thrown on a parse failure below).
1384  unlockFULocal();
1385 
1386  try {
1387  //sometimes this fails but file gets deleted
1388  std::filesystem::remove(jsonSourcePath);
1389  } catch (std::filesystem::filesystem_error const& ex) {
1390  // Input dir gone?
1391  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1392  } catch (std::exception const& ex) {
1393  // Input dir gone?
1394  edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1395  }
1396 
1397  std::ifstream ij(jsonDestPath);
1398  Json::Value deserializeRoot;
1400 
1401  std::stringstream ss;
1402  ss << ij.rdbuf();
1403  if (!reader.parse(ss.str(), deserializeRoot)) {
1404  edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1405  << "\nERROR:\n"
1406  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1407  << ss.str() << ".";
1408  throw std::runtime_error("Cannot deserialize input JSON file");
1409  }
1410 
1411  //read BU JSON
// NOTE(review): this declaration shadows the `data` declared before the try block, so the
// bad_lexical_cast handler below logs the outer string, which is never assigned.
1412  std::string data;
1413  DataPoint dp;
1414  dp.deserialize(deserializeRoot);
1415  bool success = false;
// look up the value whose position matches the "NEvents" entry in dpd_'s name list
// (no break here: the last matching entry wins)
1416  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1417  if (dpd_->getNames().at(i) == "NEvents")
1418  if (i < dp.getData().size()) {
1419  data = dp.getData()[i];
1420  success = true;
1421  }
1422  }
// fallback: use the first data entry when "NEvents" could not be matched
1423  if (!success) {
1424  if (!dp.getData().empty())
1425  data = dp.getData()[0];
1426  else
1427  throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1428  << " error reading number of events from BU JSON -: No input value " << data;
1429  }
1430  return boost::lexical_cast<int>(data);
1431  } catch (std::filesystem::filesystem_error const& ex) {
1432  // Input dir gone?
1433  unlockFULocal();
1434  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1435  } catch (std::runtime_error const& e) {
1436  // Another process grabbed the file and NFS did not register this
1437  unlockFULocal();
1438  edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1439  } catch (boost::bad_lexical_cast const&) {
1440  edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1441  << "Input value is -: " << data;
1442  } catch (std::exception const& e) {
1443  // BU run directory disappeared?
1444  unlockFULocal();
1445  edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1446  }
1447 
// reached only after one of the handlers above
1448  return -1;
1449  }
1450 
// NOTE(review): the opening line of this signature (listing line 1451) and listing lines 1467,
// 1495 and 1537 are missing from this listing. Judging by the log messages that follow them,
// the first two missing lines are presumably the socket connect calls; the third presumably
// declares `header`. `serverHttpStatus` is presumably the missing first parameter - confirm.
//
// Visible behaviour: sends "GET /popfile?runnumber=..&pid=..[&stopls=..]" to the file broker
// over socket_, parses the "key=value" lines of the reply and translates the "state" field into
// a FileStatus. serverLS / closedServerLS / nextFileJson / nextFileRaw / rawHeader are filled
// from the reply. Any transport or protocol problem sets serverError, closes the socket, forces
// the result to noFile and sleeps one second.
1452  bool& serverError,
1453  uint32_t& serverLS,
1454  uint32_t& closedServerLS,
1455  std::string& nextFileJson,
1456  std::string& nextFileRaw,
1457  bool& rawHeader,
1458  int maxLS) {
1459  EvFDaqDirector::FileStatus fileStatus = noFile;
1460  serverError = false;
1461 
1462  boost::system::error_code ec;
1463  try {
// the loop body runs once, except when a keep-alive connection was reset: then it reconnects and
// repeats the request via `continue`
1464  while (true) {
1465  //socket connect
1466  if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1468 
1469  if (ec) {
1470  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1471  serverError = true;
1472  break;
1473  }
1474  }
1475 
1476  boost::asio::streambuf request;
1477  std::ostream request_stream(&request);
1478  std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1479  if (maxLS >= 0) {
1480  std::stringstream spath;
1481  spath << path << "&stopls=" << maxLS;
1482  path = spath.str();
1483  edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1484  }
1485  request_stream << "GET " << path << " HTTP/1.1\r\n";
1486  request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1487  request_stream << "Accept: */*\r\n";
// NOTE(review): "keep-alive" is sent regardless of fileBrokerKeepAlive_.
1488  request_stream << "Connection: keep-alive\r\n\r\n";
1489 
1490  boost::asio::write(*socket_, request, ec);
1491  if (ec) {
1492  if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1493  edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1494  //we got disconnected, try to reconnect to the server before writing the request
1496  if (ec) {
1497  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1498  serverError = true;
1499  break;
1500  }
1501  continue;
1502  }
1503  edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1504  serverError = true;
1505  break;
1506  }
1507 
1508  boost::asio::streambuf response;
1509  boost::asio::read_until(*socket_, response, "\r\n", ec);
1510  if (ec) {
1511  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1512  serverError = true;
1513  break;
1514  }
1515 
1516  std::istream response_stream(&response);
1517 
// status line: "<http version> <status code> <message>"
1518  std::string http_version;
1519  response_stream >> http_version;
1520 
1521  response_stream >> serverHttpStatus;
1522 
1523  std::string status_message;
1524  std::getline(response_stream, status_message);
1525  if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1526  edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1527  serverError = true;
1528  break;
1529  }
1530  if (serverHttpStatus != 200) {
1531  edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1532  serverError = true;
1533  break;
1534  }
1535 
1536  // Process the response headers.
// header lines are consumed and discarded up to the blank "\r" line
// NOTE(review): only data already held in `response` after read_until(..., "\r\n") is parsed
// here; no further socket read is visible before the body is processed - confirm that the
// whole reply is expected to arrive with the first read.
1538  while (std::getline(response_stream, header) && header != "\r") {
1539  }
1540 
// body: one "key=value" entry per line; lines without '=' are skipped
1541  std::string fileInfo;
1542  std::map<std::string, std::string> serverMap;
1543  while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1544  auto pos = fileInfo.find('=');
1545  if (pos == std::string::npos)
1546  continue;
1547  auto stitle = fileInfo.substr(0, pos);
1548  auto svalue = fileInfo.substr(pos + 1);
1549  serverMap[stitle] = svalue;
1550  }
1551 
1552  //check that response run number if correct
// NOTE(review): the mandatory fields are validated with assert(); a malformed reply aborts the
// process, and in builds where assert is compiled out the `->second` accesses below would
// dereference end iterators.
1553  auto server_version = serverMap.find("version");
1554  assert(server_version != serverMap.end());
1555 
1556  auto server_run = serverMap.find("runnumber");
1557  assert(server_run != serverMap.end());
1558  assert(run_nstring_ == server_run->second);
1559 
1560  auto server_state = serverMap.find("state");
1561  assert(server_state != serverMap.end());
1562 
1563  auto server_eols = serverMap.find("lasteols");
1564  assert(server_eols != serverMap.end());
1565 
// "lumisection" is optional; its absence is handled further down
1566  auto server_ls = serverMap.find("lumisection");
1567 
// version defaults to 1.0.0; parsing tries "a.b.c", then "a.b", then "a", skipping a leading quote
1568  int version_maj = 1;
1569  int version_min = 0;
1570  int version_rev = 0;
1571  {
1572  auto* s_ptr = server_version->second.c_str();
1573  if (!server_version->second.empty() && server_version->second[0] == '"')
1574  s_ptr++;
1575  auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1576  if (res < 3) {
1577  res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1578  if (res < 2) {
1579  res = sscanf(s_ptr, "%d", &version_maj);
1580  if (res < 1) {
1581  //expecting at least 1 number (major version)
1582  edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1583  }
1584  }
1585  }
1586  }
1587 
// negative / unparsable values are clamped: last EoLS to >= 0, lumisection to >= 1
1588  closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1589  if (server_ls != serverMap.end())
1590  serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1591  else
1592  serverLS = closedServerLS + 1;
1593 
1594  std::string s_state = server_state->second;
1595  if (s_state == "STARTING") //initial, always empty starting with LS 1
1596  {
1597  auto server_file = serverMap.find("file");
1598  assert(server_file == serverMap.end()); //no file with starting state
1599  fileStatus = noFile;
1600  edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1601  } else if (s_state == "READY") {
1602  auto server_file = serverMap.find("file");
1603  if (server_file == serverMap.end()) {
1604  //can be returned by server if files from new LS already appeared but LS is not yet closed
1605  if (serverLS <= closedServerLS)
1606  serverLS = closedServerLS + 1;
1607  fileStatus = noFile;
1608  edm::LogInfo("EvFDaqDirector")
1609  << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1610  } else {
1611  std::string filestem;
1612  std::string fileprefix;
1613  auto server_fileprefix = serverMap.find("fileprefix");
1614 
// optional prefix; surrounding double quotes are stripped when present
1615  if (server_fileprefix != serverMap.end()) {
1616  auto pssize = server_fileprefix->second.size();
1617  if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1618  fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1619  else
1620  fileprefix = server_fileprefix->second;
1621  }
1622 
1623  //remove string literals
1624  auto ssize = server_file->second.size();
1625  if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1626  filestem = server_file->second.substr(1, ssize - 2);
1627  else
1628  filestem = server_file->second;
1629  assert(!filestem.empty());
// major version > 1: raw file only (prefix applied, header expected, no JSON path);
// otherwise: raw file without prefix plus a prefixed .jsn file
1630  if (version_maj > 1) {
1631  nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw"; //filestem should be raw
1632  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1633  nextFileJson = "";
1634  rawHeader = true;
1635  } else {
1636  nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw"; //raw files are not moved
1637  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1638  nextFileJson = filestem + ".jsn";
1639  rawHeader = false;
1640  }
1641  fileStatus = newFile;
1642  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1643  << serverLS << " file:" << filestem;
1644  }
1645  } else if (s_state == "EOLS") {
1646  serverLS = closedServerLS + 1;
1647  edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1648  fileStatus = noFile;
1649  } else if (s_state == "EOR") {
1650  //server_eor = serverMap.find("iseor");
1651  edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1652  fileStatus = runEnded;
1653  } else if (s_state == "NORUN") {
// NORUN is treated like end of run, without flagging a server error
1654  auto err_msg = serverMap.find("errormessage");
1655  if (err_msg != serverMap.end())
1656  edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1657  else
1658  edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1659  edm::LogWarning("EvFDaqDirector") << "executing run end";
1660  fileStatus = runEnded;
1661  } else if (s_state == "ERROR") {
1662  auto err_msg = serverMap.find("errormessage");
1663  if (err_msg != serverMap.end())
1664  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1665  else
1666  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1667  fileStatus = noFile;
1668  serverError = true;
1669  } else {
1670  edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1671  fileStatus = noFile;
1672  serverError = true;
1673  }
1674 
1675  // Read until EOF, writing data to output as we go.
// without keep-alive the rest of the reply is drained until the server closes the connection
1676  if (!fileBrokerKeepAlive_) {
1677  while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1678  }
1679  if (ec != boost::asio::error::eof) {
// NOTE(review): the message says read_until, but the failing call above is boost::asio::read.
1680  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1681  serverError = true;
1682  }
1683  }
1684 
1685  break;
1686  }
1687 
1688  } catch (std::exception const& e) {
// NOTE(review): e.what() is not included in the message.
1689  edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1690  serverError = true;
1691  }
1692 
1693  if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1694  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1695  if (ec) {
1696  edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1697  }
1698  socket_->close(ec);
1699  if (ec) {
1700  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1701  }
1702  }
1703 
1704  if (serverError) {
1705  if (socket_->is_open())
1706  socket_->close(ec);
// NOTE(review): `ec` is tested even when close() was not called above, so a stale error code
// from an earlier operation is reported as a "socket close error".
1707  if (ec) {
1708  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1709  }
1710  fileStatus = noFile;
1711  sleep(1); //back-off if error detected
1712  }
1713 
1714  return fileStatus;
1715  }
1716 
1718  unsigned int& ls,
1719  std::string& nextFileRaw,
1720  int& rawFd,
1721  uint16_t& rawHeaderSize,
1722  int32_t& serverEventsInNewFile,
1723  int64_t& fileSizeFromMetadata,
1724  uint64_t& thisLockWaitTimeUs) {
1725  EvFDaqDirector::FileStatus fileStatus = noFile;
1726 
1727  //int retval = -1;
1728  //int lock_attempts = 0;
1729  //long total_lock_attempts = 0;
1730 
1731  struct stat buf;
1732  int stopFileLS = -1;
1733  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1734  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1735  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1736  if (stopFileCheck == 0)
1737  stopFileLS = readLastLSEntry(stopFilePath_);
1738  else
1739  stopFileLS = 1; //stop without drain if only pid is stopped
1740  if (!stop_ls_override_) {
1741  //if lumisection is higher than in stop file, should quit at next from current
1742  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1743  stopFileLS = stop_ls_override_ = ls;
1744  } else
1745  stopFileLS = stop_ls_override_;
1746  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1747  << stopFileLS;
1748  //return runEnded;
1749  } else //if file was removed before reaching stop condition, reset this
1750  stop_ls_override_ = 0;
1751 
1752  /* look for EoLS
1753  if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1754  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ;
1755  ls++;
1756  return noFile;
1757  }
1758  */
1759 
1760  timeval ts_lockbegin;
1761  gettimeofday(&ts_lockbegin, nullptr);
1762 
1763  std::string nextFileJson;
1764  uint32_t serverLS, closedServerLS;
1765  unsigned int serverHttpStatus;
1766  bool serverError;
1767 
1768  //local lock to force index json and EoLS files to appear in order
1770  lockFULocal();
1771 
1772  int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1773  bool rawHeader = false;
1774  fileStatus = contactFileBroker(
1775  serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1776 
1777  if (serverError) {
1778  //do not update anything
1780  unlockFULocal();
1781  return noFile;
1782  }
1783 
1784  //handle creation of BoLS files if lumisection has changed
1785  if (currentLumiSection == 0) {
1786  if (fileStatus == runEnded)
1787  createLumiSectionFiles(closedServerLS, 0, true, false);
1788  else
1789  createLumiSectionFiles(serverLS, 0, true, false);
1790  } else {
1791  if (closedServerLS >= currentLumiSection) {
1792  //only BoLS files
1793  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1794  createLumiSectionFiles(i + 1, i, true, false);
1795  }
1796  }
1797 
1798  bool fileFound = true;
1799 
1800  if (fileStatus == newFile) {
1801  if (rawHeader > 0)
1802  serverEventsInNewFile =
1803  grabNextJsonFromRaw(nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false);
1804  else
1805  serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1806  }
1807  //closing file in case of any error
1808  if (serverEventsInNewFile < 0 && rawFd != -1) {
1809  close(rawFd);
1810  rawFd = -1;
1811  }
1812 
1813  //can unlock because all files have been created locally
1815  unlockFULocal();
1816 
1817  if (!fileFound) {
1818  //catch condition where directory got deleted
1819  fileStatus = noFile;
1820  struct stat buf;
1821  if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1822  edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1823  fileStatus = runEnded;
1824  }
1825  }
1826 
1827  //handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
1828  //so that EoLS files can not appear locally before index files
1829  if (currentLumiSection == 0) {
1830  lockFULocal2();
1831  if (fileStatus == runEnded) {
1832  createLumiSectionFiles(closedServerLS, 0, false, true);
1833  createLumiSectionFiles(serverLS, closedServerLS, false, true); // +1
1834  } else {
1835  createLumiSectionFiles(serverLS, 0, false, true);
1836  }
1837  unlockFULocal2();
1838  } else {
1839  if (closedServerLS >= currentLumiSection) {
1840  //lock exclusive to create EoLS files
1841  lockFULocal2();
1842  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1843  createLumiSectionFiles(i + 1, i, false, true);
1844  unlockFULocal2();
1845  }
1846  }
1847 
1848  if (fileStatus == runEnded)
1849  ls = std::max(currentLumiSection, serverLS);
1850  else if (fileStatus == newFile) {
1851  assert(serverLS >= ls);
1852  ls = serverLS;
1853  } else if (fileStatus == noFile) {
1854  if (serverLS >= ls)
1855  ls = serverLS;
1856  else {
1857  edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1858  << " which is smaller than currently open LS " << ls << ". Ignoring response";
1859  sleep(1);
1860  }
1861  }
1862 
1863  return fileStatus;
1864  }
1865 
// Body of evf::EvFDaqDirector::createRunOpendirMaybe(); the signature sits on listing line 1866,
// which this listing does not show.
// Makes sure the directory named by `openPath` exists, creating it (and any missing parents) if not.
// NOTE(review): the declaration of `openPath` (listing line 1869) is also absent from this listing;
// presumably it is initialised from getRunOpenDirPath() - confirm against the real source file.
1867  // create open dir if not already there
1868 
1870  if (!std::filesystem::is_directory(openPath)) {
1871  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1872  std::filesystem::create_directories(openPath);
1873  }
1874  }
1875 
// Body of evf::EvFDaqDirector::readLastLSEntry(std::string const &file); the signature sits on
// listing line 1876, which this listing does not show.
// Parses `file` as JSON and returns its "lastLS" member as an int.
// Returns -1 (after logging an error) when the document cannot be parsed.
// NOTE(review): the declaration of `reader` (listing line 1879) is absent from this listing;
// presumably a Json::Reader - confirm against the real source file.
1877  std::ifstream ij(file);
1878  Json::Value deserializeRoot;
1880 
1881  if (!reader.parse(ij, deserializeRoot)) {
1882  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1883  return -1;
1884  }
1885 
// NOTE(review): the fallback handed to get() is an empty string, so a document without "lastLS"
// ends up calling asInt() on a string value - confirm how the Json library in use handles that.
1886  int ret = deserializeRoot.get("lastLS", "").asInt();
1887  return ret;
1888  }
1889 
// Body of evf::EvFDaqDirector::getLumisectionToStart() const; the signature sits on listing
// line 1890, which this listing does not show.
// Starting at startFromLS_, probes "<run_dir_>/<run_string_>_ls<NNNN>_EoLS.jsn" for consecutive
// lumisection numbers and returns the first number for which stat() fails on that path.
// NOTE(review): the declaration of `fullpath` (listing line 1892) is absent from this listing;
// presumably a std::string - confirm against the real source file.
1891  std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1893  struct stat buf;
1894  unsigned int lscount = startFromLS_;
1895  do {
1896  std::stringstream ss;
// the lumisection number is zero-padded to at least four digits
1897  ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1898  fullpath = ss.str();
1899  lscount++;
1900  } while (stat(fullpath.c_str(), &buf) == 0);
// lscount was already advanced past the path that failed stat(), hence the -1
1901  return lscount - 1;
1902  }
1903 
// Body of evf::EvFDaqDirector::checkTransferSystemPSet(edm::ProcessContext const &pc); the
// signature sits on listing line 1905, which this listing does not show.
// Builds transferSystemJson_ from the top-level "transferSystem" PSet of the process:
//   "destinations"  -> array of all known destination names
//   "transferModes" -> array of all transfer mode names
//   <nested PSet>   -> object mapping every transfer mode to that entry's destination array
// Throws cms::Exception when a nested entry lacks a mode, lists no destination for a mode, or names
// a destination that is not in "destinations"; also throws when the PSet is absent and
// requireTSPSet_ is set.
// NOTE(review): two of the exception texts below spell "transter"; they are runtime strings and are
// left untouched here.
1904  //if transferSystem PSet is present in the menu, we require it to be complete and consistent for all specified streams
// already built by an earlier call
1906  if (transferSystemJson_)
1907  return;
1908 
// allocated before the PSet lookup, so the pointer is non-null afterwards even if the PSet is absent
1909  transferSystemJson_.reset(new Json::Value);
1910  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1911  if (topPset.existsAs<edm::ParameterSet>("transferSystem", true)) {
1912  const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));
1913 
1914  Json::Value destinationsVal(Json::arrayValue);
1915  std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
1916  for (auto& dest : destinations)
1917  destinationsVal.append(dest);
1918  (*transferSystemJson_)["destinations"] = destinationsVal;
1919 
1920  Json::Value modesVal(Json::arrayValue);
1921  std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>("transferModes");
1922  for (auto& mode : modes)
1923  modesVal.append(mode);
1924  (*transferSystemJson_)["transferModes"] = modesVal;
1925 
// every remaining nested PSet is treated as a per-stream definition, keyed by its name
1926  for (auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
1927  if (psKeyItr->first != "destinations" && psKeyItr->first != "transferModes") {
1928  const edm::ParameterSet& streamDef = tsPset.getParameterSet(psKeyItr->first);
1929  Json::Value streamVal;
1930  for (auto& mode : modes) {
1931  //validation
// each mode must be present in the stream definition as a vector of strings
1932  if (!streamDef.existsAs<std::vector<std::string>>(mode, true))
1933  throw cms::Exception("EvFDaqDirector")
1934  << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode
1935  << ")";
1936  std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);
1937 
1938  Json::Value sDestsValue(Json::arrayValue);
1939 
1940  if (streamDestinations.empty())
1941  throw cms::Exception("EvFDaqDirector")
1942  << " Missing transter system destination(s) for -: " << psKeyItr->first << ", mode:" << mode;
1943 
// each destination of the stream must also appear in the global "destinations" list
1944  for (auto& sdest : streamDestinations) {
1945  bool sDestValid = false;
1946  sDestsValue.append(sdest);
1947  for (auto& dest : destinations) {
1948  if (dest == sdest)
1949  sDestValid = true;
1950  }
1951  if (!sDestValid)
1952  throw cms::Exception("EvFDaqDirector")
1953  << " Invalid transter system destination specified for -: " << psKeyItr->first << ", mode:" << mode
1954  << ", dest:" << sdest;
1955  }
1956  streamVal[mode] = sDestsValue;
1957  }
1958  (*transferSystemJson_)[psKeyItr->first] = streamVal;
1959  }
1960  }
1961  } else {
// a missing PSet is an error only in strict mode
1962  if (requireTSPSet_)
1963  throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
1964  }
1965  }
1966 
// Body of evf::EvFDaqDirector::getStreamDestinations(std::string const &stream) const; the
// signature sits on listing line 1967, which this listing does not show.
// Looks up `stream` in transferSystemJson_ and returns the destinations listed for it under
// selectedTransferMode_, joined with ','.
// With requireTSPSet_ unset, every lookup problem (unknown stream, no mode selected, mode not
// defined for the stream) logs a warning and returns the literal "Failsafe"; with requireTSPSet_
// set, the same problems throw cms::Exception.
// NOTE(review): transferSystemJson_ is dereferenced without a null check; presumably
// checkTransferSystemPSet() has always run first - confirm against the callers.
1968  std::string streamRequestName;
1969  if (transferSystemJson_->isMember(stream.c_str()))
1970  streamRequestName = stream;
1971  else {
1972  std::stringstream msg;
1973  msg << "Transfer system mode definitions missing for -: " << stream;
1974  if (requireTSPSet_)
1975  throw cms::Exception("EvFDaqDirector") << msg.str();
1976  else {
1977  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
1978  return std::string("Failsafe");
1979  }
1980  }
1981  //return the "Failsafe" placeholder if no mode is selected and the strict check parameter is not on
// NOTE(review): the warning text says "Setting mode to empty string" although "Failsafe" is what is
// returned, and the two string literals are streamed without a separating space.
1982  if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1983  edm::LogWarning("EvFDaqDirector")
1984  << "Selected mode string is not provided as DaqDirector parameter."
1985  << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
1986  return std::string("Failsafe");
1987  }
1988  if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1989  throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
1990  }
1991  //check that the stream entry lists the selected transfer mode
1992  if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str())) {
1993  std::stringstream msg;
1994  msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
1995  if (requireTSPSet_)
1996  throw cms::Exception("EvFDaqDirector") << msg.str();
1997  else
1998  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
// only reached in permissive mode; the strict branch above has thrown
1999  return std::string("Failsafe");
2000  }
2001  Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_, "");
2002 
2003  //flatten string json::Array into CSV std::string
2004  std::string ret;
2005  for (Json::Value::iterator it = destsVec.begin(); it != destsVec.end(); it++) {
2006  if (!ret.empty())
2007  ret += ",";
2008  ret += (*it).asString();
2009  }
2010  return ret;
2011  }
2012 
// Body of an EvFDaqDirector member whose signature sits on listing line 2013, which this listing
// does not show. NOTE(review): presumably checkMergeTypePSet(edm::ProcessContext const &pc), since
// the body reads `pc` - confirm against the real source file.
// Fills mergeTypeMap_ with one (parameter name -> string value) entry for every parameter of the
// top-level PSet whose name is stored in mergeTypePset_.
// Does nothing when mergeTypePset_ is empty, when the map already has entries, or when no such
// PSet exists.
2014  if (mergeTypePset_.empty())
2015  return;
2016  if (!mergeTypeMap_.empty())
2017  return;
2018  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
2019  if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_, true)) {
2020  const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
2021  for (const std::string& pname : tsPset.getParameterNames()) {
// every parameter is read as a std::string
2022  std::string streamType = tsPset.getParameter<std::string>(pname);
2023  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2024  mergeTypeMap_.insert(ac, pname);
2025  ac->second = streamType;
2026  ac.release();
2027  }
2028  }
2029  }
2030 
// Body of evf::EvFDaqDirector::getStreamMergeType(std::string const &stream, MergeType defaultType);
// the signature sits on listing line 2031, which this listing does not show.
// Returns the merge type name stored for `stream` in mergeTypeMap_. When there is no entry, logs an
// info message, stores MergeTypeNames_[defaultType] for the stream and returns that name.
2032  tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
2033  if (mergeTypeMap_.find(search_ac, stream))
2034  return search_ac->second;
2035 
2036  edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
// MergeTypeNames_ is indexed by the MergeType enum value
2037  std::string defaultName = MergeTypeNames_[defaultType];
2038  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
// the result of insert() is not inspected; the mapped value is assigned unconditionally
2039  mergeTypeMap_.insert(ac, stream);
2040  ac->second = defaultName;
2041  ac.release();
2042  return defaultName;
2043  }
2044 
// Body of evf::EvFDaqDirector::createProcessingNotificationMaybe() const; the signature sits on
// listing line 2045, which this listing does not show.
// Opens "<run_dir_>/processing" with O_CREAT (so the file is created if missing) and closes the
// descriptor again right away; nothing is written to the file.
2046  std::string proc_flag = run_dir_ + "/processing";
// requested mode: rwx for the owner, rw for group and others
2047  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
// NOTE(review): the result of open() is not checked, so a failed open leads to close(-1)
2048  close(proc_flag_fd);
2049  }
2050 
2051  struct flock EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid) {
2052 #ifdef __APPLE__
2053  return {start, len, pid, type, whence};
2054 #else
2055  return {type, whence, start, len, pid};
2056 #endif
2057  }
2058 
2059 } // namespace evf
ConfigurationDescriptions.h
runTheMatrix.ret
ret
prodAgent to be discontinued
Definition: runTheMatrix.py:543
evf::EvFDaqDirector::getOpenOutputJsonFilePath
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:425
edm::GlobalContext::luminosityBlockID
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:60
evf::EvFDaqDirector::preallocate
void preallocate(edm::service::SystemBounds const &bounds)
Definition: EvFDaqDirector.cc:320
evf::EvFDaqDirector::directorBU_
bool directorBU_
Definition: EvFDaqDirector.h:218
evf::EvFDaqDirector::previousFileSize_
unsigned long previousFileSize_
Definition: EvFDaqDirector.h:246
evf::EvFDaqDirector::bumpFile
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
Definition: EvFDaqDirector.cc:779
evf::EvFDaqDirector::requireTSPSet_
bool requireTSPSet_
Definition: EvFDaqDirector.h:215
evf::EvFDaqDirector::unlockFULocal2
void unlockFULocal2()
Definition: EvFDaqDirector.cc:928
evf::EvFDaqDirector::endpoint_iterator_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
Definition: EvFDaqDirector.h:283
eostools.ls
def ls(path, rec=False)
Definition: eostools.py:349
evf::EvFDaqDirector::dirManager_
DirManager dirManager_
Definition: EvFDaqDirector.h:244
Json::ValueIterator
Iterator for object and array value.
Definition: value.h:908
evf::EvFDaqDirector::createBoLSFile
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
Definition: EvFDaqDirector.cc:933
evf::EvFDaqDirector::fileBrokerKeepAlive_
bool fileBrokerKeepAlive_
Definition: EvFDaqDirector.h:211
evf::EvFDaqDirector::getRawFilePath
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
Definition: EvFDaqDirector.cc:405
electrons_cff.bool
bool
Definition: electrons_cff.py:366
mps_fire.i
i
Definition: mps_fire.py:428
evf::EvFDaqDirector::fulocal_rwlock_fd_
int fulocal_rwlock_fd_
Definition: EvFDaqDirector.h:235
getFRDFileHeaderVersion
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:43
DataPoint.h
evf::EvFDaqDirector::runEnded
Definition: EvFDaqDirector.h:64
MessageLogger.h
evf::EvFDaqDirector::useFileBroker_
bool useFileBroker_
Definition: EvFDaqDirector.h:207
evf::EvFDaqDirector::init_lock_
pthread_mutex_t init_lock_
Definition: EvFDaqDirector.h:260
evf::EvFDaqDirector::getEoLSFilePathOnBU
std::string getEoLSFilePathOnBU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:476
funct::false
false
Definition: Factorize.h:29
evf::EvFDaqDirector::filesToDeletePtr_
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
Definition: EvFDaqDirector.h:258
evf::EvFDaqDirector::getEoRFilePath
std::string getEoRFilePath() const
Definition: EvFDaqDirector.cc:488
evf::EvFDaqDirector::readEolsDefinition_
bool readEolsDefinition_
Definition: EvFDaqDirector.h:265
evf::EvFDaqDirector::mergeTypePset_
std::string mergeTypePset_
Definition: EvFDaqDirector.h:217
reco_skim_cfg_mod.fullpath
fullpath
Definition: reco_skim_cfg_mod.py:202
filterCSVwithJSON.copy
copy
Definition: filterCSVwithJSON.py:36
edm::ActivityRegistry::watchPostGlobalEndRun
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
Definition: ActivityRegistry.h:350
evf::EvFDaqDirector::bu_run_open_dir_
std::string bu_run_open_dir_
Definition: EvFDaqDirector.h:229
fffnaming::streamerDataFileNameWithPid
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:72
fffnaming::streamerDataChecksumFileNameWithInstance
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:91
Json::Value::end
const_iterator end() const
evf::EvFDaqDirector::bu_readlock_fd_
int bu_readlock_fd_
Definition: EvFDaqDirector.h:232
evf::EvFDaqDirector::nThreads_
unsigned int nThreads_
Definition: EvFDaqDirector.h:263
Json::arrayValue
array value (ordered list)
Definition: value.h:30
evf::EvFDaqDirector::getStreamMergeType
std::string getStreamMergeType(std::string const &stream, MergeType defaultType)
Definition: EvFDaqDirector.cc:2031
evf::EvFDaqDirector::getLumisectionToStart
unsigned int getLumisectionToStart() const
Definition: EvFDaqDirector.cc:1890
edm::ProcessContext
Definition: ProcessContext.h:27
evf::EvFDaqDirector::checkTransferSystemPSet
void checkTransferSystemPSet(edm::ProcessContext const &pc)
Definition: EvFDaqDirector.cc:1905
Json::Value::get
Value get(UInt index, const Value &defaultValue) const
pos
Definition: PixelAliasList.h:18
evf::EvFDaqDirector::getRootHistogramFilePath
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:468
ALCARECOPromptCalibProdSiPixelAli0T_cff.mode
mode
Definition: ALCARECOPromptCalibProdSiPixelAli0T_cff.py:96
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
evf::EvFDaqDirector::bu_writelock_fd_
int bu_writelock_fd_
Definition: EvFDaqDirector.h:233
evf::EvFDaqDirector::getProtocolBufferHistogramFilePath
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:454
evf::EvFDaqDirector::createLumiSectionFiles
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
Definition: EvFDaqDirector.cc:943
cms::cuda::stream
uint32_t const T *__restrict__ const uint32_t *__restrict__ int32_t int Histo::index_type cudaStream_t stream
Definition: HistoContainer.h:51
evf::EvFDaqDirector::contactFileBroker
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
Definition: EvFDaqDirector.cc:1451
fffnaming::protocolBufferHistogramFileNameWithInstance
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:129
evf::EvFDaqDirector::getStreamDestinations
std::string getStreamDestinations(std::string const &stream) const
Definition: EvFDaqDirector.cc:1967
evf::EvFDaqDirector::tryInitializeFuLockFile
void tryInitializeFuLockFile()
Definition: EvFDaqDirector.cc:880
cms::cuda::assert
assert(be >=bs)
evf::EvFDaqDirector::io_service_
boost::asio::io_service io_service_
Definition: EvFDaqDirector.h:280
fffnaming::inputRawFileName
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
Definition: FFFNamingSchema.h:48
evf::EvFDaqDirector::lockInitLock
void lockInitLock()
Definition: EvFDaqDirector.cc:909
evf::EvFDaqDirector::stop_ls_override_
unsigned int stop_ls_override_
Definition: EvFDaqDirector.h:269
mps_check.msg
tuple msg
Definition: mps_check.py:285
FRDEventMessage.h
evf::EvFDaqDirector::fileBrokerHost_
std::string fileBrokerHost_
Definition: EvFDaqDirector.h:209
edm::ParameterSet::existsAs
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:171
evf::EvFDaqDirector::runAbort
Definition: EvFDaqDirector.h:64
edm::LogInfo
Log< level::Info, false > LogInfo
Definition: MessageLogger.h:125
beamerCreator.create
def create(alignables, pedeDump, additionalData, outputFile, config)
Definition: beamerCreator.py:44
fffnaming::eorFileName
std::string eorFileName(const unsigned int run)
Definition: FFFNamingSchema.h:34
newFWLiteAna.found
found
Definition: newFWLiteAna.py:118
evf::EvFDaqDirector::getOpenDatFilePath
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:421
evf::EvFDaqDirector::getFFFParamsFilePathOnBU
std::string getFFFParamsFilePathOnBU() const
Definition: EvFDaqDirector.cc:492
fffnaming::eolsFileName
std::string eolsFileName(const unsigned int run, const unsigned int ls)
Definition: FFFNamingSchema.h:20
evf::EvFDaqDirector::query_
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
Definition: EvFDaqDirector.h:282
edm::LogWarning
Log< level::Warning, false > LogWarning
Definition: MessageLogger.h:122
Json::Reader
Unserialize a JSON document into a Value.
Definition: reader.h:16
evf::EvFDaqDirector::baseRunDir
std::string & baseRunDir()
Definition: EvFDaqDirector.h:76
evf::EvFDaqDirector::getRunOpenDirPath
std::string getRunOpenDirPath() const
Definition: EvFDaqDirector.h:106
evf::EvFDaqDirector::getDatFilePath
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:417
evf::EvFDaqDirector::preBeginJob
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
Definition: EvFDaqDirector.cc:359
myMessageLogger_cff.destinations
destinations
Definition: myMessageLogger_cff.py:36
evf::EvFDaqDirector::getInitFilePath
std::string getInitFilePath(std::string const &stream) const
Definition: EvFDaqDirector.cc:445
getHLTprescales.readIndex
def readIndex()
Definition: getHLTprescales.py:36
evf::FastMonitoringService::accumulateFileSize
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
Definition: FastMonitoringService.cc:668
evf::EvFDaqDirector::openFULockfileStream
void openFULockfileStream(bool create)
Definition: EvFDaqDirector.cc:890
contentValuesCheck.ss
ss
Definition: contentValuesCheck.py:33
evf::FastMonitoringService
Definition: FastMonitoringService.h:153
evf::EvFDaqDirector::updateFuLock
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
Definition: EvFDaqDirector.cc:503
evf::EvFDaqDirector::getEoLSFilePathOnFU
std::string getEoLSFilePathOnFU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:480
FRDFileHeader.h
GlobalPosition_Frontier_DevDB_cff.connect
connect
Definition: GlobalPosition_Frontier_DevDB_cff.py:8
RPCNoise_example.check
check
Definition: RPCNoise_example.py:71
evf::EvFDaqDirector::stopFilePath_
std::string stopFilePath_
Definition: EvFDaqDirector.h:267
edm::ConfigurationDescriptions::add
void add(std::string const &label, ParameterSetDescription const &psetDescription)
Definition: ConfigurationDescriptions.cc:57
Calorimetry_cff.dp
dp
Definition: Calorimetry_cff.py:158
edm::ActivityRegistry::watchPreBeginJob
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
Definition: ActivityRegistry.h:151
DQM.reader
reader
Definition: DQM.py:105
Service.h
evf::EvFDaqDirector::parseFRDFileHeader
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
Definition: EvFDaqDirector.cc:965
evf::EvFDaqDirector::getOpenInputJsonFilePath
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
Definition: EvFDaqDirector.cc:413
evf::EvFDaqDirector::selectedTransferMode_
std::string selectedTransferMode_
Definition: EvFDaqDirector.h:216
evf::EvFDaqDirector::fms_
evf::FastMonitoringService * fms_
Definition: EvFDaqDirector.h:255
edm::ActivityRegistry
Definition: ActivityRegistry.h:134
summarizeEdmComparisonLogfiles.success
success
Definition: summarizeEdmComparisonLogfiles.py:114
corrVsCorr.filename
filename
Definition: corrVsCorr.py:123
Json::Value::asInt
Int asInt() const
eostools.mkdir
def mkdir(path)
Definition: eostools.py:251
fffnaming::streamerDataFileNameWithInstance
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:81
evf::EvFDaqDirector::noFile
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::getMergedProtocolBufferHistogramFilePath
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:459
evf::EvFDaqDirector::run_
unsigned int run_
Definition: EvFDaqDirector.h:206
edm::ProcessContext::parameterSetID
ParameterSetID const & parameterSetID() const
Definition: ProcessContext.h:32
ParameterSetDescription.h
evf::EvFDaqDirector::eolsNFilesIndex_
unsigned int eolsNFilesIndex_
Definition: EvFDaqDirector.h:266
unpackData-CaloStage2.pname
pname
Definition: unpackData-CaloStage2.py:76
evf::EvFDaqDirector::pid_
std::string pid_
Definition: EvFDaqDirector.h:226
evf::EvFDaqDirector::fileBrokerUseLocalLock_
bool fileBrokerUseLocalLock_
Definition: EvFDaqDirector.h:212
mitigatedMETSequence_cff.U
U
Definition: mitigatedMETSequence_cff.py:36
edm::ConfigurationDescriptions
Definition: ConfigurationDescriptions.h:28
jsoncollector::DataPointDefinition::getNames
std::vector< std::string > const & getNames() const
Definition: DataPointDefinition.h:44
geometryDiff.file
file
Definition: geometryDiff.py:13
evf::EvFDaqDirector::MergeTypeNames_
static const std::vector< std::string > MergeTypeNames_
Definition: EvFDaqDirector.h:275
evf::EvFDaqDirector::fileDeleteLockPtr_
std::mutex * fileDeleteLockPtr_
Definition: EvFDaqDirector.h:257
evf::EvFDaqDirector::getOutputJsonFilePath
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:429
cppFunctionSkipper.exception
exception
Definition: cppFunctionSkipper.py:10
funct::true
true
Definition: Factorize.h:173
evf::EvFDaqDirector::fulocal_rwlock_fd2_
int fulocal_rwlock_fd2_
Definition: EvFDaqDirector.h:236
fffnaming::rootHistogramFileNameWithPid
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:139
edm::GlobalContext
Definition: GlobalContext.h:29
edm::service::SystemBounds
Definition: SystemBounds.h:29
LogDebug
#define LogDebug(id)
Definition: MessageLogger.h:233
fffnaming::initFileNameWithPid
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:55
edm::ParameterSet
Definition: ParameterSet.h:47
edm::service::SystemBounds::maxNumberOfThreads
unsigned int maxNumberOfThreads() const
Definition: SystemBounds.h:38
Json::Value::append
Value & append(const Value &value)
Append value to array at the end.
evf::EvFDaqDirector::preBeginRun
void preBeginRun(edm::GlobalContext const &globalContext)
Definition: EvFDaqDirector.cc:364
evf::EvFDaqDirector::fileBrokerPort_
std::string fileBrokerPort_
Definition: EvFDaqDirector.h:210
GlobalContext.h
evf::EvFDaqDirector::getMergedDatChecksumFilePath
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:437
evf::EvFDaqDirector::getNextFromFileBroker
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
Definition: EvFDaqDirector.cc:1717
SiStripPI::max
Definition: SiStripPayloadInspectorHelper.h:169
evf::EvFDaqDirector::~EvFDaqDirector
~EvFDaqDirector()
Definition: EvFDaqDirector.cc:301
eostools.chmod
def chmod(path, mode)
Definition: eostools.py:294
SiStripCommissioningSource_FromEDM_cfg.EvFDaqDirector
EvFDaqDirector
Definition: SiStripCommissioningSource_FromEDM_cfg.py:57
evf::EvFDaqDirector::mergeTypeMap_
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
Definition: EvFDaqDirector.h:272
evf::EvFDaqDirector::unlockFULocal
void unlockFULocal()
Definition: EvFDaqDirector.cc:918
fffnaming::streamerJsonFileNameWithPid
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:101
recoMuon::in
Definition: RecoMuonEnumerators.h:6
gainCalibHelper::gainCalibPI::type
type
Definition: SiPixelGainCalibHelper.h:40
evf::EvFDaqDirector::getOpenInitFilePath
std::string getOpenInitFilePath(std::string const &stream) const
Definition: EvFDaqDirector.cc:441
evf::EvFDaqDirector::run_dir_
std::string run_dir_
Definition: EvFDaqDirector.h:227
edm::Service
Definition: Service.h:30
createfilelist.int
int
Definition: createfilelist.py:10
EvFDaqDirector.h
FastMonitoringService.h
evf::EvFDaqDirector::getMergedRootHistogramFilePath
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:472
edm::LuminosityBlockID::luminosityBlock
LuminosityBlockNumber_t luminosityBlock() const
Definition: LuminosityBlockID.h:42
evf::DirManager::findHighestRunDir
std::string findHighestRunDir()
Definition: DirManager.cc:23
evf::EvFDaqDirector::fuLockPollInterval_
unsigned int fuLockPollInterval_
Definition: EvFDaqDirector.h:213
evf::EvFDaqDirector::lockFULocal
void lockFULocal()
Definition: EvFDaqDirector.cc:913
evf::EvFDaqDirector::getOpenProtocolBufferHistogramFilePath
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:449
evf::EvFDaqDirector
Definition: EvFDaqDirector.h:62
evf::EvFDaqDirector::bu_base_dir_
std::string bu_base_dir_
Definition: EvFDaqDirector.h:205
edm::service::SystemBounds::maxNumberOfStreams
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:35
jsoncollector::DataPoint
Definition: DataPoint.h:36
evf::EvFDaqDirector::createRunOpendirMaybe
void createRunOpendirMaybe()
Definition: EvFDaqDirector.cc:1866
Json::Value::begin
const_iterator begin() const
evf::EvFDaqDirector::getEoRFilePathOnFU
std::string getEoRFilePathOnFU() const
Definition: EvFDaqDirector.cc:490
edm::LogError
Log< level::Error, false > LogError
Definition: MessageLogger.h:123
evf::EvFDaqDirector::fu_rw_fulk
struct flock fu_rw_fulk
Definition: EvFDaqDirector.h:253
res
Definition: Electron.h:6
evf::EvFDaqDirector::hltSourceDirectory_
std::string hltSourceDirectory_
Definition: EvFDaqDirector.h:219
evf::EvFDaqDirector::startFromLS_
unsigned int startFromLS_
Definition: EvFDaqDirector.h:221
visDQMUpload.buf
buf
Definition: visDQMUpload.py:160
AlCaHLTBitMon_QueryRunRegistry.string
string string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
FedRawDataInputSource.h
jsoncollector
Definition: DataPoint.h:26
evf::EvFDaqDirector::readLastLSEntry
int readLastLSEntry(std::string const &file)
Definition: EvFDaqDirector.cc:1876
jsoncollector::DataPointDefinition
Definition: DataPointDefinition.h:20
readEcalDQMStatus.read
read
Definition: readEcalDQMStatus.py:38
evf::EvFDaqDirector::getMergedDatFilePath
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:433
writeEcalDQMStatus.write
write
Definition: writeEcalDQMStatus.py:48
edm::ActivityRegistry::watchPreGlobalEndLumi
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:421
edm::ActivityRegistry::watchPreallocate
void watchPreallocate(Preallocate::slot_type const &iSlot)
Definition: ActivityRegistry.h:144
FRDFileHeader_v1
Definition: FRDFileHeader.h:24
evf::EvFDaqDirector::fu_rw_flk
struct flock fu_rw_flk
Definition: EvFDaqDirector.h:252
DataPointDefinition.h
evf::EvFDaqDirector::run_string_
std::string run_string_
Definition: EvFDaqDirector.h:224
edm::getParameterSet
ParameterSet const & getParameterSet(ParameterSetID const &id)
Definition: ParameterSet.cc:862
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
evf::EvFDaqDirector::socket_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
Definition: EvFDaqDirector.h:284
std
Definition: JetResolutionObject.h:76
evf::EvFDaqDirector::createProcessingNotificationMaybe
void createProcessingNotificationMaybe() const
Definition: EvFDaqDirector.cc:2045
evf::EvFDaqDirector::initRun
void initRun()
Definition: EvFDaqDirector.cc:149
evf::MergeType
MergeType
Definition: EvFDaqDirector.h:58
evf::EvFDaqDirector::rawFileHasHeader
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
Definition: EvFDaqDirector.cc:1061
edm::PathsAndConsumesOfModulesBase
Definition: PathsAndConsumesOfModulesBase.h:35
fffnaming::rootHistogramFileNameWithInstance
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:148
fffnaming::inputJsonFileName
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
Definition: FFFNamingSchema.h:41
Exception
Definition: hltDiff.cc:245
evf::EvFDaqDirector::fulockfile_
std::string fulockfile_
Definition: EvFDaqDirector.h:230
MatrixUtil.remove
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:219
evf
Definition: fillJson.h:27
evf::EvFDaqDirector::preGlobalEndLumi
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
Definition: EvFDaqDirector.cc:383
evf::EvFDaqDirector::transferSystemJson_
std::shared_ptr< Json::Value > transferSystemJson_
Definition: EvFDaqDirector.h:271
evf::EvFDaqDirector::getInputJsonFilePath
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
Definition: EvFDaqDirector.cc:401
evf::EvFDaqDirector::hostname_
std::string hostname_
Definition: EvFDaqDirector.h:223
evf::EvFDaqDirector::getOpenRootHistogramFilePath
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:464
evf::EvFDaqDirector::dpd_
jsoncollector::DataPointDefinition * dpd_
Definition: EvFDaqDirector.h:278
edm::ParameterSet::getParameter
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
Exception.h
evf::EvFDaqDirector::resolver_
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
Definition: EvFDaqDirector.h:281
evf::EvFDaqDirector::removeFile
void removeFile(unsigned int ls, unsigned int index)
Definition: EvFDaqDirector.cc:501
fffnaming::bolsFileName
std::string bolsFileName(const unsigned int run, const unsigned int ls)
Definition: FFFNamingSchema.h:27
evf::EvFDaqDirector::nStreams_
unsigned int nStreams_
Definition: EvFDaqDirector.h:262
data
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
evf::EvFDaqDirector::grabNextJsonFileAndUnlock
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
Definition: EvFDaqDirector.cc:1362
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
timingPdfMaker.infile
infile
Definition: timingPdfMaker.py:349
evf::EvFDaqDirector::fu_readwritelock_fd_
int fu_readwritelock_fd_
Definition: EvFDaqDirector.h:234
RecoTauValidation_cfi.header
header
Definition: RecoTauValidation_cfi.py:291
cond::uint64_t
unsigned long long uint64_t
Definition: Time.h:13
evf::EvFDaqDirector::bu_run_dir_
std::string bu_run_dir_
Definition: EvFDaqDirector.h:228
evf::EvFDaqDirector::getBoLSFilePathOnFU
std::string getBoLSFilePathOnFU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:484
spu::def
int def(FILE *, FILE *, int)
Definition: SherpackUtilities.cc:14
mps_fire.result
result
Definition: mps_fire.py:311
evf::EvFDaqDirector::base_dir_
std::string base_dir_
Definition: EvFDaqDirector.h:204
cms::Exception
Definition: Exception.h:70
timingPdfMaker.outfile
outfile
Definition: timingPdfMaker.py:350
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
edm::ActivityRegistry::watchPreGlobalBeginRun
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
Definition: ActivityRegistry.h:333
evf::EvFDaqDirector::fu_rw_lock_stream
FILE * fu_rw_lock_stream
Definition: EvFDaqDirector.h:240
command_line.start
start
Definition: command_line.py:167
fffnaming::protocolBufferHistogramFileNameWithPid
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:120
evf::EvFDaqDirector::getOpenRawFilePath
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
Definition: EvFDaqDirector.cc:409
evf::EvFDaqDirector::stopFilePathPid_
std::string stopFilePathPid_
Definition: EvFDaqDirector.h:268
edm_modernize_messagelogger.stat
stat
Definition: edm_modernize_messagelogger.py:27
evf::EvFDaqDirector::unlockInitLock
void unlockInitLock()
Definition: EvFDaqDirector.cc:911
evf::EvFDaqDirector::run_nstring_
std::string run_nstring_
Definition: EvFDaqDirector.h:225
ProcessContext.h
evf::EvFDaqDirector::postEndRun
void postEndRun(edm::GlobalContext const &globalContext)
Definition: EvFDaqDirector.cc:374
edm::Log
Definition: MessageLogger.h:70
StreamID.h
evf::EvFDaqDirector::FileStatus
FileStatus
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::lockFULocal2
void lockFULocal2()
Definition: EvFDaqDirector.cc:923
SystemBounds.h
evf::EvFDaqDirector::grabNextJsonFile
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
Definition: EvFDaqDirector.cc:1182
evf::EvFDaqDirector::getNFilesFromEoLS
int getNFilesFromEoLS(std::string BUEoLSFile)
Definition: EvFDaqDirector.cc:718
edm::ParameterSet::getParameterSet
ParameterSet const & getParameterSet(std::string const &) const
Definition: ParameterSet.cc:2128
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
Json::Value
Represents a JSON value.
Definition: value.h:99
evf::EvFDaqDirector::fillDescriptions
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Definition: EvFDaqDirector.cc:327
evf::EvFDaqDirector::bu_w_lock_stream
FILE * bu_w_lock_stream
Definition: EvFDaqDirector.h:238
evf::EvFDaqDirector::fileBrokerHostFromCfg_
bool fileBrokerHostFromCfg_
Definition: EvFDaqDirector.h:208
mps_fire.dest
dest
Definition: mps_fire.py:179
evf::EvFDaqDirector::newFile
Definition: EvFDaqDirector.h:64
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
evf::EvFDaqDirector::grabNextJsonFromRaw
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
Definition: EvFDaqDirector.cc:1100
evf::EvFDaqDirector::checkMergeTypePSet
void checkMergeTypePSet(edm::ProcessContext const &pc)
Definition: EvFDaqDirector.cc:2013