CMS 3D CMS Logo

EvFDaqDirector.cc
Go to the documentation of this file.
10 
18 
19 #include <iostream>
20 #include <fstream>
21 #include <sstream>
22 #include <sys/time.h>
23 #include <unistd.h>
24 #include <cstdio>
25 #include <boost/lexical_cast.hpp>
26 #include <boost/algorithm/string.hpp>
27 
28 //using boost::asio::ip::tcp;
29 
30 //#define DEBUG
31 
32 using namespace jsoncollector;
33 
34 namespace evf {
35 
36  //for enum MergeType
// String names associated with the MergeType enum; the first entry is the empty string.
// NOTE(review): presumably indexed by the MergeType enumerator value - confirm against the header.
37  const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"};
38 
40  : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
41  bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
42  run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
43  useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
44  fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", true)),
45  fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
46  fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
47  fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
48  fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
49  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
50  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
51  requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet", false)),
52  selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode", "")),
53  mergeTypePset_(pset.getUntrackedParameter<std::string>("mergingPset", "")),
54  directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
55  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
56  hostname_(""),
57  bu_readlock_fd_(-1),
58  bu_writelock_fd_(-1),
59  fu_readwritelock_fd_(-1),
60  fulocal_rwlock_fd_(-1),
61  fulocal_rwlock_fd2_(-1),
62  bu_w_lock_stream(nullptr),
63  bu_r_lock_stream(nullptr),
64  fu_rw_lock_stream(nullptr),
65  dirManager_(base_dir_),
66  previousFileSize_(0),
67  bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
68  bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
69  bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
70  bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
71  fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
72  fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
78 
79  //save hostname for later
80  char hostname[33];
81  gethostname(hostname, 32);
82  hostname_ = hostname;
83 
84  char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
85  if (fuLockPollIntervalPtr) {
86  try {
87  fuLockPollInterval_ = boost::lexical_cast<unsigned int>(std::string(fuLockPollIntervalPtr));
88  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
89  << " us";
90  } catch (boost::bad_lexical_cast const&) {
91  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
92  }
93  }
94 
95  //override file service parameter if specified by environment
96  char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
97  if (fileBrokerParamPtr) {
98  try {
99  useFileBroker_ = (boost::lexical_cast<unsigned int>(std::string(fileBrokerParamPtr))) > 0;
100  edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
101  } catch (boost::bad_lexical_cast const&) {
102  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
103  }
104  }
105  if (useFileBroker_) {
107  //find BU data address from hltd configuration
109  struct stat buf;
110  if (stat("/etc/appliance/bus.config", &buf) == 0) {
111  std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
112  std::getline(busconfig, fileBrokerHost_);
113  }
114  if (fileBrokerHost_.empty())
115  throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
116  } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
117  throw cms::Exception("EvFDaqDirector")
118  << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
119 
120  resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
121  query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
122  endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
123  socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
124  }
125 
126  char* startFromLSPtr = std::getenv("FFF_STARTFROMLS");
127  if (startFromLSPtr) {
128  try {
129  startFromLS_ = boost::lexical_cast<unsigned int>(std::string(startFromLSPtr));
130  edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
131  } catch (boost::bad_lexical_cast const&) {
132  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
133  }
134  }
135 
136  //override file service parameter if specified by environment
137  char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
138  if (fileBrokerUseLockParamPtr) {
139  try {
140  fileBrokerUseLocalLock_ = (boost::lexical_cast<unsigned int>(std::string(fileBrokerUseLockParamPtr))) > 0;
141  edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
143  } catch (boost::bad_lexical_cast const&) {
144  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
145  }
146  }
147  }
148 
150  std::stringstream ss;
151  ss << "run" << std::setfill('0') << std::setw(6) << run_;
152  run_string_ = ss.str();
153  ss = std::stringstream();
154  ss << run_;
155  run_nstring_ = ss.str();
156  run_dir_ = base_dir_ + "/" + run_string_;
157  input_throttled_file_ = run_dir_ + "/input_throttle";
158  ss = std::stringstream();
159  ss << getpid();
160  pid_ = ss.str();
161 
162  // check if base dir exists or create it accordingly
163  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
164  if (retval != 0 && errno != EEXIST) {
165  throw cms::Exception("DaqDirector")
166  << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
167  }
168 
169  //create run dir in base dir
170  umask(0);
171  retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
172  if (retval != 0 && errno != EEXIST) {
173  throw cms::Exception("DaqDirector")
174  << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
175  }
176 
177  //create fu-local.lock in run open dir
178  if (!directorBU_) {
180  std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
182  open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
183  if (fulocal_rwlock_fd_ == -1)
184  throw cms::Exception("DaqDirector")
185  << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
186  chmod(fulocal_lock_.c_str(), 0777);
187  fsync(fulocal_rwlock_fd_);
188  //open second fd for another input source thread
190  open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
191  if (fulocal_rwlock_fd2_ == -1)
192  throw cms::Exception("DaqDirector")
193  << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
194  }
195 
196  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
197  //for BU, it is created at this point
198  if (directorBU_) {
200  std::string bulockfile = bu_run_dir_ + "/bu.lock";
201  fulockfile_ = bu_run_dir_ + "/fu.lock";
202 
203  //make or find bu run dir
204  retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
205  if (retval != 0 && errno != EEXIST) {
206  throw cms::Exception("DaqDirector")
207  << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno) << "\n";
208  }
209  bu_run_open_dir_ = bu_run_dir_ + "/open";
210  retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
211  if (retval != 0 && errno != EEXIST) {
212  throw cms::Exception("DaqDirector")
213  << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno) << "\n";
214  }
215 
216  // the BU director does not need to know about the fu lock
217  bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
218  if (bu_writelock_fd_ == -1)
219  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
220  else
221  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
222  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
223  if (bu_w_lock_stream == nullptr)
224  edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
225 
226  // BU INITIALIZES LOCK FILE
227  // FU LOCK FILE OPEN
228  openFULockfileStream(true);
230  fflush(fu_rw_lock_stream);
231  close(fu_readwritelock_fd_);
232 
233  if (!hltSourceDirectory_.empty()) {
234  struct stat buf;
235  if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
236  std::string hltdir = bu_run_dir_ + "/hlt";
237  std::string tmphltdir = bu_run_open_dir_ + "/hlt";
238  retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
239  if (retval != 0 && errno != EEXIST)
240  throw cms::Exception("DaqDirector")
241  << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno) << "\n";
242 
243  std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
244  std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
245 
246  std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
247  for (auto& optfile : optfiles) {
248  try {
249  std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
250  } catch (...) {
251  }
252  }
253 
254  std::filesystem::rename(tmphltdir, hltdir);
255  } else
256  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
257  }
258  //else{}//no configuration specified
259  } else {
260  // for FU, check if bu base dir exists
261 
262  retval = mkdir(bu_base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
263  if (retval != 0 && errno != EEXIST) {
264  throw cms::Exception("DaqDirector")
265  << " Error checking for bu base dir -: " << bu_base_dir_ << " mkdir error:" << strerror(errno) << "\n";
266  }
267 
269  fulockfile_ = bu_run_dir_ + "/fu.lock";
270  openFULockfileStream(false);
271  }
272 
273  pthread_mutex_init(&init_lock_, nullptr);
274 
275  stopFilePath_ = run_dir_ + "/CMSSW_STOP";
276  std::stringstream sstp;
277  sstp << stopFilePath_ << "_pid" << pid_;
278  stopFilePathPid_ = sstp.str();
279 
280  if (!directorBU_) {
281  std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
282  struct stat statbuf;
283  if (!stat(defPath.c_str(), &statbuf))
284  edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
285  else {
286  //look in source directory if not present in ramdisk
287  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
288  defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
289  if (stat(defPath.c_str(), &statbuf)) {
290  defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
291  if (stat(defPath.c_str(), &statbuf)) {
292  defPath = defPathSuffix;
293  }
294  }
295  }
296  dpd_ = new DataPointDefinition();
297  std::string defLabel = "data";
298  DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
299  }
300  }
301 
303  //close server connection
304  if (socket_.get() && socket_->is_open()) {
305  boost::system::error_code ec;
306  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
307  socket_->close(ec);
308  }
309 
310  if (fulocal_rwlock_fd_ != -1) {
311  unlockFULocal();
312  close(fulocal_rwlock_fd_);
313  }
314 
315  if (fulocal_rwlock_fd2_ != -1) {
316  unlockFULocal2();
317  close(fulocal_rwlock_fd2_);
318  }
319  }
320 
322  initRun();
323 
324  nThreads_ = bounds.maxNumberOfStreams();
325  nStreams_ = bounds.maxNumberOfThreads();
326  }
327 
330  desc.setComment(
331  "Service used for file locking arbitration and for propagating information between other EvF components");
332  desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
333  desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
334  desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
335  desc.addUntracked<bool>("useFileBroker", false)
336  ->setComment("Use BU file service to grab input data instead of NFS file locking");
337  desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
338  ->setComment("Allow service to discover BU address from hltd configuration");
339  desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
340  desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
341  desc.addUntracked<bool>("fileBrokerKeepAlive", true)
342  ->setComment("Use keep alive to avoid using large number of sockets");
343  desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
344  ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
345  desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
346  ->setComment("Lock polling interval in microseconds for the input directory file lock");
347  desc.addUntracked<bool>("outputAdler32Recheck", false)
348  ->setComment("Check Adler32 of per-process output files while micro-merging");
349  desc.addUntracked<bool>("requireTransfersPSet", false)
350  ->setComment("Require complete transferSystem PSet in the process configuration");
351  desc.addUntracked<std::string>("selectedTransferMode", "")
352  ->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
353  desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
354  desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
355  desc.addUntracked<std::string>("mergingPset", "")
356  ->setComment("Name of merging PSet to look for merging type definitions for streams");
357  descriptions.add("EvFDaqDirector", desc);
358  }
359 
362  checkMergeTypePSet(pc);
363  }
364 
// Begin-run hook: logs a warning naming run_dir_ and the highest run directory reported by
// dirManager_.findHighestRunDir().
// NOTE(review): listing line 369 is absent from this listing (the embedded numbering jumps from
// 368 to 370). It presumably holds the condition that opens the block closed at 372 - restore it
// from the original file; as shown here the braces do not balance.
365  void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) {
366  //assert(run_ == id.run());
367 
368  // check if the requested run is the latest one - issue a warning if it isn't
370  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
371  << ". This is not the highest run " << dirManager_.findHighestRunDir();
372  }
373  }
374 
// End-run hook: closes the BU read-lock and write-lock file descriptors.
// The return values of close() are not checked.
375  void EvFDaqDirector::postEndRun(edm::GlobalContext const& globalContext) {
376  close(bu_readlock_fd_);
377  close(bu_writelock_fd_);
378  if (directorBU_) {
// Path of the bu.lock file inside the BU run directory.
// NOTE(review): listing line 380 is absent from this listing; as shown, `filename` is built but
// never used. Presumably the missing line acts on this path - confirm against the original file.
379  std::string filename = bu_run_dir_ + "/bu.lock";
381  }
382  }
383 
385  //delete all files belonging to just closed lumi
386  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
388  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
389  return;
390  }
391 
392  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
393  auto it = filesToDeletePtr_->begin();
394  while (it != filesToDeletePtr_->end()) {
395  if (it->second->lumi_ == ls) {
396  it = filesToDeletePtr_->erase(it);
397  } else
398  it++;
399  }
400  }
401 
// NOTE(review): listing line 403 (the function body) is absent from this listing, so no return
// statement is visible here. Restore it from the original file before compiling.
// Presumably returns the path of the input JSON file for (ls, index) - confirm.
402  std::string EvFDaqDirector::getInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
404  }
405 
// NOTE(review): listing line 407 (the function body) is absent from this listing, so no return
// statement is visible here. Restore it from the original file before compiling.
// Presumably returns the path of the raw input file for (ls, index) - confirm.
406  std::string EvFDaqDirector::getRawFilePath(const unsigned int ls, const unsigned int index) const {
408  }
409 
410  std::string EvFDaqDirector::getOpenRawFilePath(const unsigned int ls, const unsigned int index) const {
411  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
412  }
413 
414  std::string EvFDaqDirector::getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
415  return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
416  }
417 
// NOTE(review): listing line 419 (the function body) is absent from this listing, so no return
// statement is visible here. Restore it from the original file before compiling.
// Presumably returns the path of the .dat file for (ls, stream) - confirm.
418  std::string EvFDaqDirector::getDatFilePath(const unsigned int ls, std::string const& stream) const {
420  }
421 
424  }
425 
428  }
429 
432  }
433 
436  }
437 
440  }
441 
443  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
444  }
445 
448  }
449 
451  std::string const& stream) const {
453  }
454 
456  std::string const& stream) const {
458  }
459 
461  std::string const& stream) const {
463  }
464 
467  }
468 
471  }
472 
475  }
476 
478  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
479  }
480 
482  return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
483  }
484 
486  return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
487  }
488 
490 
492 
493  std::string EvFDaqDirector::getFFFParamsFilePathOnBU() const { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
494 
496  int retval = remove(filename.c_str());
497  if (retval != 0)
498  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
499  << ". error = " << strerror(errno);
500  }
501 
502  void EvFDaqDirector::removeFile(unsigned int ls, unsigned int index) { removeFile(getRawFilePath(ls, index)); }
503 
505  std::string& nextFile,
506  uint32_t& fsize,
507  uint16_t& rawHeaderSize,
508  uint64_t& lockWaitTime,
509  bool& setExceptionState) {
510  EvFDaqDirector::FileStatus fileStatus = noFile;
511  rawHeaderSize = 0;
512 
513  int retval = -1;
514  int lock_attempts = 0;
515  long total_lock_attempts = 0;
516 
517  struct stat buf;
518  int stopFileLS = -1;
519  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
520  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
521  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
522  if (stopFileCheck == 0)
523  stopFileLS = readLastLSEntry(stopFilePath_);
524  else
525  stopFileLS = 1; //stop without drain if only pid is stopped
526  if (!stop_ls_override_) {
527  //if lumisection is higher than in stop file, should quit at next from current
528  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
529  stopFileLS = stop_ls_override_ = ls;
530  } else
531  stopFileLS = stop_ls_override_;
532  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
533  << stopFileLS;
534  //return runEnded;
535  } else //if file was removed before reaching stop condition, reset this
536  stop_ls_override_ = 0;
537 
538  timeval ts_lockbegin;
539  gettimeofday(&ts_lockbegin, nullptr);
540 
541  while (retval == -1) {
542  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
543  if (retval == -1)
544  usleep(fuLockPollInterval_);
545  else
546  continue;
547 
548  lock_attempts += fuLockPollInterval_;
549  total_lock_attempts += fuLockPollInterval_;
550  if (lock_attempts > 5000000 || errno == 116) {
551  if (errno == 116)
552  edm::LogWarning("EvFDaqDirector")
553  << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
554  else
555  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
556  "fu.lock file are present -: errno "
557  << errno << ":" << strerror(errno) << std::endl;
558 
559  if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
560  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
561  ls++;
562  return noFile;
563  }
564 
565  if (stat(bu_run_dir_.c_str(), &buf) != 0)
566  return runEnded;
567  if (stat(fulockfile_.c_str(), &buf) != 0)
568  return runEnded;
569 
570  lock_attempts = 0;
571  }
572  if (total_lock_attempts > 5 * 60000000) {
573  edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
574  return runAbort;
575  }
576  }
577 
578  timeval ts_lockend;
579  gettimeofday(&ts_lockend, nullptr);
580  long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
581  if (deltat > 0.)
582  lockWaitTime = deltat;
583 
584  if (retval != 0)
585  return fileStatus;
586 
587 #ifdef DEBUG
588  timeval ts_lockend;
589  gettimeofday(&ts_lockend, 0);
590 #endif
591 
592  //open another lock file FD after the lock using main fd has been acquired
593  int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
594  if (fu_readwritelock_fd2 == -1)
595  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
596  << " create. error:" << strerror(errno);
597 
598  FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
599 
600  // if the stream is readable
601  if (fu_rw_lock_stream2 != nullptr) {
602  unsigned int readLs, readIndex;
603  int check = 0;
604  // rewind the stream
605  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
606  // if rewinded ok
607  if (check == 0) {
608  // read its values
609  fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
610  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
611 
612  unsigned int currentLs = readLs;
613  bool bumpedOk = false;
614  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
615  //no lock file write in this case
616  if (ls && ls + 1 < currentLs)
617  ls++;
618  else {
619  // try to bump (look for new index or EoLS file)
620  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
621  //avoid 2 lumisections jump
622  if (ls && readLs > currentLs && currentLs > ls) {
623  ls++;
624  readLs = currentLs = ls;
625  readIndex = 0;
626  bumpedOk = false;
627  //no write to lock file
628  } else {
629  if (ls == 0 && readLs > currentLs) {
630  //make sure to initialize always with LS found in the lock file, with possibility of grabbing index file immediately
631  //in this case there is no new file in the same LS
632  //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
633  readLs = currentLs;
634  readIndex = 0;
635  bumpedOk = false;
636  //no write to lock file
637  }
638  //update return LS value
639  ls = readLs;
640  }
641  }
642  if (bumpedOk) {
643  // there is a new index file to grab, lock file needs to be updated
644  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
645  if (check == 0) {
646  ftruncate(fu_readwritelock_fd2, 0);
647  // write next index in the file, which is the file the next process should take
648  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
649  fflush(fu_rw_lock_stream2);
650  fsync(fu_readwritelock_fd2);
651  fileStatus = newFile;
652  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
653  } else {
654  edm::LogError("EvFDaqDirector")
655  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
656  setExceptionState = true;
657  return noFile;
658  }
659  } else if (currentLs < readLs) {
660  //there is no new file in next LS (yet), but lock file can be updated to the next LS
661  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
662  if (check == 0) {
663  ftruncate(fu_readwritelock_fd2, 0);
664  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
665  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
666  fflush(fu_rw_lock_stream2);
667  fsync(fu_readwritelock_fd2);
668  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
669  } else {
670  edm::LogError("EvFDaqDirector")
671  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
672  setExceptionState = true;
673  return noFile;
674  }
675  }
676  } else {
677  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
678  << strerror(errno);
679  }
680  } else {
681  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
682  }
683  fclose(fu_rw_lock_stream2); // = fdopen(fu_readwritelock_fd2, "r+");
684 
685 #ifdef DEBUG
686  timeval ts_preunlock;
687  gettimeofday(&ts_preunlock, 0);
688  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
689  double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
690 #endif
691 
692  //if new json is present, lock file which FedRawDataInputSource will later unlock
693  if (fileStatus == newFile)
694  lockFULocal();
695 
696  //release lock at this point
697  int retvalu = -1;
698  retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
699  if (retvalu == -1)
700  edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
701 
702 #ifdef DEBUG
703  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
704 #endif
705 
706  if (fileStatus == noFile) {
707  struct stat buf;
708  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
709  if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
710  fileStatus = runEnded;
711  if (stopFileLS >= 0 && (int)ls > stopFileLS) {
712  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
713  fileStatus = runEnded;
714  }
715  }
716  return fileStatus;
717  }
718 
720  std::ifstream ij(BUEoLSFile);
721  Json::Value deserializeRoot;
723 
724  if (!reader.parse(ij, deserializeRoot)) {
725  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
726  return -1;
727  }
728 
730  DataPoint dp;
731  dp.deserialize(deserializeRoot);
732 
733  //read definition
734  if (readEolsDefinition_) {
735  //std::string def = boost::algorithm::trim(dp.getDefinition());
736  std::string def = dp.getDefinition();
737  if (def.empty())
738  readEolsDefinition_ = false;
739  while (!def.empty()) {
741  if (def.find('/') == 0)
742  fullpath = def;
743  else
744  fullpath = bu_run_dir_ + '/' + def;
745  struct stat buf;
746  if (stat(fullpath.c_str(), &buf) == 0) {
747  DataPointDefinition eolsDpd;
748  std::string defLabel = "legend";
749  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
750  if (eolsDpd.getNames().empty()) {
751  //try with "data" label if "legend" format is not used
752  eolsDpd = DataPointDefinition();
753  defLabel = "data";
754  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
755  }
756  for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
757  if (eolsDpd.getNames().at(i) == "NFiles")
759  readEolsDefinition_ = false;
760  break;
761  }
762  //check if we can still find definition
763  if (def.size() <= 1 || def.find('/') == std::string::npos) {
764  readEolsDefinition_ = false;
765  break;
766  }
767  def = def.substr(def.find('/') + 1);
768  }
769  }
770 
771  if (dp.getData().size() > eolsNFilesIndex_)
772  data = dp.getData()[eolsNFilesIndex_];
773  else {
774  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
775  return -1;
776  }
777  return boost::lexical_cast<int>(data);
778  }
779 
// Looks for the next input file to hand out, starting at (ls, index).
//
// Checks, in order:
//   1. the input JSON path for (ls, index);
//   2. the raw file path for (ls, index), accepted only if rawFileHasHeader() succeeds;
//   3. if the BU EoLS file for ls exists: both paths again, then the index is validated against
//      the file count from getNFilesFromEoLS(), ls is advanced by one with index reset to 0,
//      and both paths are checked once more at index 0.
//
// ls, index         in/out; modified only on the EoLS path (ls incremented, index set to 0).
// nextFile          cleared on entry; set to the path of the file that was found.
// fsize             set to st_size of the file that was found (also stored in previousFileSize_).
// rawHeaderSize     forwarded to rawFileHasHeader().
// maxLS             when >= 0, no file is returned for an ls above this value.
// setExceptionState set to true when the EoLS file reports more files than `index`.
// Returns true if a file was found, false otherwise.
//
// NOTE(review): listing lines 789 and 792 are absent from this listing (the embedded numbering
// jumps 788->790 and 791->793). The two `if` statements on fms_ below are therefore incomplete
// as shown; restore the missing statements from the original file.
780  bool EvFDaqDirector::bumpFile(unsigned int& ls,
781  unsigned int& index,
782  std::string& nextFile,
783  uint32_t& fsize,
784  uint16_t& rawHeaderSize,
785  int maxLS,
786  bool& setExceptionState) {
787  if (previousFileSize_ != 0) {
788  if (!fms_) {
790  }
791  if (fms_)
793  previousFileSize_ = 0;
794  }
795  nextFile = "";
796 
797  //reached limit
798  if (maxLS >= 0 && ls > (unsigned int)maxLS)
799  return false;
800 
801  struct stat buf;
802  std::stringstream ss;
// nextIndex is computed here but not read anywhere else in the visible body.
803  unsigned int nextIndex = index;
804  nextIndex++;
805 
806  // 1. Check suggested file
807  std::string nextFileJson = getInputJsonFilePath(ls, index);
808  if (stat(nextFileJson.c_str(), &buf) == 0) {
809  fsize = previousFileSize_ = buf.st_size;
810  nextFile = nextFileJson;
811  return true;
812  }
813  // 2. No file -> lumi ended? (and how many?)
814  else {
815  // 3. No file -> check for standalone raw file
816  std::string nextFileRaw = getRawFilePath(ls, index);
817  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
818  fsize = previousFileSize_ = buf.st_size;
819  nextFile = nextFileRaw;
820  return true;
821  }
822 
823  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
824 
825  if (stat(BUEoLSFile.c_str(), &buf) == 0) {
826  // recheck that no raw file appeared in the meantime
827  if (stat(nextFileJson.c_str(), &buf) == 0) {
828  fsize = previousFileSize_ = buf.st_size;
829  nextFile = nextFileJson;
830  return true;
831  }
832  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
833  fsize = previousFileSize_ = buf.st_size;
834  nextFile = nextFileRaw;
835  return true;
836  }
837 
838  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
839  if (indexFilesInLS < 0)
840  //parsing failed
841  return false;
842  else {
843  //check index
844  if ((int)index < indexFilesInLS) {
845  //we have 2 files, and check for 1 failed... retry (2 will never be here)
// nextFile is still empty at this point (cleared on entry), so the logged "Missing" name is blank.
846  edm::LogError("EvFDaqDirector")
847  << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
848  << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
849  setExceptionState = true;
850  return false;
851  }
852  }
853  // this lumi ended, check for files
854  ++ls;
855  index = 0;
856 
857  //reached limit
858  if (maxLS >= 0 && ls > (unsigned int)maxLS)
859  return false;
860 
861  nextFileJson = getInputJsonFilePath(ls, 0);
862  nextFileRaw = getRawFilePath(ls, 0);
863  if (stat(nextFileJson.c_str(), &buf) == 0) {
864  // a new file was found at new lumisection, index 0
865  fsize = previousFileSize_ = buf.st_size;
866  nextFile = nextFileJson;
867  return true;
868  }
869  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
870  fsize = previousFileSize_ = buf.st_size;
871  nextFile = nextFileRaw;
872  return true;
873  }
874  return false;
875  }
876  }
877  // no new file found
878  return false;
879  }
880 
882  if (fu_rw_lock_stream == nullptr)
883  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
884  else {
885  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
886  unsigned int readLs = 1, readIndex = 0;
887  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
888  }
889  }
890 
// NOTE(review): the signature line (listing line 891) and listing line 893 are missing from this
// extract. Line 893 presumably held the `fu_readwritelock_fd_ =` left-hand side of the open() call
// on line 894 - confirm against the upstream file.
// Opens fulockfile_ (creating it when `create` is true), stores the descriptor in
// fu_readwritelock_fd_ and wraps it in the stdio stream fu_rw_lock_stream.
892  if (create) {
894  open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
// permissions are set explicitly after creation; NOTE(review): presumably to be independent of the
// process umask - confirm
895  chmod(fulockfile_.c_str(), 0766);
896  } else {
897  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
898  }
899  if (fu_readwritelock_fd_ == -1)
900  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
901  << " create:" << create << " error:" << strerror(errno);
902  else
903  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
904 
// NOTE(review): fdopen is still attempted when the descriptor is -1; the failure above is only logged.
905  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
906  if (fu_rw_lock_stream == nullptr)
907  edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
908  }
909 
910  void EvFDaqDirector::lockInitLock() { pthread_mutex_lock(&init_lock_); }
911 
912  void EvFDaqDirector::unlockInitLock() { pthread_mutex_unlock(&init_lock_); }
913 
// NOTE(review): signature line (listing line 914) is missing from this extract; presumably this is
// the body of lockFULocal() - confirm.
// Requests a shared flock() on fulocal_rwlock_fd_. LOCK_NB is not passed and the return value is
// not checked.
915  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
916  flock(fulocal_rwlock_fd_, LOCK_SH);
917  }
918 
// NOTE(review): signature line (listing line 919) is missing from this extract; presumably this is
// the body of unlockFULocal() - confirm.
// Releases the flock() held on fulocal_rwlock_fd_. The return value is not checked.
920  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
921  flock(fulocal_rwlock_fd_, LOCK_UN);
922  }
923 
// NOTE(review): signature line (listing line 924) is missing from this extract; presumably this is
// the body of lockFULocal2() - confirm.
// Requests an exclusive flock() on the second descriptor, fulocal_rwlock_fd2_. LOCK_NB is not
// passed and the return value is not checked.
925  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
926  flock(fulocal_rwlock_fd2_, LOCK_EX);
927  }
928 
// NOTE(review): signature line (listing line 929) is missing from this extract; presumably this is
// the body of unlockFULocal2() - confirm.
// Releases the flock() held on fulocal_rwlock_fd2_. The return value is not checked.
930  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
931  flock(fulocal_rwlock_fd2_, LOCK_UN);
932  }
933 
934  void EvFDaqDirector::createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const {
935  //used for backpressure mechanisms and monitoring
936  const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
937  struct stat buf;
938  if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
939  int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
940  close(bol_fd);
941  }
942  }
943 
944  void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection,
945  const uint32_t currentLumiSection,
946  bool doCreateBoLS,
947  bool doCreateEoLS) {
948  if (currentLumiSection > 0) {
949  const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
950  struct stat buf;
951  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
952  if (!found) {
953  if (doCreateEoLS) {
954  int eol_fd =
955  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
956  close(eol_fd);
957  }
958  if (doCreateBoLS)
959  createBoLSFile(lumiSection, false);
960  }
961  } else if (doCreateBoLS) {
962  createBoLSFile(lumiSection, true); //needed for initial lumisection
963  }
964  }
965 
967  int& rawFd,
968  uint16_t& rawHeaderSize,
969  uint32_t& lsFromHeader,
970  int32_t& eventsFromHeader,
971  int64_t& fileSizeFromHeader,
972  bool requireHeader,
973  bool retry,
974  bool closeFile) {
// parseFRDFileHeader: open rawSourcePath and read an FRDFileHeader_v1-sized prefix from it.
// NOTE(review): the first line of the signature (listing line 966) is missing from this extract.
// Outputs, written only when 0 is returned:
//   rawFd              - the open descriptor, or -1 when closeFile is true
//   rawHeaderSize      - headerSize_ from the header, or 0 when no header was detected
//   lsFromHeader       - lumiSection_ from the header, or 0 when no header was detected
//   eventsFromHeader   - eventCount_ from the header, or -1 when no header was detected
//   fileSizeFromHeader - fileSize_ from the header, or -1 when no header was detected
// requireHeader: a file without a recognised header is an error (-1) instead of being accepted.
// retry: on a failed open, log a warning and call this function once more with retry = false.
// Returns 0 on success, 1 when the file could not be opened and errno is ENOENT, -1 on any
// other failure.
975  int infile;
976 
977  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
978  if (retry) {
979  edm::LogWarning("EvFDaqDirector")
980  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
981  return parseFRDFileHeader(rawSourcePath,
982  rawFd,
983  rawHeaderSize,
984  lsFromHeader,
985  eventsFromHeader,
986  fileSizeFromHeader,
987  requireHeader,
988  false,
989  closeFile);
990  } else {
// retry == false: the open is attempted a second time right away before giving up
991  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
992  edm::LogError("EvFDaqDirector")
993  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
// NOTE(review): errno is inspected after the logging statement - confirm logging cannot change it
994  if (errno == ENOENT)
995  return 1; // error && file not found
996  else
997  return -1;
998  }
999  }
1000  }
1001 
1002  constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1); //try to read v1 FRD header size
1003  FRDFileHeader_v1 fileHead;
1004 
1005  ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
// with closeFile the descriptor is released immediately; infile = -1 is what ends up in rawFd
// and makes the "infile != -1" guards below skip a second close
1006  if (closeFile) {
1007  close(infile);
1008  infile = -1;
1009  }
1010 
1011  if (sz_read < 0) {
1012  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - unable to read " << rawSourcePath << " : "
1013  << strerror(errno);
1014  if (infile != -1)
1015  close(infile);
1016  return -1;
1017  }
1018  if ((size_t)sz_read < buf_sz) {
1019  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - file smaller than header: " << rawSourcePath;
1020  if (infile != -1)
1021  close(infile);
1022  return -1;
1023  }
1024 
1025  uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1026 
1027  if (frd_version == 0) {
1028  //no header (specific sequence not detected)
1029  if (requireHeader) {
1030  edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1031  if (infile != -1)
1032  close(infile);
1033  return -1;
1034  } else {
1035  //no header, but valid file
// rewind to the start of the file; when closeFile is true infile is already -1 here and the
// lseek result is not checked
1036  lseek(infile, 0, SEEK_SET);
1037  rawHeaderSize = 0;
1038  lsFromHeader = 0;
1039  eventsFromHeader = -1;
1040  fileSizeFromHeader = -1;
1041  }
1042  } else {
1043  //version 1 header
1044  uint32_t headerSizeRaw = fileHead.headerSize_;
1045  if (headerSizeRaw < buf_sz) {
1046  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1047  << " v:" << frd_version;
1048  if (infile != -1)
1049  close(infile);
1050  return -1;
1051  }
1052  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1053  lsFromHeader = fileHead.lumiSection_;
1054  eventsFromHeader = (int32_t)fileHead.eventCount_;
1055  fileSizeFromHeader = (int64_t)fileHead.fileSize_;
1056  rawHeaderSize = fileHead.headerSize_;
1057  }
1058  rawFd = infile;
1059  return 0; //OK
1060  }
1061 
1062  bool EvFDaqDirector::rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize) {
1063  int infile;
1064  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1065  edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1066  << strerror(errno);
1067  return false;
1068  }
1069  constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1); //try to read v1 FRD header size
1070  FRDFileHeader_v1 fileHead;
1071 
1072  ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
1073 
1074  if (sz_read < 0) {
1075  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << rawSourcePath << " : "
1076  << strerror(errno);
1077  if (infile != -1)
1078  close(infile);
1079  return false;
1080  }
1081  if ((size_t)sz_read < buf_sz) {
1082  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << rawSourcePath;
1083  if (infile != -1)
1084  close(infile);
1085  return false;
1086  }
1087 
1088  uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1089 
1090  close(infile);
1091 
1092  if (frd_version > 0) {
1093  rawHeaderSize = fileHead.headerSize_;
1094  return true;
1095  }
1096 
1097  rawHeaderSize = 0;
1098  return false;
1099  }
1100 
1102  int& rawFd,
1103  uint16_t& rawHeaderSize,
1104  int64_t& fileSizeFromHeader,
1105  bool& fileFound,
1106  uint32_t serverLS,
1107  bool closeFile) {
// grabNextJsonFromRaw: parse the FRD header of rawSourcePath and write a local, pid-tagged .jsn
// file under baseRunDir() with the content {"data":[<events>,<fileSize>,"<rawSourcePath>"]}.
// NOTE(review): the first line of the signature (listing line 1101) is missing from this extract.
// fileFound is set to false only when parseFRDFileHeader returns 1; it is true otherwise.
// serverLS, when non-zero, is compared with the lumisection from the header; a mismatch only
// logs a warning.
// Returns the event count from the header, or -1 on any failure. On the -1 paths taken after
// parseFRDFileHeader succeeded, rawFd is left as parseFRDFileHeader set it.
1108  fileFound = true;
1109 
1110  //take only first three tokens delimited by "_" in the renamed raw file name
1111  std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1112  size_t pos = 0, n_tokens = 0;
// advance pos to the third '_' (search starts at index 1); with fewer than three, pos ends as npos
// and substr(0, npos) keeps the whole stem
1113  while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1114  }
1115  std::string reducedJsonStem = jsonStem.substr(0, pos);
1116 
1117  std::ostringstream fileNameWithPID;
1118  //should be ported to use fffnaming
1119  fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1120 
1121  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1122 
1123  LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1124 
1125  //parse RAW file header if it exists
1126  uint32_t lsFromRaw;
1127  int32_t nbEventsWrittenRaw;
1128  int64_t fileSizeFromRaw;
// requireHeader = true, retry = true
1129  auto ret = parseFRDFileHeader(
1130  rawSourcePath, rawFd, rawHeaderSize, lsFromRaw, nbEventsWrittenRaw, fileSizeFromRaw, true, true, closeFile);
1131  if (ret != 0) {
1132  if (ret == 1)
1133  fileFound = false;
1134  return -1;
1135  }
1136 
1137  int outfile;
1138  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1139  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1140  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1141  if (errno == EEXIST) {
1142  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1143  << " : ";
1144  return -1;
1145  }
1146  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1147  << strerror(errno);
// any other open failure: remove a possibly half-created file, then try the exclusive create once more
1148  struct stat out_stat;
1149  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1150  edm::LogWarning("EvFDaqDirector")
1151  << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1152  << jsonDestPath;
1153  if (unlink(jsonDestPath.c_str()) == -1) {
1154  edm::LogWarning("EvFDaqDirector")
1155  << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1156  }
1157  }
1158  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1159  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1160  << jsonDestPath << " : " << strerror(errno);
1161  return -1;
1162  }
1163  }
1164  //write JSON file (TODO: use jsoncpp)
1165  std::stringstream ss;
1166  ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1167  std::string sstr = ss.str();
1168 
// NOTE(review): a short (partial) write is not detected, and outfile is not closed on the
// failure path below
1169  if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1170  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1171  << " : " << strerror(errno);
1172  return -1;
1173  }
1174  close(outfile);
1175  if (serverLS && serverLS != lsFromRaw)
1176  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1177  << " and raw file header LS " << lsFromRaw;
1178 
1179  fileSizeFromHeader = fileSizeFromRaw;
1180  return nbEventsWrittenRaw;
1181  }
1182 
1184  std::string const& rawSourcePath,
1185  int64_t& fileSizeFromJson,
1186  bool& fileFound) {
// grabNextJsonFile: copy the json file jsonSourcePath into a pid-tagged file under baseRunDir()
// (named after the stem of rawSourcePath), remove the source, then parse the content for the
// number of events.
// NOTE(review): the first line of the signature (listing line 1183) and the declaration of
// `reader` (listing line 1276) are missing from this extract.
// fileFound is set to false only when the source cannot be opened on the second attempt with
// errno == ENOENT.
// fileSizeFromJson is set to the "NBytes" value when it can be parsed, otherwise to -1; it is
// not written on the early -1 returns that precede the parsing step.
// Returns the number of events, or -1 on any failure.
1187  fileFound = true;
1188 
1189  //should be ported to use fffnaming
1190  std::ostringstream fileNameWithPID;
1191  fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1192  << std::setw(5) << pid_ << ".jsn";
1193 
1194  // assemble json destination path
1195  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1196 
1197  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1198 
1199  int infile = -1, outfile = -1;
1200 
// the source open is attempted twice in immediate succession before giving up
1201  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1202  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1203  << strerror(errno);
1204  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1205  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1206  << jsonSourcePath << " : " << strerror(errno);
1207  if (errno == ENOENT)
1208  fileFound = false;
1209  return -1;
1210  }
1211  }
1212 
1213  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1214  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1215  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1216  if (errno == EEXIST) {
1217  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1218  << " : ";
1219  ::close(infile);
1220  return -1;
1221  }
1222  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1223  << strerror(errno);
// any other open failure: remove a possibly half-created file, then try the exclusive create once more
1224  struct stat out_stat;
1225  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1226  edm::LogWarning("EvFDaqDirector")
1227  << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1228  if (unlink(jsonDestPath.c_str()) == -1) {
1229  edm::LogWarning("EvFDaqDirector")
1230  << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1231  }
1232  }
1233  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1234  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1235  << jsonDestPath << " : " << strerror(errno);
1236  ::close(infile);
1237  return -1;
1238  }
1239  }
1240  //copy contents
1241  const std::size_t buf_sz = 512;
1242  std::size_t tot_written = 0;
// NOTE(review): the buffer is allocated with new[] but std::unique_ptr<char> releases it with
// plain delete; std::unique_ptr<char[]> would match the allocation
1243  std::unique_ptr<char> buf(new char[buf_sz]);
1244 
// read in buf_sz chunks; the inner loop repeats write() until the whole chunk is written.
// A write error copies the negative result into sz_read, which ends the outer loop as well.
1245  ssize_t sz, sz_read = 1, sz_write;
1246  while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1247  sz_write = 0;
1248  do {
1249  assert(sz_read - sz_write > 0);
1250  if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1251  sz_read = sz; // cause read loop termination
1252  break;
1253  }
1254  assert(sz > 0);
1255  sz_write += sz;
1256  tot_written += sz;
1257  } while (sz_write < sz_read);
1258  }
1259  close(infile);
1260  close(outfile);
1261 
// NOTE(review): a read or write error after some bytes were copied still leaves tot_written > 0,
// so the source file is removed in that case too
1262  if (tot_written > 0) {
1263  //leave file if it was empty for diagnosis
1264  if (unlink(jsonSourcePath.c_str()) == -1) {
1265  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1266  << strerror(errno);
1267  return -1;
1268  }
1269  } else {
1270  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1271  << jsonSourcePath;
1272  return -1;
1273  }
1274 
1275  Json::Value deserializeRoot;
1277 
1278  std::string data;
1279  std::stringstream ss;
1280  bool result;
1281  try {
// small files are parsed straight from the copy buffer, larger ones are re-read from the destination.
// NOTE(review): the copy loop never appends a terminating NUL to buf - confirm which parse
// overload is selected here and that it cannot read past the copied bytes (the same applies to
// `ss << buf.get()` below)
1282  if (tot_written <= buf_sz) {
1283  result = reader.parse(buf.get(), deserializeRoot);
1284  } else {
1285  //json will normally not be bigger than buf_sz bytes
1286  try {
1287  std::ifstream ij(jsonDestPath);
1288  ss << ij.rdbuf();
1289  } catch (std::filesystem::filesystem_error const& ex) {
1290  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1291  return -1;
1292  }
1293  result = reader.parse(ss.str(), deserializeRoot);
1294  }
1295  if (!result) {
1296  if (tot_written <= buf_sz)
1297  ss << buf.get();
1298  edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1299  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1300  << ss.str() << ".";
1301  return -1;
1302  }
1303 
1304  //read BU JSON
1305  DataPoint dp;
1306  dp.deserialize(deserializeRoot);
// take the value at the position of "NEvents" in dpd_->getNames(); if that is not available,
// fall back to the first data element
1307  bool success = false;
1308  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1309  if (dpd_->getNames().at(i) == "NEvents")
1310  if (i < dp.getData().size()) {
1311  data = dp.getData()[i];
1312  success = true;
1313  break;
1314  }
1315  }
1316  if (!success) {
1317  if (!dp.getData().empty())
1318  data = dp.getData()[0];
1319  else {
1320  edm::LogError("EvFDaqDirector::grabNextJsonFile")
1321  << "grabNextJsonFile - "
1322  << " error reading number of events from BU JSON; No input value. data -: " << data;
1323  return -1;
1324  }
1325  }
1326 
1327  //try to read raw file size
1328  fileSizeFromJson = -1;
1329  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1330  if (dpd_->getNames().at(i) == "NBytes") {
1331  if (i < dp.getData().size()) {
1332  std::string dataSize = dp.getData()[i];
1333  try {
1334  fileSizeFromJson = boost::lexical_cast<long>(dataSize);
1335  } catch (boost::bad_lexical_cast const&) {
1336  //non-fatal currently, processing can continue without this value
1337  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1338  << "Input value is -: " << dataSize;
1339  }
1340  break;
1341  }
1342  }
1343  }
// a non-numeric event count throws bad_lexical_cast, handled below
1344  return boost::lexical_cast<int>(data);
1345  } catch (boost::bad_lexical_cast const& e) {
1346  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1347  << "Input value is -: " << data;
1348  } catch (std::runtime_error const& e) {
1349  //Can be thrown by Json parser
1350  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1351  }
1352 
1353  catch (std::exception const& e) {
1354  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1355  } catch (...) {
1356  //unknown exception
1357  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1358  }
1359 
// every exception path ends here
1360  return -1;
1361  }
1362 
// NOTE(review): the signature line (listing line 1363) and the declaration of `reader` (listing
// line 1400) are missing from this extract.
// Copies jsonSourcePath to a pid-tagged file under baseRunDir(), calls unlockFULocal(), removes
// the source and parses the copy for the number of events.
// Returns the number of events, or -1 when any exception is caught.
1364  std::string data;
1365  try {
1366  // assemble json destination path
1367  std::filesystem::path jsonDestPath(baseRunDir());
1368 
1369  //should be ported to use fffnaming
1370  std::ostringstream fileNameWithPID;
1371  fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1372  << ".jsn";
1373  jsonDestPath /= fileNameWithPID.str();
1374 
1375  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
// one retry after a one-second pause; a failure of the second copy propagates to the outer handlers
1376  try {
1377  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1378  } catch (std::filesystem::filesystem_error const& ex) {
1379  // Input dir gone?
1380  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1381  // << " Maybe the file is not yet visible by FU. Trying again in one second";
1382  sleep(1);
1383  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1384  }
1385  unlockFULocal();
1386 
// failure to remove the source is only logged
1387  try {
1388  //sometimes this fails but file gets deleted
1389  std::filesystem::remove(jsonSourcePath);
1390  } catch (std::filesystem::filesystem_error const& ex) {
1391  // Input dir gone?
1392  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1393  } catch (std::exception const& ex) {
1394  // Input dir gone?
1395  edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1396  }
1397 
1398  std::ifstream ij(jsonDestPath);
1399  Json::Value deserializeRoot;
1401 
1402  std::stringstream ss;
1403  ss << ij.rdbuf();
1404  if (!reader.parse(ss.str(), deserializeRoot)) {
1405  edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1406  << "\nERROR:\n"
1407  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1408  << ss.str() << ".";
1409  throw std::runtime_error("Cannot deserialize input JSON file");
1410  }
1411 
1412  //read BU JSON
// NOTE(review): this declaration shadows the `data` declared before the try block, so the
// bad_lexical_cast handler below logs the outer variable, which is never assigned
1413  std::string data;
1414  DataPoint dp;
1415  dp.deserialize(deserializeRoot);
// no break in this loop: the last "NEvents" position with a matching data element wins
1416  bool success = false;
1417  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1418  if (dpd_->getNames().at(i) == "NEvents")
1419  if (i < dp.getData().size()) {
1420  data = dp.getData()[i];
1421  success = true;
1422  }
1423  }
1424  if (!success) {
1425  if (!dp.getData().empty())
1426  data = dp.getData()[0];
1427  else
1428  throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1429  << " error reading number of events from BU JSON -: No input value " << data;
1430  }
1431  return boost::lexical_cast<int>(data);
// NOTE(review): the handlers below call unlockFULocal() even when the exception was raised after
// the unlockFULocal() call in the try block
1432  } catch (std::filesystem::filesystem_error const& ex) {
1433  // Input dir gone?
1434  unlockFULocal();
1435  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1436  } catch (std::runtime_error const& e) {
1437  // Another process grabbed the file and NFS did not register this
1438  unlockFULocal();
1439  edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1440  } catch (boost::bad_lexical_cast const&) {
1441  edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1442  << "Input value is -: " << data;
1443  } catch (std::exception const& e) {
1444  // BU run directory disappeared?
1445  unlockFULocal();
1446  edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1447  }
1448 
1449  return -1;
1450  }
1451 
1453  bool& serverError,
1454  uint32_t& serverLS,
1455  uint32_t& closedServerLS,
1456  std::string& nextFileJson,
1457  std::string& nextFileRaw,
1458  bool& rawHeader,
1459  int maxLS) {
// contactFileBroker: send "GET /popfile?runnumber=<run>&pid=<pid>[&stopls=<maxLS>]" to the file
// broker over socket_ and translate the key=value lines of the reply into a FileStatus.
// NOTE(review): the first line of the signature (listing line 1452), both connect calls (listing
// lines 1468 and 1496) and the declaration of `header` (listing line 1538) are missing from this
// extract.
// Outputs:
//   serverHttpStatus - status code read from the HTTP status line
//   serverError      - true on any socket, HTTP or server-state error; fileStatus is then noFile
//   closedServerLS   - "lasteols" from the reply, clamped to >= 0
//   serverLS         - "lumisection" from the reply (clamped to >= 1), else closedServerLS + 1
//   nextFileJson / nextFileRaw / rawHeader - set only when state is READY and a file is offered
// maxLS: when >= 0 it is appended to the request as "stopls".
// Returns newFile, noFile or runEnded.
1460  EvFDaqDirector::FileStatus fileStatus = noFile;
1461  serverError = false;
1462 
1463  boost::system::error_code ec;
1464  try {
// the loop body runs once; `continue` is used only to resend after a reconnect, every other
// path ends in `break`
1465  while (true) {
1466  //socket connect
1467  if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1469 
1470  if (ec) {
1471  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1472  serverError = true;
1473  break;
1474  }
1475  }
1476 
1477  boost::asio::streambuf request;
1478  std::ostream request_stream(&request);
1479  std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1480  if (maxLS >= 0) {
1481  std::stringstream spath;
1482  spath << path << "&stopls=" << maxLS;
1483  path = spath.str();
1484  edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1485  }
1486  request_stream << "GET " << path << " HTTP/1.1\r\n";
1487  request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1488  request_stream << "Accept: */*\r\n";
// NOTE(review): keep-alive is requested regardless of fileBrokerKeepAlive_
1489  request_stream << "Connection: keep-alive\r\n\r\n";
1490 
1491  boost::asio::write(*socket_, request, ec);
1492  if (ec) {
1493  if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1494  edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1495  //we got disconnected, try to reconnect to the server before writing the request
1497  if (ec) {
1498  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1499  serverError = true;
1500  break;
1501  }
1502  continue;
1503  }
1504  edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1505  serverError = true;
1506  break;
1507  }
1508 
1509  boost::asio::streambuf response;
1510  boost::asio::read_until(*socket_, response, "\r\n", ec);
1511  if (ec) {
1512  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1513  serverError = true;
1514  break;
1515  }
1516 
1517  std::istream response_stream(&response);
1518 
// status line: "<http_version> <status code> <message>"
1519  std::string http_version;
1520  response_stream >> http_version;
1521 
1522  response_stream >> serverHttpStatus;
1523 
1524  std::string status_message;
1525  std::getline(response_stream, status_message);
1526  if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1527  edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1528  serverError = true;
1529  break;
1530  }
1531  if (serverHttpStatus != 200) {
1532  edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1533  serverError = true;
1534  break;
1535  }
1536 
1537  // Process the response headers.
// header lines are skipped up to the first line that is just "\r"
1539  while (std::getline(response_stream, header) && header != "\r") {
1540  }
1541 
// body lines are split at the first '='; lines without '=' are ignored.
// NOTE(review): values keep any trailing '\r' left by getline - confirm the comparisons below
// (run number, state) are not affected by the server's line endings
1542  std::string fileInfo;
1543  std::map<std::string, std::string> serverMap;
1544  while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1545  auto pos = fileInfo.find('=');
1546  if (pos == std::string::npos)
1547  continue;
1548  auto stitle = fileInfo.substr(0, pos);
1549  auto svalue = fileInfo.substr(pos + 1);
1550  serverMap[stitle] = svalue;
1551  }
1552 
// NOTE(review): missing or mismatching fields are handled with assert, i.e. a malformed reply
// aborts the process when asserts are enabled and is unchecked otherwise
1553  //check that response run number if correct
1554  auto server_version = serverMap.find("version");
1555  assert(server_version != serverMap.end());
1556 
1557  auto server_run = serverMap.find("runnumber");
1558  assert(server_run != serverMap.end());
1559  assert(run_nstring_ == server_run->second);
1560 
1561  auto server_state = serverMap.find("state");
1562  assert(server_state != serverMap.end());
1563 
1564  auto server_eols = serverMap.find("lasteols");
1565  assert(server_eols != serverMap.end());
1566 
// "lumisection" is optional; see the fallback below
1567  auto server_ls = serverMap.find("lumisection");
1568 
// version defaults to 1.0.0; parsing tries "a.b.c", then "a.b", then "a", skipping a leading quote
1569  int version_maj = 1;
1570  int version_min = 0;
1571  int version_rev = 0;
1572  {
1573  auto* s_ptr = server_version->second.c_str();
1574  if (!server_version->second.empty() && server_version->second[0] == '"')
1575  s_ptr++;
1576  auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1577  if (res < 3) {
1578  res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1579  if (res < 2) {
1580  res = sscanf(s_ptr, "%d", &version_maj);
1581  if (res < 1) {
1582  //expecting at least 1 number (major version)
1583  edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1584  }
1585  }
1586  }
1587  }
1588 
// NOTE(review): the values are cast to uint64_t but stored in uint32_t references
1589  closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1590  if (server_ls != serverMap.end())
1591  serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1592  else
1593  serverLS = closedServerLS + 1;
1594 
1595  std::string s_state = server_state->second;
1596  if (s_state == "STARTING") //initial, always empty starting with LS 1
1597  {
1598  auto server_file = serverMap.find("file");
1599  assert(server_file == serverMap.end()); //no file with starting state
1600  fileStatus = noFile;
1601  edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1602  } else if (s_state == "READY") {
1603  auto server_file = serverMap.find("file");
1604  if (server_file == serverMap.end()) {
1605  //can be returned by server if files from new LS already appeared but LS is not yet closed
1606  if (serverLS <= closedServerLS)
1607  serverLS = closedServerLS + 1;
1608  fileStatus = noFile;
1609  edm::LogInfo("EvFDaqDirector")
1610  << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1611  } else {
1612  std::string filestem;
1613  std::string fileprefix;
1614  auto server_fileprefix = serverMap.find("fileprefix");
1615 
// "fileprefix" is optional; surrounding double quotes are stripped when both are present
1616  if (server_fileprefix != serverMap.end()) {
1617  auto pssize = server_fileprefix->second.size();
1618  if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1619  fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1620  else
1621  fileprefix = server_fileprefix->second;
1622  }
1623 
1624  //remove string literals
1625  auto ssize = server_file->second.size();
1626  if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1627  filestem = server_file->second.substr(1, ssize - 2);
1628  else
1629  filestem = server_file->second;
1630  assert(!filestem.empty());
// server major version > 1: only a raw file (with header) is announced and the prefix applies to it;
// otherwise a .jsn file (with prefix) accompanies a raw file located without the prefix
1631  if (version_maj > 1) {
1632  nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw"; //filestem should be raw
1633  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1634  nextFileJson = "";
1635  rawHeader = true;
1636  } else {
1637  nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw"; //raw files are not moved
1638  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1639  nextFileJson = filestem + ".jsn";
1640  rawHeader = false;
1641  }
1642  fileStatus = newFile;
1643  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1644  << serverLS << " file:" << filestem;
1645  }
1646  } else if (s_state == "EOLS") {
1647  serverLS = closedServerLS + 1;
1648  edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1649  fileStatus = noFile;
1650  } else if (s_state == "EOR") {
1651  //server_eor = serverMap.find("iseor");
1652  edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1653  fileStatus = runEnded;
1654  } else if (s_state == "NORUN") {
// NORUN is treated like end of run, not as a server error
1655  auto err_msg = serverMap.find("errormessage");
1656  if (err_msg != serverMap.end())
1657  edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1658  else
1659  edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1660  edm::LogWarning("EvFDaqDirector") << "executing run end";
1661  fileStatus = runEnded;
1662  } else if (s_state == "ERROR") {
1663  auto err_msg = serverMap.find("errormessage");
1664  if (err_msg != serverMap.end())
1665  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1666  else
1667  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1668  fileStatus = noFile;
1669  serverError = true;
1670  } else {
1671  edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1672  fileStatus = noFile;
1673  serverError = true;
1674  }
1675 
// without keep-alive the rest of the reply is drained until the read reports an error; anything
// other than eof is treated as a server error
1676  // Read until EOF, writing data to output as we go.
1677  if (!fileBrokerKeepAlive_) {
1678  while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1679  }
1680  if (ec != boost::asio::error::eof) {
1681  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1682  serverError = true;
1683  }
1684  }
1685 
1686  break;
1687  }
1688 
// NOTE(review): e.what() is not included in the message
1689  } catch (std::exception const& e) {
1690  edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1691  serverError = true;
1692  }
1693 
1694  if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1695  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1696  if (ec) {
1697  edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1698  }
1699  socket_->close(ec);
1700  if (ec) {
1701  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1702  }
1703  }
1704 
// on any error the socket is closed (so the next call reconnects), the status is forced to noFile
// and the call is delayed by one second.
// NOTE(review): `ec` is tested even when close() was not called here, so it may still hold an
// earlier error
1705  if (serverError) {
1706  if (socket_->is_open())
1707  socket_->close(ec);
1708  if (ec) {
1709  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1710  }
1711  fileStatus = noFile;
1712  sleep(1); //back-off if error detected
1713  }
1714 
1715  return fileStatus;
1716  }
1717 
1719  unsigned int& ls,
1720  std::string& nextFileRaw,
1721  int& rawFd,
1722  uint16_t& rawHeaderSize,
1723  int32_t& serverEventsInNewFile,
1724  int64_t& fileSizeFromMetadata,
1725  uint64_t& thisLockWaitTimeUs) {
1726  EvFDaqDirector::FileStatus fileStatus = noFile;
1727 
1728  //int retval = -1;
1729  //int lock_attempts = 0;
1730  //long total_lock_attempts = 0;
1731 
1732  struct stat buf;
1733  int stopFileLS = -1;
1734  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1735  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1736  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1737  if (stopFileCheck == 0)
1738  stopFileLS = readLastLSEntry(stopFilePath_);
1739  else
1740  stopFileLS = 1; //stop without drain if only pid is stopped
1741  if (!stop_ls_override_) {
1742  //if lumisection is higher than in stop file, should quit at next from current
1743  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1744  stopFileLS = stop_ls_override_ = ls;
1745  } else
1746  stopFileLS = stop_ls_override_;
1747  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1748  << stopFileLS;
1749  //return runEnded;
1750  } else //if file was removed before reaching stop condition, reset this
1751  stop_ls_override_ = 0;
1752 
1753  /* look for EoLS
1754  if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1755  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ;
1756  ls++;
1757  return noFile;
1758  }
1759  */
1760 
1761  timeval ts_lockbegin;
1762  gettimeofday(&ts_lockbegin, nullptr);
1763 
1764  std::string nextFileJson;
1765  uint32_t serverLS, closedServerLS;
1766  unsigned int serverHttpStatus;
1767  bool serverError;
1768 
1769  //local lock to force index json and EoLS files to appear in order
1771  lockFULocal();
1772 
1773  int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1774  bool rawHeader = false;
1775  fileStatus = contactFileBroker(
1776  serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1777 
1778  if (serverError) {
1779  //do not update anything
1781  unlockFULocal();
1782  return noFile;
1783  }
1784 
1785  //handle creation of BoLS files if lumisection has changed
1786  if (currentLumiSection == 0) {
1787  if (fileStatus == runEnded)
1788  createLumiSectionFiles(closedServerLS, 0, true, false);
1789  else
1790  createLumiSectionFiles(serverLS, 0, true, false);
1791  } else {
1792  if (closedServerLS >= currentLumiSection) {
1793  //only BoLS files
1794  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1795  createLumiSectionFiles(i + 1, i, true, false);
1796  }
1797  }
1798 
1799  bool fileFound = true;
1800 
1801  if (fileStatus == newFile) {
1802  if (rawHeader > 0)
1803  serverEventsInNewFile =
1804  grabNextJsonFromRaw(nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false);
1805  else
1806  serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1807  }
1808  //closing file in case of any error
1809  if (serverEventsInNewFile < 0 && rawFd != -1) {
1810  close(rawFd);
1811  rawFd = -1;
1812  }
1813 
1814  //can unlock because all files have been created locally
1816  unlockFULocal();
1817 
1818  if (!fileFound) {
1819  //catch condition where directory got deleted
1820  fileStatus = noFile;
1821  struct stat buf;
1822  if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1823  edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1824  fileStatus = runEnded;
1825  }
1826  }
1827 
1828  //handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
1829  //so that EoLS files can not appear locally before index files
1830  if (currentLumiSection == 0) {
1831  lockFULocal2();
1832  if (fileStatus == runEnded) {
1833  createLumiSectionFiles(closedServerLS, 0, false, true);
1834  createLumiSectionFiles(serverLS, closedServerLS, false, true); // +1
1835  } else {
1836  createLumiSectionFiles(serverLS, 0, false, true);
1837  }
1838  unlockFULocal2();
1839  } else {
1840  if (closedServerLS >= currentLumiSection) {
1841  //lock exclusive to create EoLS files
1842  lockFULocal2();
1843  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1844  createLumiSectionFiles(i + 1, i, false, true);
1845  unlockFULocal2();
1846  }
1847  }
1848 
1849  if (fileStatus == runEnded)
1850  ls = std::max(currentLumiSection, serverLS);
1851  else if (fileStatus == newFile) {
1852  assert(serverLS >= ls);
1853  ls = serverLS;
1854  } else if (fileStatus == noFile) {
1855  if (serverLS >= ls)
1856  ls = serverLS;
1857  else {
1858  edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1859  << " which is smaller than currently open LS " << ls << ". Ignoring response";
1860  sleep(1);
1861  }
1862  }
1863 
1864  return fileStatus;
1865  }
1866 
// createRunOpendirMaybe: ensures that the FU "open" directory referred to by openPath exists.
// NOTE(review): the signature line and the declaration of openPath are not present in this listing;
// presumably openPath is a std::filesystem::path built from getRunOpenDirPath() - confirm in the full source.
1868  // create open dir if not already there
1869 
1871  if (!std::filesystem::is_directory(openPath)) {
1872  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
      // create_directories also creates any missing parent directories
1873  std::filesystem::create_directories(openPath);
1874  }
1875  }
1876 
// readLastLSEntry: parses the JSON document in `file` and returns the integer stored under "lastLS".
// Returns -1 (after logging an error) when the document cannot be parsed.
// NOTE(review): the signature line and the declaration of `reader` are not present in this listing;
// presumably `reader` is a Json::Reader - confirm in the full source.
1878  std::ifstream ij(file);
1879  Json::Value deserializeRoot;
1881 
1882  if (!reader.parse(ij, deserializeRoot)) {
1883  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1884  return -1;
1885  }
1886 
      // NOTE(review): "" is passed as the default for a missing "lastLS" key and is then converted
      // with asInt(); confirm that this conversion behaves acceptably for an absent key.
1887  int ret = deserializeRoot.get("lastLS", "").asInt();
1888  return ret;
1889  }
1890 
// getLumisectionToStart: starting at startFromLS_, probes run_dir_ for consecutive
// "<run_string_>_ls<NNNN>_EoLS.jsn" files and returns the first lumisection number for which
// no such file exists.
// NOTE(review): the signature line and the declaration of `fullpath` are not present in this listing;
// presumably `fullpath` is a std::string - confirm in the full source.
1892  std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1894  struct stat buf;
1895  unsigned int lscount = startFromLS_;
1896  do {
1897  std::stringstream ss;
      // lumisection number is zero-padded to a minimum width of 4 digits
1898  ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1899  fullpath = ss.str();
1900  lscount++;
1901  } while (stat(fullpath.c_str(), &buf) == 0);
      // lscount was already incremented past the lumisection whose EoLS file was found missing
1902  return lscount - 1;
1903  }
1904 
// checkTransferSystemPSet: builds transferSystemJson_ from the "transferSystem" PSet of the process
// top-level parameter set (looked up through pc.parameterSetID()) and validates it.
// Resulting JSON layout:
//   "destinations"  : array of all known destination names
//   "transferModes" : array of all known transfer mode names
//   "<stream name>" : object mapping every transfer mode to an array of destinations
// Throws cms::Exception when a stream lacks a definition for one of the modes, when a mode has an
// empty destination list, when a stream destination is not in "destinations", or when the PSet is
// absent while requireTSPSet_ is set.
// NOTE(review): the signature line is not present in this listing.
1905  //if transferSystem PSet is present in the menu, we require it to be complete and consistent for all specified streams
      // already initialized by an earlier call: nothing to do
1907  if (transferSystemJson_)
1908  return;
1909 
      // the JSON object is allocated before the PSet lookup, so later calls return early
      // even when no transferSystem PSet exists
1910  transferSystemJson_.reset(new Json::Value);
1911  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1912  if (topPset.existsAs<edm::ParameterSet>("transferSystem", true)) {
1913  const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));
1914 
      // copy the global list of destinations
1915  Json::Value destinationsVal(Json::arrayValue);
1916  std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
1917  for (auto& dest : destinations)
1918  destinationsVal.append(dest);
1919  (*transferSystemJson_)["destinations"] = destinationsVal;
1920 
      // copy the global list of transfer modes
1921  Json::Value modesVal(Json::arrayValue);
1922  std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>("transferModes");
1923  for (auto& mode : modes)
1924  modesVal.append(mode);
1925  (*transferSystemJson_)["transferModes"] = modesVal;
1926 
      // every nested PSet other than the two lists above is treated as a stream definition
1927  for (auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
1928  if (psKeyItr->first != "destinations" && psKeyItr->first != "transferModes") {
1929  const edm::ParameterSet& streamDef = tsPset.getParameterSet(psKeyItr->first);
1930  Json::Value streamVal;
1931  for (auto& mode : modes) {
1932  //validation
1933  if (!streamDef.existsAs<std::vector<std::string>>(mode, true))
1934  throw cms::Exception("EvFDaqDirector")
1935  << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode
1936  << ")";
1937  std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);
1938 
1939  Json::Value sDestsValue(Json::arrayValue);
1940 
1941  if (streamDestinations.empty())
1942  throw cms::Exception("EvFDaqDirector")
1943  << " Missing transter system destination(s) for -: " << psKeyItr->first << ", mode:" << mode;
1944 
      // each stream destination must also appear in the global "destinations" list
1945  for (auto& sdest : streamDestinations) {
1946  bool sDestValid = false;
1947  sDestsValue.append(sdest);
1948  for (auto& dest : destinations) {
1949  if (dest == sdest)
1950  sDestValid = true;
1951  }
1952  if (!sDestValid)
1953  throw cms::Exception("EvFDaqDirector")
1954  << " Invalid transter system destination specified for -: " << psKeyItr->first << ", mode:" << mode
1955  << ", dest:" << sdest;
1956  }
1957  streamVal[mode] = sDestsValue;
1958  }
1959  (*transferSystemJson_)[psKeyItr->first] = streamVal;
1960  }
1961  }
1962  } else {
      // a missing PSet is an error only in strict mode
1963  if (requireTSPSet_)
1964  throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
1965  }
1966  }
1967 
// getStreamDestinations: returns the destinations configured for `stream` under
// selectedTransferMode_ as a comma-separated string, read from transferSystemJson_.
// In permissive mode (requireTSPSet_ false) every lookup failure logs a warning and returns
// "Failsafe"; in strict mode (requireTSPSet_ true) the same failures throw cms::Exception.
// NOTE(review): transferSystemJson_ is dereferenced without a null check, so this presumably
// relies on checkTransferSystemPSet having run first - confirm against the callers.
// NOTE(review): the signature line is not present in this listing.
1969  std::string streamRequestName;
1970  if (transferSystemJson_->isMember(stream.c_str()))
1971  streamRequestName = stream;
1972  else {
1973  std::stringstream msg;
1974  msg << "Transfer system mode definitions missing for -: " << stream;
1975  if (requireTSPSet_)
1976  throw cms::Exception("EvFDaqDirector") << msg.str();
1977  else {
1978  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
1979  return std::string("Failsafe");
1980  }
1981  }
1982  //permissive mode with no transfer mode selected (empty or "null"): warn and fall back to "Failsafe"
      // NOTE(review): the warning text below says the mode is set to an empty string, but the value
      // actually returned is "Failsafe"
1983  if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1984  edm::LogWarning("EvFDaqDirector")
1985  << "Selected mode string is not provided as DaqDirector parameter."
1986  << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
1987  return std::string("Failsafe");
1988  }
      // strict mode: a missing transfer mode selection is fatal
1989  if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1990  throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
1991  }
1992  //check that the stream definition contains the selected transfer mode
1993  if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str())) {
1994  std::stringstream msg;
1995  msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
1996  if (requireTSPSet_)
1997  throw cms::Exception("EvFDaqDirector") << msg.str();
1998  else
1999  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
      // reached only in permissive mode, because the strict branch above throws
2000  return std::string("Failsafe");
2001  }
2002  Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_, "");
2003 
2004  //flatten string json::Array into CSV std::string
2005  std::string ret;
2006  for (Json::Value::iterator it = destsVec.begin(); it != destsVec.end(); it++) {
2007  if (!ret.empty())
2008  ret += ",";
2009  ret += (*it).asString();
2010  }
2011  return ret;
2012  }
2013 
// Fills mergeTypeMap_ from the PSet whose name is stored in mergeTypePset_, looked up in the
// process top-level parameter set (through pc.parameterSetID()). Every parameter of that PSet is
// read as a string and stored as map[parameter name] = parameter value.
// Does nothing when mergeTypePset_ is empty, when the map already has entries, or when the PSet
// does not exist.
// NOTE(review): the signature line is not present in this listing; presumably this is
// checkMergeTypePSet(edm::ProcessContext const& pc) - confirm in the full source.
2015  if (mergeTypePset_.empty())
2016  return;
      // map already populated: do not read the configuration again
2017  if (!mergeTypeMap_.empty())
2018  return;
2019  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
2020  if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_, true)) {
2021  const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
2022  for (const std::string& pname : tsPset.getParameterNames()) {
2023  std::string streamType = tsPset.getParameter<std::string>(pname);
      // insert (or locate) the entry for pname through an accessor, then set its value
2024  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2025  mergeTypeMap_.insert(ac, pname);
2026  ac->second = streamType;
2027  ac.release();
2028  }
2029  }
2030  }
2031 
// getStreamMergeType: returns the merge type name stored in mergeTypeMap_ for `stream`.
// When the stream has no entry, logs an info message, stores MergeTypeNames_[defaultType] in the
// map under `stream`, and returns that default name.
// NOTE(review): the signature line is not present in this listing.
2033  tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
2034  if (mergeTypeMap_.find(search_ac, stream))
2035  return search_ac->second;
2036 
2037  edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
      // defaultType is used as an index into the MergeTypeNames_ table
2038  std::string defaultName = MergeTypeNames_[defaultType];
      // remember the default so that later lookups for this stream find an entry
2039  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2040  mergeTypeMap_.insert(ac, stream);
2041  ac->second = defaultName;
2042  ac.release();
2043  return defaultName;
2044  }
2045 
// createProcessingNotificationMaybe: creates the file "<run_dir_>/processing" if it does not
// exist yet (O_CREAT without O_EXCL, so an existing file is simply opened) and closes it again.
// Requested permissions: rwx for the owner, rw for group and others.
// NOTE(review): the result of open() is not checked, so close() is also called on a failed open.
// NOTE(review): the signature line is not present in this listing.
2047  std::string proc_flag = run_dir_ + "/processing";
2048  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2049  close(proc_flag_fd);
2050  }
2051 
// make_flock: builds a struct flock from the given lock type, whence, start offset, length and pid.
// The struct is filled by positional aggregate initialization, so the initializer order has to
// follow the platform's member order; the __APPLE__ branch uses a different order than the default.
2052  struct flock EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid) {
2053 #ifdef __APPLE__
2054  return {start, len, pid, type, whence};
2055 #else
2056  return {type, whence, start, len, pid};
2057 #endif
2058  }
2059 
// inputThrottled: returns true when stat() succeeds on the path stored in input_throttled_file_,
// false otherwise.
// NOTE(review): the signature line is not present in this listing.
2061  struct stat buf;
2062  return (stat(input_throttled_file_.c_str(), &buf) == 0);
2063  }
2064 
2065 } // namespace evf
ConfigurationDescriptions.h
runTheMatrix.ret
ret
prodAgent to be discontinued
Definition: runTheMatrix.py:543
evf::EvFDaqDirector::getOpenOutputJsonFilePath
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:426
edm::GlobalContext::luminosityBlockID
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:60
evf::EvFDaqDirector::preallocate
void preallocate(edm::service::SystemBounds const &bounds)
Definition: EvFDaqDirector.cc:321
evf::EvFDaqDirector::directorBU_
bool directorBU_
Definition: EvFDaqDirector.h:219
evf::EvFDaqDirector::previousFileSize_
unsigned long previousFileSize_
Definition: EvFDaqDirector.h:247
evf::EvFDaqDirector::bumpFile
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
Definition: EvFDaqDirector.cc:780
evf::EvFDaqDirector::requireTSPSet_
bool requireTSPSet_
Definition: EvFDaqDirector.h:216
evf::EvFDaqDirector::unlockFULocal2
void unlockFULocal2()
Definition: EvFDaqDirector.cc:929
evf::EvFDaqDirector::endpoint_iterator_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
Definition: EvFDaqDirector.h:284
eostools.ls
def ls(path, rec=False)
Definition: eostools.py:349
evf::EvFDaqDirector::dirManager_
DirManager dirManager_
Definition: EvFDaqDirector.h:245
Json::ValueIterator
Iterator for object and array value.
Definition: value.h:908
evf::EvFDaqDirector::createBoLSFile
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
Definition: EvFDaqDirector.cc:934
evf::EvFDaqDirector::fileBrokerKeepAlive_
bool fileBrokerKeepAlive_
Definition: EvFDaqDirector.h:212
evf::EvFDaqDirector::getRawFilePath
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
Definition: EvFDaqDirector.cc:406
electrons_cff.bool
bool
Definition: electrons_cff.py:366
mps_fire.i
i
Definition: mps_fire.py:428
evf::EvFDaqDirector::fulocal_rwlock_fd_
int fulocal_rwlock_fd_
Definition: EvFDaqDirector.h:236
getFRDFileHeaderVersion
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:43
DataPoint.h
evf::EvFDaqDirector::runEnded
Definition: EvFDaqDirector.h:64
MessageLogger.h
evf::EvFDaqDirector::useFileBroker_
bool useFileBroker_
Definition: EvFDaqDirector.h:208
evf::EvFDaqDirector::init_lock_
pthread_mutex_t init_lock_
Definition: EvFDaqDirector.h:261
evf::EvFDaqDirector::getEoLSFilePathOnBU
std::string getEoLSFilePathOnBU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:477
funct::false
false
Definition: Factorize.h:29
evf::EvFDaqDirector::filesToDeletePtr_
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
Definition: EvFDaqDirector.h:259
evf::EvFDaqDirector::getEoRFilePath
std::string getEoRFilePath() const
Definition: EvFDaqDirector.cc:489
evf::EvFDaqDirector::readEolsDefinition_
bool readEolsDefinition_
Definition: EvFDaqDirector.h:266
evf::EvFDaqDirector::mergeTypePset_
std::string mergeTypePset_
Definition: EvFDaqDirector.h:218
reco_skim_cfg_mod.fullpath
fullpath
Definition: reco_skim_cfg_mod.py:202
filterCSVwithJSON.copy
copy
Definition: filterCSVwithJSON.py:36
edm::ActivityRegistry::watchPostGlobalEndRun
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
Definition: ActivityRegistry.h:350
evf::EvFDaqDirector::bu_run_open_dir_
std::string bu_run_open_dir_
Definition: EvFDaqDirector.h:230
fffnaming::streamerDataFileNameWithPid
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:72
fffnaming::streamerDataChecksumFileNameWithInstance
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:91
Json::Value::end
const_iterator end() const
evf::EvFDaqDirector::bu_readlock_fd_
int bu_readlock_fd_
Definition: EvFDaqDirector.h:233
evf::EvFDaqDirector::nThreads_
unsigned int nThreads_
Definition: EvFDaqDirector.h:264
Json::arrayValue
array value (ordered list)
Definition: value.h:30
evf::EvFDaqDirector::getStreamMergeType
std::string getStreamMergeType(std::string const &stream, MergeType defaultType)
Definition: EvFDaqDirector.cc:2032
evf::EvFDaqDirector::getLumisectionToStart
unsigned int getLumisectionToStart() const
Definition: EvFDaqDirector.cc:1891
edm::ProcessContext
Definition: ProcessContext.h:27
evf::EvFDaqDirector::checkTransferSystemPSet
void checkTransferSystemPSet(edm::ProcessContext const &pc)
Definition: EvFDaqDirector.cc:1906
Json::Value::get
Value get(UInt index, const Value &defaultValue) const
pos
Definition: PixelAliasList.h:18
evf::EvFDaqDirector::getRootHistogramFilePath
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:469
ALCARECOPromptCalibProdSiPixelAli0T_cff.mode
mode
Definition: ALCARECOPromptCalibProdSiPixelAli0T_cff.py:96
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
evf::EvFDaqDirector::bu_writelock_fd_
int bu_writelock_fd_
Definition: EvFDaqDirector.h:234
evf::EvFDaqDirector::getProtocolBufferHistogramFilePath
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:455
evf::EvFDaqDirector::createLumiSectionFiles
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
Definition: EvFDaqDirector.cc:944
cms::cuda::stream
uint32_t const T *__restrict__ const uint32_t *__restrict__ int32_t int Histo::index_type cudaStream_t stream
Definition: HistoContainer.h:51
evf::EvFDaqDirector::contactFileBroker
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
Definition: EvFDaqDirector.cc:1452
fffnaming::protocolBufferHistogramFileNameWithInstance
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:129
evf::EvFDaqDirector::getStreamDestinations
std::string getStreamDestinations(std::string const &stream) const
Definition: EvFDaqDirector.cc:1968
evf::EvFDaqDirector::tryInitializeFuLockFile
void tryInitializeFuLockFile()
Definition: EvFDaqDirector.cc:881
cms::cuda::assert
assert(be >=bs)
evf::EvFDaqDirector::io_service_
boost::asio::io_service io_service_
Definition: EvFDaqDirector.h:281
fffnaming::inputRawFileName
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
Definition: FFFNamingSchema.h:48
evf::EvFDaqDirector::lockInitLock
void lockInitLock()
Definition: EvFDaqDirector.cc:910
evf::EvFDaqDirector::stop_ls_override_
unsigned int stop_ls_override_
Definition: EvFDaqDirector.h:270
mps_check.msg
tuple msg
Definition: mps_check.py:285
FRDEventMessage.h
evf::EvFDaqDirector::fileBrokerHost_
std::string fileBrokerHost_
Definition: EvFDaqDirector.h:210
edm::ParameterSet::existsAs
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:171
evf::EvFDaqDirector::runAbort
Definition: EvFDaqDirector.h:64
edm::LogInfo
Log< level::Info, false > LogInfo
Definition: MessageLogger.h:125
beamerCreator.create
def create(alignables, pedeDump, additionalData, outputFile, config)
Definition: beamerCreator.py:44
fffnaming::eorFileName
std::string eorFileName(const unsigned int run)
Definition: FFFNamingSchema.h:34
newFWLiteAna.found
found
Definition: newFWLiteAna.py:118
evf::EvFDaqDirector::getOpenDatFilePath
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:422
evf::EvFDaqDirector::getFFFParamsFilePathOnBU
std::string getFFFParamsFilePathOnBU() const
Definition: EvFDaqDirector.cc:493
fffnaming::eolsFileName
std::string eolsFileName(const unsigned int run, const unsigned int ls)
Definition: FFFNamingSchema.h:20
evf::EvFDaqDirector::query_
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
Definition: EvFDaqDirector.h:283
edm::LogWarning
Log< level::Warning, false > LogWarning
Definition: MessageLogger.h:122
Json::Reader
Unserialize a JSON document into a Value.
Definition: reader.h:16
evf::EvFDaqDirector::baseRunDir
std::string & baseRunDir()
Definition: EvFDaqDirector.h:76
evf::EvFDaqDirector::getRunOpenDirPath
std::string getRunOpenDirPath() const
Definition: EvFDaqDirector.h:106
evf::EvFDaqDirector::getDatFilePath
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:418
evf::EvFDaqDirector::preBeginJob
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
Definition: EvFDaqDirector.cc:360
myMessageLogger_cff.destinations
destinations
Definition: myMessageLogger_cff.py:36
evf::EvFDaqDirector::getInitFilePath
std::string getInitFilePath(std::string const &stream) const
Definition: EvFDaqDirector.cc:446
getHLTprescales.readIndex
def readIndex()
Definition: getHLTprescales.py:36
evf::FastMonitoringService::accumulateFileSize
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
Definition: FastMonitoringService.cc:670
evf::EvFDaqDirector::openFULockfileStream
void openFULockfileStream(bool create)
Definition: EvFDaqDirector.cc:891
contentValuesCheck.ss
ss
Definition: contentValuesCheck.py:33
evf::FastMonitoringService
Definition: FastMonitoringService.h:155
evf::EvFDaqDirector::updateFuLock
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
Definition: EvFDaqDirector.cc:504
evf::EvFDaqDirector::getEoLSFilePathOnFU
std::string getEoLSFilePathOnFU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:481
FRDFileHeader.h
GlobalPosition_Frontier_DevDB_cff.connect
connect
Definition: GlobalPosition_Frontier_DevDB_cff.py:8
RPCNoise_example.check
check
Definition: RPCNoise_example.py:71
evf::EvFDaqDirector::stopFilePath_
std::string stopFilePath_
Definition: EvFDaqDirector.h:268
edm::ConfigurationDescriptions::add
void add(std::string const &label, ParameterSetDescription const &psetDescription)
Definition: ConfigurationDescriptions.cc:57
Calorimetry_cff.dp
dp
Definition: Calorimetry_cff.py:158
edm::ActivityRegistry::watchPreBeginJob
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
Definition: ActivityRegistry.h:151
DQM.reader
reader
Definition: DQM.py:105
Service.h
evf::EvFDaqDirector::inputThrottled
bool inputThrottled()
Definition: EvFDaqDirector.cc:2060
evf::EvFDaqDirector::parseFRDFileHeader
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
Definition: EvFDaqDirector.cc:966
evf::EvFDaqDirector::getOpenInputJsonFilePath
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
Definition: EvFDaqDirector.cc:414
evf::EvFDaqDirector::selectedTransferMode_
std::string selectedTransferMode_
Definition: EvFDaqDirector.h:217
evf::EvFDaqDirector::fms_
evf::FastMonitoringService * fms_
Definition: EvFDaqDirector.h:256
edm::ActivityRegistry
Definition: ActivityRegistry.h:134
summarizeEdmComparisonLogfiles.success
success
Definition: summarizeEdmComparisonLogfiles.py:114
corrVsCorr.filename
filename
Definition: corrVsCorr.py:123
Json::Value::asInt
Int asInt() const
eostools.mkdir
def mkdir(path)
Definition: eostools.py:251
fffnaming::streamerDataFileNameWithInstance
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:81
evf::EvFDaqDirector::noFile
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::getMergedProtocolBufferHistogramFilePath
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:460
evf::EvFDaqDirector::run_
unsigned int run_
Definition: EvFDaqDirector.h:207
edm::ProcessContext::parameterSetID
ParameterSetID const & parameterSetID() const
Definition: ProcessContext.h:32
ParameterSetDescription.h
evf::EvFDaqDirector::eolsNFilesIndex_
unsigned int eolsNFilesIndex_
Definition: EvFDaqDirector.h:267
unpackData-CaloStage2.pname
pname
Definition: unpackData-CaloStage2.py:76
evf::EvFDaqDirector::pid_
std::string pid_
Definition: EvFDaqDirector.h:227
evf::EvFDaqDirector::fileBrokerUseLocalLock_
bool fileBrokerUseLocalLock_
Definition: EvFDaqDirector.h:213
mitigatedMETSequence_cff.U
U
Definition: mitigatedMETSequence_cff.py:36
edm::ConfigurationDescriptions
Definition: ConfigurationDescriptions.h:28
jsoncollector::DataPointDefinition::getNames
std::vector< std::string > const & getNames() const
Definition: DataPointDefinition.h:44
geometryDiff.file
file
Definition: geometryDiff.py:13
evf::EvFDaqDirector::MergeTypeNames_
static const std::vector< std::string > MergeTypeNames_
Definition: EvFDaqDirector.h:276
evf::EvFDaqDirector::fileDeleteLockPtr_
std::mutex * fileDeleteLockPtr_
Definition: EvFDaqDirector.h:258
evf::EvFDaqDirector::getOutputJsonFilePath
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:430
cppFunctionSkipper.exception
exception
Definition: cppFunctionSkipper.py:10
funct::true
true
Definition: Factorize.h:173
evf::EvFDaqDirector::fulocal_rwlock_fd2_
int fulocal_rwlock_fd2_
Definition: EvFDaqDirector.h:237
fffnaming::rootHistogramFileNameWithPid
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:139
edm::GlobalContext
Definition: GlobalContext.h:29
edm::service::SystemBounds
Definition: SystemBounds.h:29
LogDebug
#define LogDebug(id)
Definition: MessageLogger.h:233
fffnaming::initFileNameWithPid
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:55
edm::ParameterSet
Definition: ParameterSet.h:47
edm::service::SystemBounds::maxNumberOfThreads
unsigned int maxNumberOfThreads() const
Definition: SystemBounds.h:38
Json::Value::append
Value & append(const Value &value)
Append value to array at the end.
evf::EvFDaqDirector::preBeginRun
void preBeginRun(edm::GlobalContext const &globalContext)
Definition: EvFDaqDirector.cc:365
evf::EvFDaqDirector::fileBrokerPort_
std::string fileBrokerPort_
Definition: EvFDaqDirector.h:211
GlobalContext.h
evf::EvFDaqDirector::getMergedDatChecksumFilePath
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:438
evf::EvFDaqDirector::getNextFromFileBroker
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
Definition: EvFDaqDirector.cc:1718
SiStripPI::max
Definition: SiStripPayloadInspectorHelper.h:169
evf::EvFDaqDirector::~EvFDaqDirector
~EvFDaqDirector()
Definition: EvFDaqDirector.cc:302
eostools.chmod
def chmod(path, mode)
Definition: eostools.py:294
SiStripCommissioningSource_FromEDM_cfg.EvFDaqDirector
EvFDaqDirector
Definition: SiStripCommissioningSource_FromEDM_cfg.py:57
evf::EvFDaqDirector::mergeTypeMap_
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
Definition: EvFDaqDirector.h:273
evf::EvFDaqDirector::unlockFULocal
void unlockFULocal()
Definition: EvFDaqDirector.cc:919
fffnaming::streamerJsonFileNameWithPid
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:101
recoMuon::in
Definition: RecoMuonEnumerators.h:6
gainCalibHelper::gainCalibPI::type
type
Definition: SiPixelGainCalibHelper.h:40
evf::EvFDaqDirector::getOpenInitFilePath
std::string getOpenInitFilePath(std::string const &stream) const
Definition: EvFDaqDirector.cc:442
evf::EvFDaqDirector::run_dir_
std::string run_dir_
Definition: EvFDaqDirector.h:228
edm::Service
Definition: Service.h:30
createfilelist.int
int
Definition: createfilelist.py:10
EvFDaqDirector.h
FastMonitoringService.h
evf::EvFDaqDirector::getMergedRootHistogramFilePath
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:473
edm::LuminosityBlockID::luminosityBlock
LuminosityBlockNumber_t luminosityBlock() const
Definition: LuminosityBlockID.h:42
evf::DirManager::findHighestRunDir
std::string findHighestRunDir()
Definition: DirManager.cc:23
evf::EvFDaqDirector::fuLockPollInterval_
unsigned int fuLockPollInterval_
Definition: EvFDaqDirector.h:214
evf::EvFDaqDirector::lockFULocal
void lockFULocal()
Definition: EvFDaqDirector.cc:914
evf::EvFDaqDirector::getOpenProtocolBufferHistogramFilePath
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:450
evf::EvFDaqDirector
Definition: EvFDaqDirector.h:62
evf::EvFDaqDirector::bu_base_dir_
std::string bu_base_dir_
Definition: EvFDaqDirector.h:206
edm::service::SystemBounds::maxNumberOfStreams
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:35
jsoncollector::DataPoint
Definition: DataPoint.h:36
evf::EvFDaqDirector::createRunOpendirMaybe
void createRunOpendirMaybe()
Definition: EvFDaqDirector.cc:1867
Json::Value::begin
const_iterator begin() const
evf::EvFDaqDirector::getEoRFilePathOnFU
std::string getEoRFilePathOnFU() const
Definition: EvFDaqDirector.cc:491
edm::LogError
Log< level::Error, false > LogError
Definition: MessageLogger.h:123
evf::EvFDaqDirector::fu_rw_fulk
struct flock fu_rw_fulk
Definition: EvFDaqDirector.h:254
res
Definition: Electron.h:6
evf::EvFDaqDirector::hltSourceDirectory_
std::string hltSourceDirectory_
Definition: EvFDaqDirector.h:220
evf::EvFDaqDirector::startFromLS_
unsigned int startFromLS_
Definition: EvFDaqDirector.h:222
visDQMUpload.buf
buf
Definition: visDQMUpload.py:160
AlCaHLTBitMon_QueryRunRegistry.string
string string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
FedRawDataInputSource.h
jsoncollector
Definition: DataPoint.h:26
evf::EvFDaqDirector::readLastLSEntry
int readLastLSEntry(std::string const &file)
Definition: EvFDaqDirector.cc:1877
jsoncollector::DataPointDefinition
Definition: DataPointDefinition.h:20
readEcalDQMStatus.read
read
Definition: readEcalDQMStatus.py:38
evf::EvFDaqDirector::getMergedDatFilePath
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:434
writeEcalDQMStatus.write
write
Definition: writeEcalDQMStatus.py:48
edm::ActivityRegistry::watchPreGlobalEndLumi
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
Definition: ActivityRegistry.h:421
edm::ActivityRegistry::watchPreallocate
void watchPreallocate(Preallocate::slot_type const &iSlot)
Definition: ActivityRegistry.h:144
FRDFileHeader_v1
Definition: FRDFileHeader.h:24
evf::EvFDaqDirector::fu_rw_flk
struct flock fu_rw_flk
Definition: EvFDaqDirector.h:253
DataPointDefinition.h
evf::EvFDaqDirector::run_string_
std::string run_string_
Definition: EvFDaqDirector.h:225
edm::getParameterSet
ParameterSet const & getParameterSet(ParameterSetID const &id)
Definition: ParameterSet.cc:862
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
evf::EvFDaqDirector::socket_
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
Definition: EvFDaqDirector.h:285
std
Definition: JetResolutionObject.h:76
evf::EvFDaqDirector::createProcessingNotificationMaybe
void createProcessingNotificationMaybe() const
Definition: EvFDaqDirector.cc:2046
evf::EvFDaqDirector::initRun
void initRun()
Definition: EvFDaqDirector.cc:149
evf::MergeType
MergeType
Definition: EvFDaqDirector.h:58
evf::EvFDaqDirector::rawFileHasHeader
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
Definition: EvFDaqDirector.cc:1062
edm::PathsAndConsumesOfModulesBase
Definition: PathsAndConsumesOfModulesBase.h:35
fffnaming::rootHistogramFileNameWithInstance
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
Definition: FFFNamingSchema.h:148
fffnaming::inputJsonFileName
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
Definition: FFFNamingSchema.h:41
Exception
Definition: hltDiff.cc:245
evf::EvFDaqDirector::fulockfile_
std::string fulockfile_
Definition: EvFDaqDirector.h:231
MatrixUtil.remove
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:219
evf
Definition: fillJson.h:27
evf::EvFDaqDirector::preGlobalEndLumi
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
Definition: EvFDaqDirector.cc:384
evf::EvFDaqDirector::transferSystemJson_
std::shared_ptr< Json::Value > transferSystemJson_
Definition: EvFDaqDirector.h:272
evf::EvFDaqDirector::getInputJsonFilePath
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
Definition: EvFDaqDirector.cc:402
evf::EvFDaqDirector::hostname_
std::string hostname_
Definition: EvFDaqDirector.h:224
evf::EvFDaqDirector::getOpenRootHistogramFilePath
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: EvFDaqDirector.cc:465
evf::EvFDaqDirector::dpd_
jsoncollector::DataPointDefinition * dpd_
Definition: EvFDaqDirector.h:279
edm::ParameterSet::getParameter
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
Exception.h
evf::EvFDaqDirector::resolver_
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
Definition: EvFDaqDirector.h:282
evf::EvFDaqDirector::removeFile
void removeFile(unsigned int ls, unsigned int index)
Definition: EvFDaqDirector.cc:502
fffnaming::bolsFileName
std::string bolsFileName(const unsigned int run, const unsigned int ls)
Definition: FFFNamingSchema.h:27
evf::EvFDaqDirector::nStreams_
unsigned int nStreams_
Definition: EvFDaqDirector.h:263
data
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
evf::EvFDaqDirector::grabNextJsonFileAndUnlock
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
Definition: EvFDaqDirector.cc:1363
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
timingPdfMaker.infile
infile
Definition: timingPdfMaker.py:349
evf::EvFDaqDirector::fu_readwritelock_fd_
int fu_readwritelock_fd_
Definition: EvFDaqDirector.h:235
RecoTauValidation_cfi.header
header
Definition: RecoTauValidation_cfi.py:291
evf::EvFDaqDirector::input_throttled_file_
std::string input_throttled_file_
Definition: EvFDaqDirector.h:287
cond::uint64_t
unsigned long long uint64_t
Definition: Time.h:13
evf::EvFDaqDirector::bu_run_dir_
std::string bu_run_dir_
Definition: EvFDaqDirector.h:229
evf::EvFDaqDirector::getBoLSFilePathOnFU
std::string getBoLSFilePathOnFU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:485
spu::def
int def(FILE *, FILE *, int)
Definition: SherpackUtilities.cc:14
mps_fire.result
result
Definition: mps_fire.py:311
evf::EvFDaqDirector::base_dir_
std::string base_dir_
Definition: EvFDaqDirector.h:205
cms::Exception
Definition: Exception.h:70
timingPdfMaker.outfile
outfile
Definition: timingPdfMaker.py:350
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
edm::ActivityRegistry::watchPreGlobalBeginRun
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
Definition: ActivityRegistry.h:333
evf::EvFDaqDirector::fu_rw_lock_stream
FILE * fu_rw_lock_stream
Definition: EvFDaqDirector.h:241
command_line.start
start
Definition: command_line.py:167
fffnaming::protocolBufferHistogramFileNameWithPid
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Definition: FFFNamingSchema.h:120
evf::EvFDaqDirector::getOpenRawFilePath
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
Definition: EvFDaqDirector.cc:410
evf::EvFDaqDirector::stopFilePathPid_
std::string stopFilePathPid_
Definition: EvFDaqDirector.h:269
edm_modernize_messagelogger.stat
stat
Definition: edm_modernize_messagelogger.py:27
evf::EvFDaqDirector::unlockInitLock
void unlockInitLock()
Definition: EvFDaqDirector.cc:912
evf::EvFDaqDirector::run_nstring_
std::string run_nstring_
Definition: EvFDaqDirector.h:226
ProcessContext.h
evf::EvFDaqDirector::postEndRun
void postEndRun(edm::GlobalContext const &globalContext)
Definition: EvFDaqDirector.cc:375
edm::Log
Definition: MessageLogger.h:70
StreamID.h
evf::EvFDaqDirector::FileStatus
FileStatus
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::lockFULocal2
void lockFULocal2()
Definition: EvFDaqDirector.cc:924
SystemBounds.h
evf::EvFDaqDirector::grabNextJsonFile
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
Definition: EvFDaqDirector.cc:1183
evf::EvFDaqDirector::getNFilesFromEoLS
int getNFilesFromEoLS(std::string BUEoLSFile)
Definition: EvFDaqDirector.cc:719
edm::ParameterSet::getParameterSet
ParameterSet const & getParameterSet(std::string const &) const
Definition: ParameterSet.cc:2128
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
Json::Value
Represents a JSON value.
Definition: value.h:99
evf::EvFDaqDirector::fillDescriptions
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Definition: EvFDaqDirector.cc:328
evf::EvFDaqDirector::bu_w_lock_stream
FILE * bu_w_lock_stream
Definition: EvFDaqDirector.h:239
evf::EvFDaqDirector::fileBrokerHostFromCfg_
bool fileBrokerHostFromCfg_
Definition: EvFDaqDirector.h:209
mps_fire.dest
dest
Definition: mps_fire.py:179
evf::EvFDaqDirector::newFile
Definition: EvFDaqDirector.h:64
MillePedeFileConverter_cfg.e
e
Definition: MillePedeFileConverter_cfg.py:37
evf::EvFDaqDirector::grabNextJsonFromRaw
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
Definition: EvFDaqDirector.cc:1101
evf::EvFDaqDirector::checkMergeTypePSet
void checkMergeTypePSet(edm::ProcessContext const &pc)
Definition: EvFDaqDirector.cc:2014