CMS 3D CMS Logo

EvFDaqDirector.cc
Go to the documentation of this file.
11 
19 
20 #include <iostream>
21 #include <fstream>
22 #include <sstream>
23 #include <sys/time.h>
24 #include <unistd.h>
25 #include <cstdio>
26 #include <boost/algorithm/string.hpp>
27 
28 //using boost::asio::ip::tcp;
29 
30 //#define DEBUG
31 
32 using namespace jsoncollector;
33 
34 namespace evf {
35 
36  //for enum MergeType
37  const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"};
38 
40  : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
41  bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
42  bu_base_dirs_all_(pset.getUntrackedParameter<std::vector<std::string>>("buBaseDirsAll")),
43  run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
44  useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
45  fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", true)),
46  fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
47  fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
48  fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
49  fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
50  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
51  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
52  requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet", false)),
53  selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode", "")),
54  mergeTypePset_(pset.getUntrackedParameter<std::string>("mergingPset", "")),
55  directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
56  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
57  hostname_(""),
58  bu_readlock_fd_(-1),
59  bu_writelock_fd_(-1),
60  fu_readwritelock_fd_(-1),
61  fulocal_rwlock_fd_(-1),
62  fulocal_rwlock_fd2_(-1),
63  bu_w_lock_stream(nullptr),
64  bu_r_lock_stream(nullptr),
65  fu_rw_lock_stream(nullptr),
66  dirManager_(base_dir_),
67  previousFileSize_(0),
68  bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
69  bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
70  bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
71  bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
72  fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
73  fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
79 
80  //save hostname for later
81  char hostname[33];
82  gethostname(hostname, 32);
83  hostname_ = hostname;
84 
85  char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
86  if (fuLockPollIntervalPtr) {
87  try {
88  fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
89  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
90  << " us";
91  } catch (const std::exception&) {
92  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
93  }
94  }
95 
96  //override file service parameter if specified by environment
97  char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
98  if (fileBrokerParamPtr) {
99  try {
100  useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
101  edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
102  } catch (const std::exception&) {
103  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
104  }
105  }
106  if (useFileBroker_) {
108  //find BU data address from hltd configuration
110  struct stat buf;
111  if (stat("/etc/appliance/bus.config", &buf) == 0) {
112  std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
113  std::getline(busconfig, fileBrokerHost_);
114  }
115  if (fileBrokerHost_.empty())
116  throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
117  } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
118  throw cms::Exception("EvFDaqDirector")
119  << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
120 
121  resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
122  query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
123  endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
124  socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
125  }
126 
127  char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
128  if (startFromLSPtr) {
129  try {
130  startFromLS_ = std::stoul(std::string(startFromLSPtr));
131  edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
132  } catch (const std::exception&) {
133  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
134  }
135  }
136 
137  //override file service parameter if specified by environment
138  char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
139  if (fileBrokerUseLockParamPtr) {
140  try {
141  fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
142  edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
144  } catch (const std::exception&) {
145  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
146  }
147  }
148 
149  std::stringstream ss;
150  ss << "run" << std::setfill('0') << std::setw(6) << run_;
151  run_string_ = ss.str();
152  ss = std::stringstream();
153  ss << run_;
154  run_nstring_ = ss.str();
155  run_dir_ = base_dir_ + "/" + run_string_;
156  input_throttled_file_ = run_dir_ + "/input_throttle";
157  discard_ls_filestem_ = run_dir_ + "/discard_ls";
158  ss = std::stringstream();
159  ss << getpid();
160  pid_ = ss.str();
161  }
162 
164  // check if base dir exists or create it accordingly
165  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
166  if (retval != 0 && errno != EEXIST) {
167  throw cms::Exception("DaqDirector")
168  << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
169  }
170 
171  //create run dir in base dir
172  umask(0);
173  retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
174  if (retval != 0 && errno != EEXIST) {
175  throw cms::Exception("DaqDirector")
176  << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
177  }
178 
179  //create fu-local.lock in run open dir
180  if (!directorBU_) {
182  std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
184  open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
185  if (fulocal_rwlock_fd_ == -1)
186  throw cms::Exception("DaqDirector")
187  << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
188  chmod(fulocal_lock_.c_str(), 0777);
189  fsync(fulocal_rwlock_fd_);
190  //open second fd for another input source thread
192  open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
193  if (fulocal_rwlock_fd2_ == -1)
194  throw cms::Exception("DaqDirector")
195  << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
196  }
197 
198  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
199  //for BU, it is created at this point
200  if (directorBU_) {
202  std::string bulockfile = bu_run_dir_ + "/bu.lock";
203  fulockfile_ = bu_run_dir_ + "/fu.lock";
204 
205  //make or find bu run dir
206  retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
207  if (retval != 0 && errno != EEXIST) {
208  throw cms::Exception("DaqDirector")
209  << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno);
210  }
211  bu_run_open_dir_ = bu_run_dir_ + "/open";
212  retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
213  if (retval != 0 && errno != EEXIST) {
214  throw cms::Exception("DaqDirector")
215  << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno);
216  }
217 
218  // the BU director does not need to know about the fu lock
219  bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
220  if (bu_writelock_fd_ == -1)
221  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
222  else
223  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
224  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
225  if (bu_w_lock_stream == nullptr)
226  edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
227 
228  // BU INITIALIZES LOCK FILE
229  // FU LOCK FILE OPEN
230  openFULockfileStream(true);
232  fflush(fu_rw_lock_stream);
233  close(fu_readwritelock_fd_);
234 
235  if (!hltSourceDirectory_.empty()) {
236  struct stat buf;
237  if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
238  std::string hltdir = bu_run_dir_ + "/hlt";
239  std::string tmphltdir = bu_run_open_dir_ + "/hlt";
240  retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
241  if (retval != 0 && errno != EEXIST)
242  throw cms::Exception("DaqDirector")
243  << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno);
244 
245  std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
246  std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
247 
248  std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
249  for (auto& optfile : optfiles) {
250  try {
251  std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
252  } catch (...) {
253  }
254  }
255 
256  std::filesystem::rename(tmphltdir, hltdir);
257  } else
258  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
259  }
260  //else{}//no configuration specified
261  } else {
262  // for FU, check if bu base dir exists
263 
264  auto checkExists = [=](std::string const& bu_base_dir) -> void {
265  int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
266  if (retval != 0 && errno != EEXIST) {
267  throw cms::Exception("DaqDirector")
268  << " Error checking for bu base dir -: " << bu_base_dir << " mkdir error:" << strerror(errno);
269  }
270  };
271 
272  auto waitForDir = [=](std::string const& bu_base_dir) -> void {
273  int cnt = 0;
274  while (!edm::shutdown_flag.load(std::memory_order_relaxed)) {
275  int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
276  if (retval != 0 && errno != EEXIST) {
277  usleep(500000);
278  cnt++;
279  if (cnt % 20 == 0)
280  edm::LogWarning("DaqDirector") << "waiting for " << bu_base_dir;
281  if (cnt > 120)
282  throw cms::Exception("DaqDirector") << " Error checking for bu base dir after 1 minute -: " << bu_base_dir
283  << " mkdir error:" << strerror(errno);
284  }
285  break;
286  }
287  };
288 
289  if (!bu_base_dirs_all_.empty()) {
290  checkExists(bu_base_dirs_all_[0]);
292  for (unsigned int i = 1; i < bu_base_dirs_all_.size(); i++)
293  waitForDir(bu_base_dirs_all_[i]);
294  } else {
295  checkExists(bu_base_dir_);
297  }
298 
299  fulockfile_ = bu_run_dir_ + "/fu.lock";
300  if (!useFileBroker_)
301  openFULockfileStream(false);
302  }
303 
304  pthread_mutex_init(&init_lock_, nullptr);
305 
306  stopFilePath_ = run_dir_ + "/CMSSW_STOP";
307  std::stringstream sstp;
308  sstp << stopFilePath_ << "_pid" << pid_;
309  stopFilePathPid_ = sstp.str();
310 
311  if (!directorBU_) {
312  std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
313  struct stat statbuf;
314  if (!stat(defPath.c_str(), &statbuf))
315  edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
316  else {
317  //look in source directory if not present in ramdisk
318  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
319  defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
320  if (stat(defPath.c_str(), &statbuf)) {
321  defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
322  if (stat(defPath.c_str(), &statbuf)) {
323  defPath = defPathSuffix;
324  }
325  }
326  }
327  dpd_ = new DataPointDefinition();
328  std::string defLabel = "data";
329  DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
330  }
331  }
332 
334  //close server connection
335  if (socket_.get() && socket_->is_open()) {
336  boost::system::error_code ec;
337  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
338  socket_->close(ec);
339  }
340 
341  if (fulocal_rwlock_fd_ != -1) {
342  unlockFULocal();
343  close(fulocal_rwlock_fd_);
344  }
345 
346  if (fulocal_rwlock_fd2_ != -1) {
347  unlockFULocal2();
348  close(fulocal_rwlock_fd2_);
349  }
350  }
351 
353  initRun();
354 
355  nThreads_ = bounds.maxNumberOfStreams();
356  nStreams_ = bounds.maxNumberOfThreads();
357  nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks();
358  }
359 
362  desc.setComment(
363  "Service used for file locking arbitration and for propagating information between other EvF components");
364  desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
365  desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
366  desc.addUntracked<std::vector<std::string>>("buBaseDirsAll", std::vector<std::string>())
367  ->setComment("BU base ramdisk directories for multi-file DAQSource models");
368  desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
369  desc.addUntracked<bool>("useFileBroker", false)
370  ->setComment("Use BU file service to grab input data instead of NFS file locking");
371  desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
372  ->setComment("Allow service to discover BU address from hltd configuration");
373  desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
374  desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
375  desc.addUntracked<bool>("fileBrokerKeepAlive", true)
376  ->setComment("Use keep alive to avoid using large number of sockets");
377  desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
378  ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
379  desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
380  ->setComment("Lock polling interval in microseconds for the input directory file lock");
381  desc.addUntracked<bool>("outputAdler32Recheck", false)
382  ->setComment("Check Adler32 of per-process output files while micro-merging");
383  desc.addUntracked<bool>("requireTransfersPSet", false)
384  ->setComment("Require complete transferSystem PSet in the process configuration");
385  desc.addUntracked<std::string>("selectedTransferMode", "")
386  ->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
387  desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
388  desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
389  desc.addUntracked<std::string>("mergingPset", "")
390  ->setComment("Name of merging PSet to look for merging type definitions for streams");
391  descriptions.add("EvFDaqDirector", desc);
392  }
393 
396  checkMergeTypePSet(pc);
397  }
398 
399  void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) {
400  //assert(run_ == id.run());
401 
402  // check if the requested run is the latest one - issue a warning if it isn't
404  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
405  << ". This is not the highest run " << dirManager_.findHighestRunDir();
406  }
407  }
408 
409  void EvFDaqDirector::postEndRun(edm::GlobalContext const& globalContext) {
410  close(bu_readlock_fd_);
411  close(bu_writelock_fd_);
412  if (directorBU_) {
413  std::string filename = bu_run_dir_ + "/bu.lock";
415  }
416  }
417 
419  //delete all files belonging to just closed lumi
420  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
422  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
423  return;
424  }
425 
426  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
427  auto it = filesToDeletePtr_->begin();
428  while (it != filesToDeletePtr_->end()) {
429  if (it->second->lumi_ == ls && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
430  it = filesToDeletePtr_->erase(it);
431  } else
432  it++;
433  }
434  }
435 
436  std::string EvFDaqDirector::getInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
438  }
439 
440  std::string EvFDaqDirector::getRawFilePath(const unsigned int ls, const unsigned int index) const {
442  }
443 
444  std::string EvFDaqDirector::getOpenRawFilePath(const unsigned int ls, const unsigned int index) const {
445  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
446  }
447 
448  std::string EvFDaqDirector::getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
449  return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
450  }
451 
452  std::string EvFDaqDirector::getDatFilePath(const unsigned int ls, std::string const& stream) const {
454  }
455 
458  }
459 
462  }
463 
466  }
467 
470  }
471 
474  }
475 
477  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
478  }
479 
482  }
483 
486  }
487 
489  std::string const& stream) const {
491  }
492 
494  std::string const& stream) const {
496  }
497 
499  std::string const& stream) const {
501  }
502 
505  }
506 
509  }
510 
513  }
514 
516  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
517  }
518 
520  return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
521  }
522 
524  return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
525  }
526 
528 
530 
532 
533  std::string EvFDaqDirector::getFFFParamsFilePathOnBU() const { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
534 
536  int retval = remove(filename.c_str());
537  if (retval != 0)
538  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
539  << ". error = " << strerror(errno);
540  }
541 
543  std::string& nextFile,
544  uint32_t& fsize,
545  uint16_t& rawHeaderSize,
546  uint64_t& lockWaitTime,
547  bool& setExceptionState) {
548  EvFDaqDirector::FileStatus fileStatus = noFile;
549  rawHeaderSize = 0;
550 
551  int retval = -1;
552  int lock_attempts = 0;
553  long total_lock_attempts = 0;
554 
555  struct stat buf;
556  int stopFileLS = -1;
557  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
558  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
559  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
560  if (stopFileCheck == 0)
561  stopFileLS = readLastLSEntry(stopFilePath_);
562  else
563  stopFileLS = 1; //stop without drain if only pid is stopped
564  if (!stop_ls_override_) {
565  //if lumisection is higher than in stop file, should quit at next from current
566  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
567  stopFileLS = stop_ls_override_ = ls;
568  } else
569  stopFileLS = stop_ls_override_;
570  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
571  << stopFileLS;
572  //return runEnded;
573  } else //if file was removed before reaching stop condition, reset this
574  stop_ls_override_ = 0;
575 
576  timeval ts_lockbegin;
577  gettimeofday(&ts_lockbegin, nullptr);
578 
579  while (retval == -1) {
580  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
581  if (retval == -1)
582  usleep(fuLockPollInterval_);
583  else
584  continue;
585 
586  lock_attempts += fuLockPollInterval_;
587  total_lock_attempts += fuLockPollInterval_;
588  if (lock_attempts > 5000000 || errno == 116) {
589  if (errno == 116)
590  edm::LogWarning("EvFDaqDirector")
591  << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
592  else
593  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
594  "fu.lock file are present -: errno "
595  << errno << ":" << strerror(errno) << std::endl;
596 
597  if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
598  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
599  ls++;
600  return noFile;
601  }
602 
603  if (stat(bu_run_dir_.c_str(), &buf) != 0)
604  return runEnded;
605  if (stat(fulockfile_.c_str(), &buf) != 0)
606  return runEnded;
607 
608  lock_attempts = 0;
609  }
610  if (total_lock_attempts > 5 * 60000000) {
611  edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
612  return runAbort;
613  }
614  }
615 
616  timeval ts_lockend;
617  gettimeofday(&ts_lockend, nullptr);
618  long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
619  if (deltat > 0.)
620  lockWaitTime = deltat;
621 
622  if (retval != 0)
623  return fileStatus;
624 
625 #ifdef DEBUG
626  timeval ts_lockend;
627  gettimeofday(&ts_lockend, 0);
628 #endif
629 
630  //open another lock file FD after the lock using main fd has been acquired
631  int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
632  if (fu_readwritelock_fd2 == -1)
633  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
634  << " create. error:" << strerror(errno);
635 
636  FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
637 
638  // if the stream is readable
639  if (fu_rw_lock_stream2 != nullptr) {
640  unsigned int readLs, readIndex;
641  int check = 0;
642  // rewind the stream
643  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
644  // if rewinded ok
645  if (check == 0) {
646  // read its' values
647  fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
648  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
649 
650  unsigned int currentLs = readLs;
651  bool bumpedOk = false;
652  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
653  //no lock file write in this case
654  if (ls && ls + 1 < currentLs)
655  ls++;
656  else {
657  // try to bump (look for new index or EoLS file)
658  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
659  //avoid 2 lumisections jump
660  if (ls && readLs > currentLs && currentLs > ls) {
661  ls++;
662  readLs = currentLs = ls;
663  readIndex = 0;
664  bumpedOk = false;
665  //no write to lock file
666  } else {
667  if (ls == 0 && readLs > currentLs) {
668  //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
669  //in this case there is no new file in the same LS
670  //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
671  readLs = currentLs;
672  readIndex = 0;
673  bumpedOk = false;
674  //no write to lock file
675  }
676  //update return LS value
677  ls = readLs;
678  }
679  }
680  if (bumpedOk) {
681  // there is a new index file to grab, lock file needs to be updated
682  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
683  if (check == 0) {
684  ftruncate(fu_readwritelock_fd2, 0);
685  // write next index in the file, which is the file the next process should take
686  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
687  fflush(fu_rw_lock_stream2);
688  fsync(fu_readwritelock_fd2);
689  fileStatus = newFile;
690  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
691  } else {
692  edm::LogError("EvFDaqDirector")
693  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
694  setExceptionState = true;
695  return noFile;
696  }
697  } else if (currentLs < readLs) {
698  //there is no new file in next LS (yet), but lock file can be updated to the next LS
699  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
700  if (check == 0) {
701  ftruncate(fu_readwritelock_fd2, 0);
702  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
703  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
704  fflush(fu_rw_lock_stream2);
705  fsync(fu_readwritelock_fd2);
706  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
707  } else {
708  edm::LogError("EvFDaqDirector")
709  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
710  setExceptionState = true;
711  return noFile;
712  }
713  }
714  } else {
715  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
716  << strerror(errno);
717  }
718  } else {
719  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
720  }
721  fclose(fu_rw_lock_stream2); // = fdopen(fu_readwritelock_fd2, "r+");
722 
723 #ifdef DEBUG
724  timeval ts_preunlock;
725  gettimeofday(&ts_preunlock, 0);
726  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
727  double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
728 #endif
729 
730  //if new json is present, lock file which FedRawDataInputSource will later unlock
731  if (fileStatus == newFile)
732  lockFULocal();
733 
734  //release lock at this point
735  int retvalu = -1;
736  retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
737  if (retvalu == -1)
738  edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
739 
740 #ifdef DEBUG
741  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
742 #endif
743 
744  if (fileStatus == noFile) {
745  struct stat buf;
746  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
747  if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
748  fileStatus = runEnded;
749  if (stopFileLS >= 0 && (int)ls > stopFileLS) {
750  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
751  fileStatus = runEnded;
752  }
753  }
754  return fileStatus;
755  }
756 
758  std::ifstream ij(BUEoLSFile);
759  Json::Value deserializeRoot;
761 
762  if (!reader.parse(ij, deserializeRoot)) {
763  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
764  return -1;
765  }
766 
768  DataPoint dp;
769  dp.deserialize(deserializeRoot);
770 
771  //read definition
772  if (readEolsDefinition_) {
773  //std::string def = boost::algorithm::trim(dp.getDefinition());
774  std::string def = dp.getDefinition();
775  if (def.empty())
776  readEolsDefinition_ = false;
777  while (!def.empty()) {
779  if (def.find('/') == 0)
780  fullpath = def;
781  else
782  fullpath = bu_run_dir_ + '/' + def;
783  struct stat buf;
784  if (stat(fullpath.c_str(), &buf) == 0) {
785  DataPointDefinition eolsDpd;
786  std::string defLabel = "legend";
787  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
788  if (eolsDpd.getNames().empty()) {
789  //try with "data" label if "legend" format is not used
790  eolsDpd = DataPointDefinition();
791  defLabel = "data";
792  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
793  }
794  for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
795  if (eolsDpd.getNames().at(i) == "NFiles")
797  readEolsDefinition_ = false;
798  break;
799  }
800  //check if we can still find definition
801  if (def.size() <= 1 || def.find('/') == std::string::npos) {
802  readEolsDefinition_ = false;
803  break;
804  }
805  def = def.substr(def.find('/') + 1);
806  }
807  }
808 
809  if (dp.getData().size() > eolsNFilesIndex_)
810  data = dp.getData()[eolsNFilesIndex_];
811  else {
812  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
813  return -1;
814  }
815  return std::stoi(data);
816  }
817 
818  bool EvFDaqDirector::bumpFile(unsigned int& ls,
819  unsigned int& index,
820  std::string& nextFile,
821  uint32_t& fsize,
822  uint16_t& rawHeaderSize,
823  int maxLS,
824  bool& setExceptionState) {
825  if (previousFileSize_ != 0) {
826  if (!fms_) {
828  }
829  if (fms_)
831  previousFileSize_ = 0;
832  }
833  nextFile = "";
834 
835  //reached limit
836  if (maxLS >= 0 && ls > (unsigned int)maxLS)
837  return false;
838 
839  struct stat buf;
840  std::stringstream ss;
841 
842  // 1. Check suggested file
843  std::string nextFileJson = getInputJsonFilePath(ls, index);
844  if (stat(nextFileJson.c_str(), &buf) == 0) {
845  fsize = previousFileSize_ = buf.st_size;
846  nextFile = nextFileJson;
847  return true;
848  }
849  // 2. No file -> lumi ended? (and how many?)
850  else {
851  // 3. No file -> check for standalone raw file
852  std::string nextFileRaw = getRawFilePath(ls, index);
853  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
854  fsize = previousFileSize_ = buf.st_size;
855  nextFile = nextFileRaw;
856  return true;
857  }
858 
859  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
860 
861  if (stat(BUEoLSFile.c_str(), &buf) == 0) {
862  // recheck that no raw file appeared in the meantime
863  if (stat(nextFileJson.c_str(), &buf) == 0) {
864  fsize = previousFileSize_ = buf.st_size;
865  nextFile = nextFileJson;
866  return true;
867  }
868  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
869  fsize = previousFileSize_ = buf.st_size;
870  nextFile = nextFileRaw;
871  return true;
872  }
873 
874  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
875  if (indexFilesInLS < 0)
876  //parsing failed
877  return false;
878  else {
879  //check index
880  if ((int)index < indexFilesInLS) {
881  //we have 2 files, and check for 1 failed... retry (2 will never be here)
882  edm::LogError("EvFDaqDirector")
883  << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
884  << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
885  setExceptionState = true;
886  return false;
887  }
888  }
889  // this lumi ended, check for files
890  ++ls;
891  index = 0;
892 
893  //reached limit
894  if (maxLS >= 0 && ls > (unsigned int)maxLS)
895  return false;
896 
897  nextFileJson = getInputJsonFilePath(ls, 0);
898  nextFileRaw = getRawFilePath(ls, 0);
899  if (stat(nextFileJson.c_str(), &buf) == 0) {
900  // a new file was found at new lumisection, index 0
901  fsize = previousFileSize_ = buf.st_size;
902  nextFile = nextFileJson;
903  return true;
904  }
905  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
906  fsize = previousFileSize_ = buf.st_size;
907  nextFile = nextFileRaw;
908  return true;
909  }
910  return false;
911  }
912  }
913  // no new file found
914  return false;
915  }
916 
918  if (fu_rw_lock_stream == nullptr)
919  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
920  else {
921  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
922  unsigned int readLs = 1, readIndex = 0;
923  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
924  }
925  }
926 
928  if (create) {
930  open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
931  chmod(fulockfile_.c_str(), 0766);
932  } else {
933  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
934  }
935  if (fu_readwritelock_fd_ == -1)
936  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
937  << " create:" << create << " error:" << strerror(errno);
938  else
939  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
940 
941  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
942  if (fu_rw_lock_stream == nullptr)
943  edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
944  }
945 
946  void EvFDaqDirector::lockInitLock() { pthread_mutex_lock(&init_lock_); }
947 
948  void EvFDaqDirector::unlockInitLock() { pthread_mutex_unlock(&init_lock_); }
949 
951  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
952  flock(fulocal_rwlock_fd_, LOCK_SH);
953  }
954 
956  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
957  flock(fulocal_rwlock_fd_, LOCK_UN);
958  }
959 
961  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
962  flock(fulocal_rwlock_fd2_, LOCK_EX);
963  }
964 
966  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
967  flock(fulocal_rwlock_fd2_, LOCK_UN);
968  }
969 
970  void EvFDaqDirector::createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const {
971  //used for backpressure mechanisms and monitoring
972  const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
973  struct stat buf;
974  if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
975  int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
976  close(bol_fd);
977  }
978  }
979 
980  void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection,
981  const uint32_t currentLumiSection,
982  bool doCreateBoLS,
983  bool doCreateEoLS) {
984  if (currentLumiSection > 0) {
985  const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
986  struct stat buf;
987  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
988  if (!found) {
989  if (doCreateEoLS) {
990  int eol_fd =
991  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
992  close(eol_fd);
993  }
994  if (doCreateBoLS)
995  createBoLSFile(lumiSection, false);
996  }
997  } else if (doCreateBoLS) {
998  createBoLSFile(lumiSection, true); //needed for initial lumisection
999  }
1000  }
1001 
1003  int& rawFd,
1004  uint16_t& rawHeaderSize,
1005  uint16_t& rawDataType,
1006  uint32_t& lsFromHeader,
1007  int32_t& eventsFromHeader,
1008  int64_t& fileSizeFromHeader,
1009  bool requireHeader,
1010  bool retry,
1011  bool closeFile) {
1012  int infile;
1013 
1014  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1015  if (retry) {
1016  edm::LogWarning("EvFDaqDirector")
1017  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1018  return parseFRDFileHeader(rawSourcePath,
1019  rawFd,
1020  rawHeaderSize,
1021  rawDataType,
1022  lsFromHeader,
1023  eventsFromHeader,
1024  fileSizeFromHeader,
1025  requireHeader,
1026  false,
1027  closeFile);
1028  } else {
1029  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1030  edm::LogError("EvFDaqDirector")
1031  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1032  if (errno == ENOENT)
1033  return 1; // error && file not found
1034  else
1035  return -1;
1036  }
1037  }
1038  }
1039 
1040  //v2 is the largest possible read
1041  char hdr[sizeof(FRDFileHeader_v2)];
1042  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1043  return -1;
1044 
1046  uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1047 
1048  if (frd_version == 0) {
1049  //no header (specific sequence not detected)
1050  if (requireHeader) {
1051  edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1052  close(infile);
1053  return -1;
1054  } else {
1055  //no header, but valid file
1056  lseek(infile, 0, SEEK_SET);
1057  rawHeaderSize = 0;
1058  lsFromHeader = 0;
1059  eventsFromHeader = -1;
1060  fileSizeFromHeader = -1;
1061  }
1062  } else if (frd_version == 1) {
1063  //version 1 header
1064  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1065  return -1;
1067  uint32_t headerSizeRaw = fhContent->headerSize_;
1068  if (headerSizeRaw != sizeof(FRDFileHeader_v1)) {
1069  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1070  << " v:" << frd_version;
1071  close(infile);
1072  return -1;
1073  }
1074  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1075  rawDataType = 0;
1076  lsFromHeader = fhContent->lumiSection_;
1077  eventsFromHeader = (int32_t)fhContent->eventCount_;
1078  fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1079  rawHeaderSize = fhContent->headerSize_;
1080 
1081  } else if (frd_version == 2) {
1082  //version 2 heade
1083  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1084  return -1;
1086  uint32_t headerSizeRaw = fhContent->headerSize_;
1087  if (headerSizeRaw != sizeof(FRDFileHeader_v2)) {
1088  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1089  << " v:" << frd_version;
1090  close(infile);
1091  return -1;
1092  }
1093  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1094  rawDataType = fhContent->dataType_;
1095  lsFromHeader = fhContent->lumiSection_;
1096  eventsFromHeader = (int32_t)fhContent->eventCount_;
1097  fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1098  rawHeaderSize = fhContent->headerSize_;
1099  }
1100 
1101  if (closeFile) {
1102  close(infile);
1103  infile = -1;
1104  }
1105 
1106  rawFd = infile;
1107  return 0; //OK
1108  }
1109 
1110  bool EvFDaqDirector::checkFileRead(char* buf, int infile, std::size_t buf_sz, std::string const& path) {
1111  ssize_t sz_read = ::read(infile, buf, buf_sz);
1112  if (sz_read < 0) {
1113  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << path << " : " << strerror(errno);
1114  if (infile != -1)
1115  close(infile);
1116  return false;
1117  }
1118  if ((size_t)sz_read < buf_sz) {
1119  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << path;
1120  if (infile != -1)
1121  close(infile);
1122  return false;
1123  }
1124  return true;
1125  }
1126 
1127  bool EvFDaqDirector::rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize) {
1128  int infile;
1129  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1130  edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1131  << strerror(errno);
1132  return false;
1133  }
1134  //try to read FRD header size (v2 is the biggest, use read buffer of that size)
1135  char hdr[sizeof(FRDFileHeader_v2)];
1136  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1137  return false;
1139  uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1140 
1141  if (frd_version == 1) {
1142  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1143  return false;
1145  rawHeaderSize = fhContent->headerSize_;
1146  close(infile);
1147  return true;
1148  } else if (frd_version == 2) {
1149  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1150  return false;
1152  rawHeaderSize = fhContent->headerSize_;
1153  close(infile);
1154  return true;
1155  } else
1156  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unknown version: " << frd_version;
1157 
1158  close(infile);
1159  rawHeaderSize = 0;
1160  return false;
1161  }
1162 
1164  int& rawFd,
1165  uint16_t& rawHeaderSize,
1166  int64_t& fileSizeFromHeader,
1167  bool& fileFound,
1168  uint32_t serverLS,
1169  bool closeFile,
1170  bool requireHeader) {
1171  fileFound = true;
1172 
1173  //take only first three tokens delimited by "_" in the renamed raw file name
1174  std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1175  size_t pos = 0, n_tokens = 0;
1176  while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1177  }
1178  std::string reducedJsonStem = jsonStem.substr(0, pos);
1179 
1180  std::ostringstream fileNameWithPID;
1181  //should be ported to use fffnaming
1182  fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1183 
1184  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1185 
1186  LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1187 
1188  //parse RAW file header if it exists
1189  uint32_t lsFromRaw;
1190  int32_t nbEventsWrittenRaw;
1191  int64_t fileSizeFromRaw;
1192  uint16_t rawDataType;
1193  auto ret = parseFRDFileHeader(rawSourcePath,
1194  rawFd,
1195  rawHeaderSize,
1196  rawDataType,
1197  lsFromRaw,
1198  nbEventsWrittenRaw,
1199  fileSizeFromRaw,
1200  requireHeader,
1201  true,
1202  closeFile);
1203  if (ret != 0) {
1204  if (ret == 1)
1205  fileFound = false;
1206  return -1;
1207  }
1208 
1209  int outfile;
1210  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1211  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1212  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1213  if (errno == EEXIST) {
1214  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1215  << " : ";
1216  return -1;
1217  }
1218  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1219  << strerror(errno);
1220  struct stat out_stat;
1221  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1222  edm::LogWarning("EvFDaqDirector")
1223  << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1224  << jsonDestPath;
1225  if (unlink(jsonDestPath.c_str()) == -1) {
1226  edm::LogWarning("EvFDaqDirector")
1227  << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1228  }
1229  }
1230  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1231  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1232  << jsonDestPath << " : " << strerror(errno);
1233  return -1;
1234  }
1235  }
1236  //write JSON file (TODO: use jsoncpp)
1237  std::stringstream ss;
1238  ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1239  std::string sstr = ss.str();
1240 
1241  if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1242  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1243  << " : " << strerror(errno);
1244  return -1;
1245  }
1246  close(outfile);
1247  if (serverLS && serverLS != lsFromRaw)
1248  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1249  << " and raw file header LS " << lsFromRaw;
1250 
1251  fileSizeFromHeader = fileSizeFromRaw;
1252  return nbEventsWrittenRaw;
1253  }
1254 
1256  std::string const& rawSourcePath,
1257  int64_t& fileSizeFromJson,
1258  bool& fileFound) {
1259  fileFound = true;
1260 
1261  //should be ported to use fffnaming
1262  std::ostringstream fileNameWithPID;
1263  fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1264  << std::setw(5) << pid_ << ".jsn";
1265 
1266  // assemble json destination path
1267  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1268 
1269  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1270 
1271  int infile = -1, outfile = -1;
1272 
1273  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1274  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1275  << strerror(errno);
1276  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1277  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1278  << jsonSourcePath << " : " << strerror(errno);
1279  if (errno == ENOENT)
1280  fileFound = false;
1281  return -1;
1282  }
1283  }
1284 
1285  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1286  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1287  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1288  if (errno == EEXIST) {
1289  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1290  << " : ";
1291  ::close(infile);
1292  return -1;
1293  }
1294  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1295  << strerror(errno);
1296  struct stat out_stat;
1297  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1298  edm::LogWarning("EvFDaqDirector")
1299  << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1300  if (unlink(jsonDestPath.c_str()) == -1) {
1301  edm::LogWarning("EvFDaqDirector")
1302  << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1303  }
1304  }
1305  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1306  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1307  << jsonDestPath << " : " << strerror(errno);
1308  ::close(infile);
1309  return -1;
1310  }
1311  }
1312  //copy contents
1313  const std::size_t buf_sz = 512;
1314  std::size_t tot_written = 0;
1315  std::unique_ptr<char[]> buf(new char[buf_sz]);
1316 
1317  ssize_t sz, sz_read = 1, sz_write;
1318  while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1319  sz_write = 0;
1320  do {
1321  assert(sz_read - sz_write > 0);
1322  if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1323  sz_read = sz; // cause read loop termination
1324  break;
1325  }
1326  assert(sz > 0);
1327  sz_write += sz;
1328  tot_written += sz;
1329  } while (sz_write < sz_read);
1330  }
1331  close(infile);
1332  close(outfile);
1333 
1334  if (tot_written > 0) {
1335  //leave file if it was empty for diagnosis
1336  if (unlink(jsonSourcePath.c_str()) == -1) {
1337  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1338  << strerror(errno);
1339  return -1;
1340  }
1341  } else {
1342  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1343  << jsonSourcePath;
1344  return -1;
1345  }
1346 
1347  Json::Value deserializeRoot;
1349 
1350  std::string data;
1351  std::stringstream ss;
1352  bool result;
1353  try {
1354  if (tot_written <= buf_sz) {
1355  result = reader.parse(buf.get(), deserializeRoot);
1356  } else {
1357  //json will normally not be bigger than buf_sz bytes
1358  try {
1359  std::ifstream ij(jsonDestPath);
1360  ss << ij.rdbuf();
1361  } catch (std::filesystem::filesystem_error const& ex) {
1362  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1363  return -1;
1364  }
1365  result = reader.parse(ss.str(), deserializeRoot);
1366  }
1367  if (!result) {
1368  if (tot_written <= buf_sz)
1369  ss << buf.get();
1370  edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1371  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1372  << ss.str() << ".";
1373  return -1;
1374  }
1375 
1376  //read BU JSON
1377  DataPoint dp;
1378  dp.deserialize(deserializeRoot);
1379  bool success = false;
1380  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1381  if (dpd_->getNames().at(i) == "NEvents")
1382  if (i < dp.getData().size()) {
1383  data = dp.getData()[i];
1384  success = true;
1385  break;
1386  }
1387  }
1388  if (!success) {
1389  if (!dp.getData().empty())
1390  data = dp.getData()[0];
1391  else {
1392  edm::LogError("EvFDaqDirector::grabNextJsonFile")
1393  << "grabNextJsonFile - "
1394  << " error reading number of events from BU JSON; No input value. data -: " << data;
1395  return -1;
1396  }
1397  }
1398 
1399  //try to read raw file size
1400  fileSizeFromJson = -1;
1401  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1402  if (dpd_->getNames().at(i) == "NBytes") {
1403  if (i < dp.getData().size()) {
1404  std::string dataSize = dp.getData()[i];
1405  try {
1406  fileSizeFromJson = std::stol(dataSize);
1407  } catch (const std::exception&) {
1408  //non-fatal currently, processing can continue without this value
1409  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1410  << "Input value is -: " << dataSize;
1411  }
1412  break;
1413  }
1414  }
1415  }
1416  return std::stoi(data);
1417  } catch (const std::out_of_range& e) {
1418  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1419  << "Input value is -: " << data;
1420  } catch (const std::invalid_argument& e) {
1421  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1422  << "Input value is -: " << data;
1423  } catch (std::runtime_error const& e) {
1424  //Can be thrown by Json parser
1425  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1426  }
1427 
1428  catch (std::exception const& e) {
1429  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1430  } catch (...) {
1431  //unknown exception
1432  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1433  }
1434 
1435  return -1;
1436  }
1437 
1439  std::string data;
1440  try {
1441  // assemble json destination path
1442  std::filesystem::path jsonDestPath(baseRunDir());
1443 
1444  //should be ported to use fffnaming
1445  std::ostringstream fileNameWithPID;
1446  fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1447  << ".jsn";
1448  jsonDestPath /= fileNameWithPID.str();
1449 
1450  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1451  try {
1452  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1453  } catch (std::filesystem::filesystem_error const& ex) {
1454  // Input dir gone?
1455  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1456  // << " Maybe the file is not yet visible by FU. Trying again in one second";
1457  sleep(1);
1458  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1459  }
1460  unlockFULocal();
1461 
1462  try {
1463  //sometimes this fails but file gets deleted
1464  std::filesystem::remove(jsonSourcePath);
1465  } catch (std::filesystem::filesystem_error const& ex) {
1466  // Input dir gone?
1467  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1468  } catch (std::exception const& ex) {
1469  // Input dir gone?
1470  edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1471  }
1472 
1473  std::ifstream ij(jsonDestPath);
1474  Json::Value deserializeRoot;
1476 
1477  std::stringstream ss;
1478  ss << ij.rdbuf();
1479  if (!reader.parse(ss.str(), deserializeRoot)) {
1480  edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1481  << "\nERROR:\n"
1482  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1483  << ss.str() << ".";
1484  throw std::runtime_error("Cannot deserialize input JSON file");
1485  }
1486 
1487  //read BU JSON
1488  std::string data;
1489  DataPoint dp;
1490  dp.deserialize(deserializeRoot);
1491  bool success = false;
1492  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1493  if (dpd_->getNames().at(i) == "NEvents")
1494  if (i < dp.getData().size()) {
1495  data = dp.getData()[i];
1496  success = true;
1497  }
1498  }
1499  if (!success) {
1500  if (!dp.getData().empty())
1501  data = dp.getData()[0];
1502  else
1503  throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1504  << " error reading number of events from BU JSON -: No input value " << data;
1505  }
1506  return std::stoi(data);
1507  } catch (std::filesystem::filesystem_error const& ex) {
1508  // Input dir gone?
1509  unlockFULocal();
1510  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1511  } catch (std::runtime_error const& e) {
1512  // Another process grabbed the file and NFS did not register this
1513  unlockFULocal();
1514  edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1515  } catch (const std::out_of_range&) {
1516  edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1517  << "Input value is -: " << data;
1518  } catch (const std::invalid_argument&) {
1519  edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1520  << "Input value is -: " << data;
1521  } catch (std::exception const& e) {
1522  // BU run directory disappeared?
1523  unlockFULocal();
1524  edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1525  }
1526 
1527  return -1;
1528  }
1529 
1531  bool& serverError,
1532  uint32_t& serverLS,
1533  uint32_t& closedServerLS,
1534  std::string& nextFileJson,
1535  std::string& nextFileRaw,
1536  bool& rawHeader,
1537  int maxLS) {
1538  EvFDaqDirector::FileStatus fileStatus = noFile;
1539  serverError = false;
1540 
1541  boost::system::error_code ec;
1542  try {
1543  while (true) {
1544  //socket connect
1545  if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1547 
1548  if (ec) {
1549  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1550  serverError = true;
1551  break;
1552  }
1553  }
1554 
1555  boost::asio::streambuf request;
1556  std::ostream request_stream(&request);
1557  std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1558  if (maxLS >= 0) {
1559  std::stringstream spath;
1560  spath << path << "&stopls=" << maxLS;
1561  path = spath.str();
1562  edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1563  }
1564  request_stream << "GET " << path << " HTTP/1.1\r\n";
1565  request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1566  request_stream << "Accept: */*\r\n";
1567  request_stream << "Connection: keep-alive\r\n\r\n";
1568 
1569  boost::asio::write(*socket_, request, ec);
1570  if (ec) {
1571  if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1572  edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1573  //we got disconnected, try to reconnect to the server before writing the request
1575  if (ec) {
1576  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1577  serverError = true;
1578  break;
1579  }
1580  continue;
1581  }
1582  edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1583  serverError = true;
1584  break;
1585  }
1586 
1587  boost::asio::streambuf response;
1588  boost::asio::read_until(*socket_, response, "\r\n", ec);
1589  if (ec) {
1590  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1591  serverError = true;
1592  break;
1593  }
1594 
1595  std::istream response_stream(&response);
1596 
1597  std::string http_version;
1598  response_stream >> http_version;
1599 
1600  response_stream >> serverHttpStatus;
1601 
1602  std::string status_message;
1603  std::getline(response_stream, status_message);
1604  if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1605  edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1606  serverError = true;
1607  break;
1608  }
1609  if (serverHttpStatus != 200) {
1610  edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1611  serverError = true;
1612  break;
1613  }
1614 
1615  // Process the response headers.
1617  while (std::getline(response_stream, header) && header != "\r") {
1618  }
1619 
1620  std::string fileInfo;
1621  std::map<std::string, std::string> serverMap;
1622  while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1623  auto pos = fileInfo.find('=');
1624  if (pos == std::string::npos)
1625  continue;
1626  auto stitle = fileInfo.substr(0, pos);
1627  auto svalue = fileInfo.substr(pos + 1);
1628  serverMap[stitle] = svalue;
1629  }
1630 
1631  //check that response run number if correct
1632  auto server_version = serverMap.find("version");
1633  assert(server_version != serverMap.end());
1634 
1635  auto server_run = serverMap.find("runnumber");
1636  assert(server_run != serverMap.end());
1637  assert(run_nstring_ == server_run->second);
1638 
1639  auto server_state = serverMap.find("state");
1640  assert(server_state != serverMap.end());
1641 
1642  auto server_eols = serverMap.find("lasteols");
1643  assert(server_eols != serverMap.end());
1644 
1645  auto server_ls = serverMap.find("lumisection");
1646 
1647  int version_maj = 1;
1648  int version_min = 0;
1649  int version_rev = 0;
1650  {
1651  auto* s_ptr = server_version->second.c_str();
1652  if (!server_version->second.empty() && server_version->second[0] == '"')
1653  s_ptr++;
1654  auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1655  if (res < 3) {
1656  res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1657  if (res < 2) {
1658  res = sscanf(s_ptr, "%d", &version_maj);
1659  if (res < 1) {
1660  //expecting at least 1 number (major version)
1661  edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1662  }
1663  }
1664  }
1665  }
1666 
1667  closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1668  if (server_ls != serverMap.end())
1669  serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1670  else
1671  serverLS = closedServerLS + 1;
1672 
1673  std::string s_state = server_state->second;
1674  if (s_state == "STARTING") //initial, always empty starting with LS 1
1675  {
1676  auto server_file = serverMap.find("file");
1677  assert(server_file == serverMap.end()); //no file with starting state
1678  fileStatus = noFile;
1679  edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1680  } else if (s_state == "READY") {
1681  auto server_file = serverMap.find("file");
1682  if (server_file == serverMap.end()) {
1683  //can be returned by server if files from new LS already appeared but LS is not yet closed
1684  if (serverLS <= closedServerLS)
1685  serverLS = closedServerLS + 1;
1686  fileStatus = noFile;
1687  edm::LogInfo("EvFDaqDirector")
1688  << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1689  } else {
1690  std::string filestem;
1691  std::string fileprefix;
1692  auto server_fileprefix = serverMap.find("fileprefix");
1693 
1694  if (server_fileprefix != serverMap.end()) {
1695  auto pssize = server_fileprefix->second.size();
1696  if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1697  fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1698  else
1699  fileprefix = server_fileprefix->second;
1700  }
1701 
1702  //remove string literals
1703  auto ssize = server_file->second.size();
1704  if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1705  filestem = server_file->second.substr(1, ssize - 2);
1706  else
1707  filestem = server_file->second;
1708  assert(!filestem.empty());
1709  if (version_maj > 1) {
1710  nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw"; //filestem should be raw
1711  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1712  nextFileJson = "";
1713  rawHeader = true;
1714  } else {
1715  nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw"; //raw files are not moved
1716  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1717  nextFileJson = filestem + ".jsn";
1718  rawHeader = false;
1719  }
1720  fileStatus = newFile;
1721  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1722  << serverLS << " file:" << filestem;
1723  }
1724  } else if (s_state == "EOLS") {
1725  serverLS = closedServerLS + 1;
1726  edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1727  fileStatus = noFile;
1728  } else if (s_state == "EOR") {
1729  //server_eor = serverMap.find("iseor");
1730  edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1731  fileStatus = runEnded;
1732  } else if (s_state == "NORUN") {
1733  auto err_msg = serverMap.find("errormessage");
1734  if (err_msg != serverMap.end())
1735  edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1736  else
1737  edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1738  edm::LogWarning("EvFDaqDirector") << "executing run end";
1739  fileStatus = runEnded;
1740  } else if (s_state == "ERROR") {
1741  auto err_msg = serverMap.find("errormessage");
1742  if (err_msg != serverMap.end())
1743  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1744  else
1745  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1746  fileStatus = noFile;
1747  serverError = true;
1748  } else {
1749  edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1750  fileStatus = noFile;
1751  serverError = true;
1752  }
1753 
1754  // Read until EOF, writing data to output as we go.
1755  if (!fileBrokerKeepAlive_) {
1756  while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1757  }
1758  if (ec != boost::asio::error::eof) {
1759  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1760  serverError = true;
1761  }
1762  }
1763 
1764  break;
1765  }
1766 
1767  } catch (std::exception const& e) {
1768  edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1769  serverError = true;
1770  }
1771 
1772  if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1773  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1774  if (ec) {
1775  edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1776  }
1777  socket_->close(ec);
1778  if (ec) {
1779  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1780  }
1781  }
1782 
1783  if (serverError) {
1784  if (socket_->is_open())
1785  socket_->close(ec);
1786  if (ec) {
1787  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1788  }
1789  fileStatus = noFile;
1790  sleep(1); //back-off if error detected
1791  }
1792 
1793  return fileStatus;
1794  }
1795 
1797  unsigned int& ls,
1798  std::string& nextFileRaw,
1799  int& rawFd,
1800  uint16_t& rawHeaderSize,
1801  int32_t& serverEventsInNewFile,
1802  int64_t& fileSizeFromMetadata,
1803  uint64_t& thisLockWaitTimeUs,
1804  bool requireHeader) {
1805  EvFDaqDirector::FileStatus fileStatus = noFile;
1806 
1807  //int retval = -1;
1808  //int lock_attempts = 0;
1809  //long total_lock_attempts = 0;
1810 
1811  struct stat buf;
1812  int stopFileLS = -1;
1813  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1814  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1815  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1816  if (stopFileCheck == 0)
1817  stopFileLS = readLastLSEntry(stopFilePath_);
1818  else
1819  stopFileLS = 1; //stop without drain if only pid is stopped
1820  if (!stop_ls_override_) {
1821  //if lumisection is higher than in stop file, should quit at next from current
1822  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1823  stopFileLS = stop_ls_override_ = ls;
1824  } else
1825  stopFileLS = stop_ls_override_;
1826  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1827  << stopFileLS;
1828  //return runEnded;
1829  } else //if file was removed before reaching stop condition, reset this
1830  stop_ls_override_ = 0;
1831 
1832  /* look for EoLS
1833  if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1834  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ;
1835  ls++;
1836  return noFile;
1837  }
1838  */
1839 
1840  timeval ts_lockbegin;
1841  gettimeofday(&ts_lockbegin, nullptr);
1842 
1843  std::string nextFileJson;
1844  uint32_t serverLS, closedServerLS;
1845  unsigned int serverHttpStatus;
1846  bool serverError;
1847 
1848  //local lock to force index json and EoLS files to appear in order
1850  lockFULocal();
1851 
1852  int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1853  bool rawHeader = false;
1854  fileStatus = contactFileBroker(
1855  serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1856 
1857  if (serverError) {
1858  //do not update anything
1860  unlockFULocal();
1861  return noFile;
1862  }
1863 
1864  //handle creation of BoLS files if lumisection has changed
1865  if (currentLumiSection == 0) {
1866  if (fileStatus == runEnded)
1867  createLumiSectionFiles(closedServerLS, 0, true, false);
1868  else
1869  createLumiSectionFiles(serverLS, 0, true, false);
1870  } else {
1871  if (closedServerLS >= currentLumiSection) {
1872  //only BoLS files
1873  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1874  createLumiSectionFiles(i + 1, i, true, false);
1875  }
1876  }
1877 
1878  bool fileFound = true;
1879 
1880  if (fileStatus == newFile) {
1881  if (rawHeader > 0)
1882  serverEventsInNewFile = grabNextJsonFromRaw(
1883  nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false, requireHeader);
1884  else
1885  serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1886  }
1887  //closing file in case of any error
1888  if (serverEventsInNewFile < 0 && rawFd != -1) {
1889  close(rawFd);
1890  rawFd = -1;
1891  }
1892 
1893  //can unlock because all files have been created locally
1895  unlockFULocal();
1896 
1897  if (!fileFound) {
1898  //catch condition where directory got deleted
1899  fileStatus = noFile;
1900  struct stat buf;
1901  if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1902  edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1903  fileStatus = runEnded;
1904  }
1905  }
1906 
1907  //handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
1908  //so that EoLS files can not appear locally before index files
1909  if (currentLumiSection == 0) {
1910  lockFULocal2();
1911  if (fileStatus == runEnded) {
1912  createLumiSectionFiles(closedServerLS, 0, false, true);
1913  createLumiSectionFiles(serverLS, closedServerLS, false, true); // +1
1914  } else {
1915  createLumiSectionFiles(serverLS, 0, false, true);
1916  }
1917  unlockFULocal2();
1918  } else {
1919  if (closedServerLS >= currentLumiSection) {
1920  //lock exclusive to create EoLS files
1921  lockFULocal2();
1922  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1923  createLumiSectionFiles(i + 1, i, false, true);
1924  unlockFULocal2();
1925  }
1926  }
1927 
1928  if (fileStatus == runEnded)
1929  ls = std::max(currentLumiSection, serverLS);
1930  else if (fileStatus == newFile) {
1931  assert(serverLS >= ls);
1932  ls = serverLS;
1933  } else if (fileStatus == noFile) {
1934  if (serverLS >= ls)
1935  ls = serverLS;
1936  else {
1937  edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1938  << " which is smaller than currently open LS " << ls << ". Ignoring response";
1939  sleep(1);
1940  }
1941  }
1942 
1943  return fileStatus;
1944  }
1945 
1947  // create open dir if not already there
1948 
1950  if (!std::filesystem::is_directory(openPath)) {
1951  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1952  std::filesystem::create_directories(openPath);
1953  }
1954  }
1955 
1957  std::ifstream ij(file);
1958  Json::Value deserializeRoot;
1960 
1961  if (!reader.parse(ij, deserializeRoot)) {
1962  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1963  return -1;
1964  }
1965 
1966  int ret = deserializeRoot.get("lastLS", "").asInt();
1967  return ret;
1968  }
1969 
1971  std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1973  struct stat buf;
1974  unsigned int lscount = 1;
1975  do {
1976  std::stringstream ss;
1977  ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1978  fullpath = ss.str();
1979  lscount++;
1980  } while (stat(fullpath.c_str(), &buf) == 0);
1981  return lscount - 1;
1982  }
1983 
1984  //if transferSystem PSet is present in the menu, we require it to be complete and consistent for all specified streams
1986  if (transferSystemJson_)
1987  return;
1988 
1989  transferSystemJson_.reset(new Json::Value);
1990  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1991  if (topPset.existsAs<edm::ParameterSet>("transferSystem", true)) {
1992  const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));
1993 
1994  Json::Value destinationsVal(Json::arrayValue);
1995  std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
1996  for (auto& dest : destinations)
1997  destinationsVal.append(dest);
1998  (*transferSystemJson_)["destinations"] = destinationsVal;
1999 
2000  Json::Value modesVal(Json::arrayValue);
2001  std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>("transferModes");
2002  for (auto& mode : modes)
2003  modesVal.append(mode);
2004  (*transferSystemJson_)["transferModes"] = modesVal;
2005 
2006  for (auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
2007  if (psKeyItr->first != "destinations" && psKeyItr->first != "transferModes") {
2008  const edm::ParameterSet& streamDef = tsPset.getParameterSet(psKeyItr->first);
2009  Json::Value streamVal;
2010  for (auto& mode : modes) {
2011  //validation
2012  if (!streamDef.existsAs<std::vector<std::string>>(mode, true))
2013  throw cms::Exception("EvFDaqDirector")
2014  << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode
2015  << ")";
2016  std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);
2017 
2018  Json::Value sDestsValue(Json::arrayValue);
2019 
2020  if (streamDestinations.empty())
2021  throw cms::Exception("EvFDaqDirector")
2022  << " Missing transter system destination(s) for -: " << psKeyItr->first << ", mode:" << mode;
2023 
2024  for (auto& sdest : streamDestinations) {
2025  bool sDestValid = false;
2026  sDestsValue.append(sdest);
2027  for (auto& dest : destinations) {
2028  if (dest == sdest)
2029  sDestValid = true;
2030  }
2031  if (!sDestValid)
2032  throw cms::Exception("EvFDaqDirector")
2033  << " Invalid transter system destination specified for -: " << psKeyItr->first << ", mode:" << mode
2034  << ", dest:" << sdest;
2035  }
2036  streamVal[mode] = sDestsValue;
2037  }
2038  (*transferSystemJson_)[psKeyItr->first] = streamVal;
2039  }
2040  }
2041  } else {
2042  if (requireTSPSet_)
2043  throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
2044  }
2045  }
2046 
2048  std::string streamRequestName;
2049  if (transferSystemJson_->isMember(stream.c_str()))
2050  streamRequestName = stream;
2051  else {
2052  std::stringstream msg;
2053  msg << "Transfer system mode definitions missing for -: " << stream;
2054  if (requireTSPSet_)
2055  throw cms::Exception("EvFDaqDirector") << msg.str();
2056  else {
2057  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
2058  return std::string("Failsafe");
2059  }
2060  }
2061  //return empty if strict check parameter is not on
2062  if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
2063  edm::LogWarning("EvFDaqDirector")
2064  << "Selected mode string is not provided as DaqDirector parameter."
2065  << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
2066  return std::string("Failsafe");
2067  }
2068  if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
2069  throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
2070  }
2071  //check if stream has properly listed transfer stream
2072  if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str())) {
2073  std::stringstream msg;
2074  msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
2075  if (requireTSPSet_)
2076  throw cms::Exception("EvFDaqDirector") << msg.str();
2077  else
2078  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
2079  return std::string("Failsafe");
2080  }
2081  Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_, "");
2082 
2083  //flatten string json::Array into CSV std::string
2084  std::string ret;
2085  for (Json::Value::iterator it = destsVec.begin(); it != destsVec.end(); it++) {
2086  if (!ret.empty())
2087  ret += ",";
2088  ret += (*it).asString();
2089  }
2090  return ret;
2091  }
2092 
2094  if (mergeTypePset_.empty())
2095  return;
2096  if (!mergeTypeMap_.empty())
2097  return;
2098  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
2099  if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_, true)) {
2100  const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
2101  for (const std::string& pname : tsPset.getParameterNames()) {
2102  std::string streamType = tsPset.getParameter<std::string>(pname);
2103  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2104  mergeTypeMap_.insert(ac, pname);
2105  ac->second = streamType;
2106  ac.release();
2107  }
2108  }
2109  }
2110 
2112  tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
2113  if (mergeTypeMap_.find(search_ac, stream))
2114  return search_ac->second;
2115 
2116  edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
2117  std::string defaultName = MergeTypeNames_[defaultType];
2118  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2119  mergeTypeMap_.insert(ac, stream);
2120  ac->second = defaultName;
2121  ac.release();
2122  return defaultName;
2123  }
2124 
2126  std::string proc_flag = run_dir_ + "/processing";
2127  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2128  close(proc_flag_fd);
2129  }
2130 
2131  struct flock EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid) {
2132 #ifdef __APPLE__
2133  return {start, len, pid, type, whence};
2134 #else
2135  return {type, whence, start, len, pid};
2136 #endif
2137  }
2138 
2140  struct stat buf;
2141  return (stat(input_throttled_file_.c_str(), &buf) == 0);
2142  }
2143 
2145  struct stat buf;
2146  return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0);
2147  }
2148 
2149 } // namespace evf
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:80
unsigned int nThreads_
Definition: fillJson.h:27
int def(FILE *, FILE *, int)
struct flock fu_rw_flk
unsigned int nConcurrentLumis_
const_iterator end() const
std::vector< std::string > bu_base_dirs_all_
std::string run_string_
LuminosityBlockNumber_t luminosityBlock() const
T getParameter(std::string const &) const
Definition: ParameterSet.h:307
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
std::string bolsFileName(const unsigned int run, const unsigned int ls)
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
void watchPreallocate(Preallocate::slot_type const &iSlot)
boost::asio::io_service io_service_
const_iterator begin() const
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::string fulockfile_
def create(alignables, pedeDump, additionalData, outputFile, config)
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
std::shared_ptr< Json::Value > transferSystemJson_
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
static bool checkFileRead(char *buf, int infile, std::size_t buf_sz, std::string const &path)
jsoncollector::DataPointDefinition * dpd_
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::string getFFFParamsFilePathOnBU() const
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
Int asInt() const
ret
prodAgent to be discontinued
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
volatile std::atomic< bool > shutdown_flag
pthread_mutex_t init_lock_
ParameterSet const & getParameterSet(std::string const &) const
Value get(UInt index, const Value &defaultValue) const
std::string getEoLSFilePathOnFU(const unsigned int ls) const
Value & append(const Value &value)
Append value to array at the end.
void removeFile(std::string)
bool lumisectionDiscarded(unsigned int ls)
reader
Definition: DQM.py:105
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:172
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string getOpenInitFilePath(std::string const &stream) const
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
assert(be >=bs)
bool isExceptionOnData(unsigned int ls)
std::string getStreamDestinations(std::string const &stream) const
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: Electron.h:6
unsigned long previousFileSize_
static std::string to_string(const XMLCh *ch)
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
std::string getInitTempFilePath(std::string const &stream) const
std::string getEoRFilePathOnFU() const
struct flock fu_rw_fulk
std::string hltSourceDirectory_
unsigned int startFromLS_
std::array< uint8_t, 4 > id_
Definition: FRDFileHeader.h:29
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::string getEoLSFilePathOnBU(const unsigned int ls) const
std::mutex * fileDeleteLockPtr_
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
std::string mergeTypePset_
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
def chmod(path, mode)
Definition: eostools.py:294
std::string getRunOpenDirPath() const
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string getStreamMergeType(std::string const &stream, MergeType defaultType)
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
std::string stopFilePath_
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string bu_base_dir_
std::array< uint8_t, 4 > version_
Definition: FRDFileHeader.h:30
unsigned int stop_ls_override_
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
std::string selectedTransferMode_
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string input_throttled_file_
std::string getBoLSFilePathOnFU(const unsigned int ls) const
std::string eorFileName(const unsigned int run)
int readLastLSEntry(std::string const &file)
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string stopFilePathPid_
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:62
unsigned int eolsNFilesIndex_
std::string & baseRunDir()
std::string run_nstring_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
std::string getInitFilePath(std::string const &stream) const
void createProcessingNotificationMaybe() const
Log< level::Info, false > LogInfo
void openFULockfileStream(bool create)
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
int getNFilesFromEoLS(std::string BUEoLSFile)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
def ls(path, rec=False)
Definition: eostools.py:349
void preBeginRun(edm::GlobalContext const &globalContext)
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned long long uint64_t
Definition: Time.h:13
void checkMergeTypePSet(edm::ProcessContext const &pc)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
std::vector< std::string > const & getNames() const
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
def load(fileName)
Definition: svgfig.py:547
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
std::string getEoRFileName() const
void add(std::string const &label, ParameterSetDescription const &psetDescription)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:223
tuple msg
Definition: mps_check.py:286
evf::FastMonitoringService * fms_
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
std::string bu_run_dir_
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
ParameterSet const & getParameterSet(ParameterSetID const &id)
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
std::string getEoRFilePath() const
def mkdir(path)
Definition: eostools.py:251
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
std::string findHighestRunDir()
Definition: DirManager.cc:23
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
void postEndRun(edm::GlobalContext const &globalContext)
std::string discard_ls_filestem_
Unserialize a JSON document into a Value.
Definition: reader.h:16
static const std::vector< std::string > MergeTypeNames_
unsigned int getLumisectionToStart() const
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
void checkTransferSystemPSet(edm::ProcessContext const &pc)
std::string fileBrokerPort_
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
std::string initTempFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
void preallocate(edm::service::SystemBounds const &bounds)
Log< level::Warning, false > LogWarning
Iterator for object and array value.
Definition: value.h:908
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
std::string fileBrokerHost_
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned int fuLockPollInterval_
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)
std::string eolsFileName(const unsigned int run, const unsigned int ls)
unsigned int nStreams_
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
ParameterSetID const & parameterSetID() const
std::string bu_run_open_dir_
#define LogDebug(id)
array value (ordered list)
Definition: value.h:30