CMS 3D CMS Logo

EvFDaqDirector.cc
Go to the documentation of this file.
11 
19 
20 #include <iostream>
21 #include <fstream>
22 #include <sstream>
23 #include <sys/time.h>
24 #include <unistd.h>
25 #include <cstdio>
26 #include <boost/algorithm/string.hpp>
27 
28 //using boost::asio::ip::tcp;
29 
30 //#define DEBUG
31 
32 using namespace jsoncollector;
33 
34 namespace evf {
35 
36  //for enum MergeType
37  const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"};
38 
40  : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
41  bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
42  bu_base_dirs_all_(pset.getUntrackedParameter<std::vector<std::string>>("buBaseDirsAll")),
43  bu_base_dirs_nSources_(pset.getUntrackedParameter<std::vector<int>>("buBaseDirsNumStreams")),
44  run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
45  useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
46  fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", true)),
47  fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
48  fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
49  fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
50  fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
51  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
52  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
53  requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet", false)),
54  selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode", "")),
55  mergeTypePset_(pset.getUntrackedParameter<std::string>("mergingPset", "")),
56  directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
57  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
58  hostname_(""),
59  bu_readlock_fd_(-1),
60  bu_writelock_fd_(-1),
61  fu_readwritelock_fd_(-1),
62  fulocal_rwlock_fd_(-1),
63  fulocal_rwlock_fd2_(-1),
64  bu_w_lock_stream(nullptr),
65  bu_r_lock_stream(nullptr),
66  fu_rw_lock_stream(nullptr),
67  dirManager_(base_dir_),
68  previousFileSize_(0),
69  bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
70  bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
71  bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
72  bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
73  fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
74  fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
80 
81  //save hostname for later
82  char hostname[33];
83  gethostname(hostname, 32);
84  hostname_ = hostname;
85 
86  char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
87  if (fuLockPollIntervalPtr) {
88  try {
89  fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
90  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
91  << " us";
92  } catch (const std::exception&) {
93  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
94  }
95  }
96 
97  //override file service parameter if specified by environment
98  char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
99  if (fileBrokerParamPtr) {
100  try {
101  useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
102  edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
103  } catch (const std::exception&) {
104  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
105  }
106  }
107  if (useFileBroker_) {
109  //find BU data address from hltd configuration
111  struct stat buf;
112  if (stat("/etc/appliance/bus.config", &buf) == 0) {
113  std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
114  std::getline(busconfig, fileBrokerHost_);
115  }
116  if (fileBrokerHost_.empty())
117  throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
118  } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
119  throw cms::Exception("EvFDaqDirector")
120  << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
121 
122  resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
123  query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
124  endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
125  socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
126  }
127 
128  char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
129  if (startFromLSPtr) {
130  try {
131  startFromLS_ = std::stoul(std::string(startFromLSPtr));
132  edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
133  } catch (const std::exception&) {
134  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
135  }
136  }
137 
138  //override file service parameter if specified by environment
139  char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
140  if (fileBrokerUseLockParamPtr) {
141  try {
142  fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
143  edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
145  } catch (const std::exception&) {
146  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
147  }
148  }
149 
150  // set number of streams in each BU's ramdisk
151  if (bu_base_dirs_nSources_.empty()) {
152  // default is 1 stream per ramdisk
153  for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
154  bu_base_dirs_nSources_.push_back(1);
155  }
156  } else if (bu_base_dirs_nSources_.size() != bu_base_dirs_all_.size()) {
157  throw cms::Exception("DaqDirector")
158  << " Error while setting number of sources: size mismatch with BU base directory vector";
159  } else {
160  for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
162  edm::LogInfo("EvFDaqDirector") << "Setting " << bu_base_dirs_nSources_[i] << " sources"
163  << " for ramdisk " << bu_base_dirs_all_[i];
164  }
165  }
166 
167  std::stringstream ss;
168  ss << "run" << std::setfill('0') << std::setw(6) << run_;
169  run_string_ = ss.str();
170  ss = std::stringstream();
171  ss << run_;
172  run_nstring_ = ss.str();
173  run_dir_ = base_dir_ + "/" + run_string_;
174  input_throttled_file_ = run_dir_ + "/input_throttle";
175  discard_ls_filestem_ = run_dir_ + "/discard_ls";
176  ss = std::stringstream();
177  ss << getpid();
178  pid_ = ss.str();
179  }
180 
182  // check if base dir exists or create it accordingly
183  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
184  if (retval != 0 && errno != EEXIST) {
185  throw cms::Exception("DaqDirector")
186  << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
187  }
188 
189  //create run dir in base dir
190  umask(0);
191  retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
192  if (retval != 0 && errno != EEXIST) {
193  throw cms::Exception("DaqDirector")
194  << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
195  }
196 
197  //create fu-local.lock in run open dir
198  if (!directorBU_) {
200  std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
202  open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
203  if (fulocal_rwlock_fd_ == -1)
204  throw cms::Exception("DaqDirector")
205  << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
206  chmod(fulocal_lock_.c_str(), 0777);
207  fsync(fulocal_rwlock_fd_);
208  //open second fd for another input source thread
210  open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
211  if (fulocal_rwlock_fd2_ == -1)
212  throw cms::Exception("DaqDirector")
213  << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
214  }
215 
216  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
217  //for BU, it is created at this point
218  if (directorBU_) {
220  std::string bulockfile = bu_run_dir_ + "/bu.lock";
221  fulockfile_ = bu_run_dir_ + "/fu.lock";
222 
223  //make or find bu run dir
224  retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
225  if (retval != 0 && errno != EEXIST) {
226  throw cms::Exception("DaqDirector")
227  << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno);
228  }
229  bu_run_open_dir_ = bu_run_dir_ + "/open";
230  retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
231  if (retval != 0 && errno != EEXIST) {
232  throw cms::Exception("DaqDirector")
233  << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno);
234  }
235 
236  // the BU director does not need to know about the fu lock
237  bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
238  if (bu_writelock_fd_ == -1)
239  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
240  else
241  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
242  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
243  if (bu_w_lock_stream == nullptr)
244  edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
245 
246  // BU INITIALIZES LOCK FILE
247  // FU LOCK FILE OPEN
248  openFULockfileStream(true);
250  fflush(fu_rw_lock_stream);
251  close(fu_readwritelock_fd_);
252 
253  if (!hltSourceDirectory_.empty()) {
254  struct stat buf;
255  if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
256  std::string hltdir = bu_run_dir_ + "/hlt";
257  std::string tmphltdir = bu_run_open_dir_ + "/hlt";
258  retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
259  if (retval != 0 && errno != EEXIST)
260  throw cms::Exception("DaqDirector")
261  << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno);
262 
263  std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
264  std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
265 
266  std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
267  for (auto& optfile : optfiles) {
268  try {
269  std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
270  } catch (...) {
271  }
272  }
273 
274  std::filesystem::rename(tmphltdir, hltdir);
275  } else
276  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
277  }
278  //else{}//no configuration specified
279  } else {
280  // for FU, check if bu base dir exists
281 
282  auto checkExists = [=](std::string const& bu_base_dir) -> void {
283  int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
284  if (retval != 0 && errno != EEXIST) {
285  throw cms::Exception("DaqDirector")
286  << " Error checking for bu base dir -: " << bu_base_dir << " mkdir error:" << strerror(errno);
287  }
288  };
289 
290  auto waitForDir = [=](std::string const& bu_base_dir) -> void {
291  int cnt = 0;
292  while (!edm::shutdown_flag.load(std::memory_order_relaxed)) {
293  int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
294  if (retval != 0 && errno != EEXIST) {
295  usleep(500000);
296  cnt++;
297  if (cnt % 20 == 0)
298  edm::LogWarning("DaqDirector") << "waiting for " << bu_base_dir;
299  if (cnt > 120)
300  throw cms::Exception("DaqDirector") << " Error checking for bu base dir after 1 minute -: " << bu_base_dir
301  << " mkdir error:" << strerror(errno);
302  }
303  break;
304  }
305  };
306 
307  if (!bu_base_dirs_all_.empty()) {
308  checkExists(bu_base_dirs_all_[0]);
310  for (unsigned int i = 1; i < bu_base_dirs_all_.size(); i++)
311  waitForDir(bu_base_dirs_all_[i]);
312  } else {
313  checkExists(bu_base_dir_);
315  }
316 
317  fulockfile_ = bu_run_dir_ + "/fu.lock";
318  if (!useFileBroker_)
319  openFULockfileStream(false);
320  }
321 
322  pthread_mutex_init(&init_lock_, nullptr);
323 
324  stopFilePath_ = run_dir_ + "/CMSSW_STOP";
325  std::stringstream sstp;
326  sstp << stopFilePath_ << "_pid" << pid_;
327  stopFilePathPid_ = sstp.str();
328 
329  if (!directorBU_) {
330  std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
331  struct stat statbuf;
332  if (!stat(defPath.c_str(), &statbuf))
333  edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
334  else {
335  //look in source directory if not present in ramdisk
336  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
337  defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
338  if (stat(defPath.c_str(), &statbuf)) {
339  defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
340  if (stat(defPath.c_str(), &statbuf)) {
341  defPath = defPathSuffix;
342  }
343  }
344  }
345  dpd_ = new DataPointDefinition();
346  std::string defLabel = "data";
347  DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
348  }
349  }
350 
352  //close server connection
353  if (socket_.get() && socket_->is_open()) {
354  boost::system::error_code ec;
355  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
356  socket_->close(ec);
357  }
358 
359  if (fulocal_rwlock_fd_ != -1) {
360  unlockFULocal();
361  close(fulocal_rwlock_fd_);
362  }
363 
364  if (fulocal_rwlock_fd2_ != -1) {
365  unlockFULocal2();
366  close(fulocal_rwlock_fd2_);
367  }
368  }
369 
371  initRun();
372 
373  nThreads_ = bounds.maxNumberOfStreams();
374  nStreams_ = bounds.maxNumberOfThreads();
375  nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks();
376  }
377 
380  desc.setComment(
381  "Service used for file locking arbitration and for propagating information between other EvF components");
382  desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
383  desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
384  desc.addUntracked<std::vector<std::string>>("buBaseDirsAll", std::vector<std::string>())
385  ->setComment("BU base ramdisk directories for multi-file DAQSource models");
386  desc.addUntracked<std::vector<int>>("buBaseDirsNumStreams", std::vector<int>())
387  ->setComment("Number of streams for each BU base ramdisk directories for multi-file DAQSource models");
388  desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
389  desc.addUntracked<bool>("useFileBroker", false)
390  ->setComment("Use BU file service to grab input data instead of NFS file locking");
391  desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
392  ->setComment("Allow service to discover BU address from hltd configuration");
393  desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
394  desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
395  desc.addUntracked<bool>("fileBrokerKeepAlive", true)
396  ->setComment("Use keep alive to avoid using large number of sockets");
397  desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
398  ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
399  desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
400  ->setComment("Lock polling interval in microseconds for the input directory file lock");
401  desc.addUntracked<bool>("outputAdler32Recheck", false)
402  ->setComment("Check Adler32 of per-process output files while micro-merging");
403  desc.addUntracked<bool>("requireTransfersPSet", false)
404  ->setComment("Require complete transferSystem PSet in the process configuration");
405  desc.addUntracked<std::string>("selectedTransferMode", "")
406  ->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
407  desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
408  desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
409  desc.addUntracked<std::string>("mergingPset", "")
410  ->setComment("Name of merging PSet to look for merging type definitions for streams");
411  descriptions.add("EvFDaqDirector", desc);
412  }
413 
416  checkMergeTypePSet(pc);
417  }
418 
419  void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) {
  // Hook taking the run's GlobalContext (not used in the visible body).
  // Emits a warning that names run_dir_ and the value returned by dirManager_.findHighestRunDir().
  // NOTE(review): listing line 423 is absent from this view, which leaves one more closing brace than
  // opening ones below; presumably it held the `if` guarding the warning - confirm against the original file.
420  //assert(run_ == id.run());
421 
422  // check if the requested run is the latest one - issue a warning if it isn't
424  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
425  << ". This is not the highest run " << dirManager_.findHighestRunDir();
426  }
427  }
428 
429  void EvFDaqDirector::postEndRun(edm::GlobalContext const& globalContext) {
  // Hook taking the run's GlobalContext (not used in the visible body).
  // Closes bu_readlock_fd_ and bu_writelock_fd_; when directorBU_ is set it also builds the bu.lock path
  // inside bu_run_dir_.
  // NOTE(review): both close() calls are unconditional and their results are ignored, so a descriptor that
  // still holds its -1 initial value is passed to close(); a guard may be wanted.
  // NOTE(review): in BU mode bu_writelock_fd_ is also wrapped by bu_w_lock_stream via fdopen(); closing the
  // raw descriptor here bypasses that stream - confirm the stream is not used or fclose()d afterwards.
430  close(bu_readlock_fd_);
431  close(bu_writelock_fd_);
432  if (directorBU_) {
433  std::string filename = bu_run_dir_ + "/bu.lock";
  // NOTE(review): listing line 434 is absent from this view and `filename` is otherwise unused here;
  // presumably the lock file is removed at this point - confirm against the original file.
435  }
436  }
437 
439  //delete all files belonging to just closed lumi
440  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
442  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
443  return;
444  }
445 
446  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
447  auto it = filesToDeletePtr_->begin();
448  while (it != filesToDeletePtr_->end()) {
449  if (it->second->lumi_ == ls && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
450  it = filesToDeletePtr_->erase(it);
451  } else
452  it++;
453  }
454  }
455 
456  std::string EvFDaqDirector::getInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
458  }
459 
460  std::string EvFDaqDirector::getRawFilePath(const unsigned int ls, const unsigned int index) const {
462  }
463 
464  std::string EvFDaqDirector::getOpenRawFilePath(const unsigned int ls, const unsigned int index) const {
465  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
466  }
467 
468  std::string EvFDaqDirector::getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
469  return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
470  }
471 
472  std::string EvFDaqDirector::getDatFilePath(const unsigned int ls, std::string const& stream) const {
474  }
475 
478  }
479 
482  }
483 
486  }
487 
490  }
491 
494  }
495 
497  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
498  }
499 
502  }
503 
506  }
507 
509  std::string const& stream) const {
511  }
512 
514  std::string const& stream) const {
516  }
517 
519  std::string const& stream) const {
521  }
522 
525  }
526 
529  }
530 
533  }
534 
536  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
537  }
538 
540  return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
541  }
542 
544  return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
545  }
546 
548 
550 
552 
553  std::string EvFDaqDirector::getFFFParamsFilePathOnBU() const { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
554 
556  int retval = remove(filename.c_str());
557  if (retval != 0)
558  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
559  << ". error = " << strerror(errno);
560  }
561 
563  std::string& nextFile,
564  uint32_t& fsize,
565  uint16_t& rawHeaderSize,
566  uint64_t& lockWaitTime,
567  bool& setExceptionState) {
  // Polls for the fcntl lock described by fu_rw_flk on fu_readwritelock_fd_, then reads the "<LS> <index>"
  // pair from fulockfile_ through a second descriptor, asks bumpFile() for the next input file, rewrites
  // the pair when it advanced, and finally requests the unlock described by fu_rw_fulk.
  //
  // Outputs:
  //   ls                - updated in place to the lumisection the caller should continue with
  //   nextFile, fsize,
  //   rawHeaderSize     - filled by bumpFile(); rawHeaderSize is reset to 0 on entry
  //   lockWaitTime      - microseconds spent obtaining the lock (written only when positive)
  //   setExceptionState - set to true when the seek before rewriting the lock file fails
  //                       (bumpFile() receives the same reference)
  // Returns newFile, noFile, runEnded or runAbort.
  //
  // NOTE(review): the first line of this signature (return type, name and the `ls` parameter) is absent
  // from this listing.
568  EvFDaqDirector::FileStatus fileStatus = noFile;
569  rawHeaderSize = 0;
570 
571  int retval = -1;
  // Despite their names, both counters below accumulate slept microseconds (fuLockPollInterval_ per failed try).
572  int lock_attempts = 0;
573  long total_lock_attempts = 0;
574 
  // Stop request handling: a stop file (common or per-pid) limits the last lumisection to process.
575  struct stat buf;
576  int stopFileLS = -1;
577  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
578  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
579  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
580  if (stopFileCheck == 0)
581  stopFileLS = readLastLSEntry(stopFilePath_);
582  else
583  stopFileLS = 1; //stop without drain if only pid is stopped
584  if (!stop_ls_override_) {
585  //if lumisection is higher than in stop file, should quit at next from current
586  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
587  stopFileLS = stop_ls_override_ = ls;
588  } else
589  stopFileLS = stop_ls_override_;
590  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
591  << stopFileLS;
592  //return runEnded;
593  } else //if file was removed before reaching stop condition, reset this
594  stop_ls_override_ = 0;
595 
596  timeval ts_lockbegin;
597  gettimeofday(&ts_lockbegin, nullptr);
598 
  // Non-blocking lock attempts (F_SETLK) with a sleep of fuLockPollInterval_ microseconds between failures.
599  while (retval == -1) {
600  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
601  if (retval == -1)
602  usleep(fuLockPollInterval_);
603  else
  // lock obtained: `continue` re-evaluates the loop condition, which is now false
604  continue;
605 
606  lock_attempts += fuLockPollInterval_;
607  total_lock_attempts += fuLockPollInterval_;
  // After 5 s of waiting, or on errno 116, check whether the run is still there.
  // NOTE(review): 116 is presumably ESTALE on Linux (the log text says "Stale lock file handle") - confirm;
  // errno is read after usleep(), so it may no longer be the value left by fcntl().
608  if (lock_attempts > 5000000 || errno == 116) {
609  if (errno == 116)
610  edm::LogWarning("EvFDaqDirector")
611  << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
612  else
613  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
614  "fu.lock file are present -: errno "
615  << errno << ":" << strerror(errno) << std::endl;
616 
  // A local EoLS marker for the current lumisection lets the caller move on without the lock.
617  if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
618  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
619  ls++;
620  return noFile;
621  }
622 
  // Missing BU run directory or missing lock file is treated as end of run.
623  if (stat(bu_run_dir_.c_str(), &buf) != 0)
624  return runEnded;
625  if (stat(fulockfile_.c_str(), &buf) != 0)
626  return runEnded;
627 
628  lock_attempts = 0;
629  }
  // Give up after 5 minutes of accumulated waiting.
630  if (total_lock_attempts > 5 * 60000000) {
631  edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
632  return runAbort;
633  }
634  }
635 
636  timeval ts_lockend;
637  gettimeofday(&ts_lockend, nullptr);
638  long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
639  if (deltat > 0.)
640  lockWaitTime = deltat;
641 
642  if (retval != 0)
643  return fileStatus;
644 
  // NOTE(review): with DEBUG defined this declares ts_lockend a second time in the same scope as the
  // declaration a few lines above, which would not compile.
645 #ifdef DEBUG
646  timeval ts_lockend;
647  gettimeofday(&ts_lockend, 0);
648 #endif
649 
650  //open another lock file FD after the lock using main fd has been acquired
  // NOTE(review): O_CREAT is not requested, so the S_IRWXU mode argument has no effect here.
651  int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
652  if (fu_readwritelock_fd2 == -1)
653  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
654  << " create. error:" << strerror(errno);
655 
  // Execution continues even when open() failed; fdopen() is then called with -1.
656  FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
657 
658  // if the stream is readable
659  if (fu_rw_lock_stream2 != nullptr) {
660  unsigned int readLs, readIndex;
661  int check = 0;
662  // rewind the stream
663  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
664  // if rewound ok
665  if (check == 0) {
666  // read its values
  // NOTE(review): the fscanf result is ignored; if fewer than two fields are parsed, readLs/readIndex
  // are used uninitialised below.
667  fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
668  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
669 
  // currentLs keeps the lumisection found in the lock file; bumpFile() may advance readLs past it.
670  unsigned int currentLs = readLs;
671  bool bumpedOk = false;
672  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
673  //no lock file write in this case
674  if (ls && ls + 1 < currentLs)
675  ls++;
676  else {
677  // try to bump (look for new index or EoLS file)
678  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
679  //avoid 2 lumisections jump
680  if (ls && readLs > currentLs && currentLs > ls) {
681  ls++;
682  readLs = currentLs = ls;
683  readIndex = 0;
684  bumpedOk = false;
685  //no write to lock file
686  } else {
687  if (ls == 0 && readLs > currentLs) {
688  //make sure to initialize always with LS found in the lock file, with possibility of grabbing index file immediately
689  //in this case there is no new file in the same LS
690  //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
691  readLs = currentLs;
692  readIndex = 0;
693  bumpedOk = false;
694  //no write to lock file
695  }
696  //update return LS value
697  ls = readLs;
698  }
699  }
700  if (bumpedOk) {
701  // there is a new index file to grab, lock file needs to be updated
702  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
703  if (check == 0) {
  // the ftruncate result is not checked
704  ftruncate(fu_readwritelock_fd2, 0);
705  // write next index in the file, which is the file the next process should take
706  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
707  fflush(fu_rw_lock_stream2);
708  fsync(fu_readwritelock_fd2);
709  fileStatus = newFile;
710  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
711  } else {
712  edm::LogError("EvFDaqDirector")
713  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
714  setExceptionState = true;
  // NOTE(review): this early return skips both the fclose() of fu_rw_lock_stream2 and the unlock
  // request further down.
715  return noFile;
716  }
717  } else if (currentLs < readLs) {
718  //there is no new file in next LS (yet), but lock file can be updated to the next LS
719  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
720  if (check == 0) {
721  ftruncate(fu_readwritelock_fd2, 0);
722  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
723  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
724  fflush(fu_rw_lock_stream2);
725  fsync(fu_readwritelock_fd2);
726  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
727  } else {
728  edm::LogError("EvFDaqDirector")
729  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
730  setExceptionState = true;
  // NOTE(review): same as above - the stream stays open and the unlock request is skipped.
731  return noFile;
732  }
733  }
734  } else {
735  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
736  << strerror(errno);
737  }
738  } else {
739  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
740  }
  // NOTE(review): this is also reached when fu_rw_lock_stream2 is null (branch just above); calling
  // fclose() on a null pointer is undefined behaviour, and a successfully opened fu_readwritelock_fd2 would
  // stay open in that case.
  // NOTE(review): POSIX fcntl record locks are dropped when the process closes any descriptor for the
  // file, so this fclose() may already release the lock taken above, before lockFULocal() runs - confirm
  // that this ordering is intended.
741  fclose(fu_rw_lock_stream2); // = fdopen(fu_readwritelock_fd2, "r+");
742 
743 #ifdef DEBUG
744  timeval ts_preunlock;
745  gettimeofday(&ts_preunlock, 0);
746  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
747  double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
748 #endif
749 
750  //if new json is present, lock file which FedRawDataInputSource will later unlock
751  if (fileStatus == newFile)
752  lockFULocal();
753 
754  //release lock at this point
755  int retvalu = -1;
756  retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
757  if (retvalu == -1)
758  edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
759 
760 #ifdef DEBUG
761  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
762 #endif
763 
  // With no new file: an EoR file, a vanished BU run directory, or passing the stop lumisection ends the run.
764  if (fileStatus == noFile) {
  // this `buf` shadows the one declared at the top of the function
765  struct stat buf;
766  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
767  if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
768  fileStatus = runEnded;
769  if (stopFileLS >= 0 && (int)ls > stopFileLS) {
770  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
771  fileStatus = runEnded;
772  }
773  }
774  return fileStatus;
775  }
776 
778  std::ifstream ij(BUEoLSFile);
779  Json::Value deserializeRoot;
781 
782  if (!reader.parse(ij, deserializeRoot)) {
783  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
784  return -1;
785  }
786 
788  DataPoint dp;
789  dp.deserialize(deserializeRoot);
790 
791  //read definition
792  if (readEolsDefinition_) {
793  //std::string def = boost::algorithm::trim(dp.getDefinition());
794  std::string def = dp.getDefinition();
795  if (def.empty())
796  readEolsDefinition_ = false;
797  while (!def.empty()) {
799  if (def.find('/') == 0)
800  fullpath = def;
801  else
802  fullpath = bu_run_dir_ + '/' + def;
803  struct stat buf;
804  if (stat(fullpath.c_str(), &buf) == 0) {
805  DataPointDefinition eolsDpd;
806  std::string defLabel = "legend";
807  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
808  if (eolsDpd.getNames().empty()) {
809  //try with "data" label if "legend" format is not used
810  eolsDpd = DataPointDefinition();
811  defLabel = "data";
812  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
813  }
814  for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
815  if (eolsDpd.getNames().at(i) == "NFiles")
817  readEolsDefinition_ = false;
818  break;
819  }
820  //check if we can still find definition
821  if (def.size() <= 1 || def.find('/') == std::string::npos) {
822  readEolsDefinition_ = false;
823  break;
824  }
825  def = def.substr(def.find('/') + 1);
826  }
827  }
828 
829  if (dp.getData().size() > eolsNFilesIndex_)
830  data = dp.getData()[eolsNFilesIndex_];
831  else {
832  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
833  return -1;
834  }
835  return std::stoi(data);
836  }
837 
838  bool EvFDaqDirector::bumpFile(unsigned int& ls,
839  unsigned int& index,
840  std::string& nextFile,
841  uint32_t& fsize,
842  uint16_t& rawHeaderSize,
843  int maxLS,
844  bool& setExceptionState) {
  // Looks for the next input file starting at (ls, index).
  // Order of checks: the JSON index file, then a raw file accepted by rawFileHasHeader(); if neither is
  // there and the BU EoLS file for `ls` exists, both are checked once more, the file count of the EoLS
  // file is compared with `index`, and the search moves on to index 0 of the next lumisection.
  //
  //   ls, index         - in/out: advanced to (ls + 1, 0) when the lumisection is found closed
  //   nextFile          - out: path of the file found, empty otherwise
  //   fsize             - out: st_size of the file found (also stored in previousFileSize_)
  //   rawHeaderSize     - passed through to rawFileHasHeader()
  //   maxLS             - when >= 0, no file is looked up for a lumisection above it
  //   setExceptionState - set to true when the EoLS file announces more files than `index` but the
  //                       file for `index` is not present
  // Returns true when a file was found, false otherwise.
845  if (previousFileSize_ != 0) {
  // NOTE(review): listing lines 847 and 850 are absent from this view, so the statements belonging to the
  // two `if`s on fms_ are not visible here - confirm against the original file before editing this part.
846  if (!fms_) {
848  }
849  if (fms_)
851  previousFileSize_ = 0;
852  }
853  nextFile = "";
854 
855  //reached limit
856  if (maxLS >= 0 && ls > (unsigned int)maxLS)
857  return false;
858 
859  struct stat buf;
  // NOTE(review): `ss` is not used anywhere in the visible body.
860  std::stringstream ss;
861 
862  // 1. Check suggested file
863  std::string nextFileJson = getInputJsonFilePath(ls, index);
864  if (stat(nextFileJson.c_str(), &buf) == 0) {
865  fsize = previousFileSize_ = buf.st_size;
866  nextFile = nextFileJson;
867  return true;
868  }
869  // 2. No file -> lumi ended? (and how many?)
870  else {
871  // 3. No file -> check for standalone raw file
872  std::string nextFileRaw = getRawFilePath(ls, index);
873  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
874  fsize = previousFileSize_ = buf.st_size;
875  nextFile = nextFileRaw;
876  return true;
877  }
878 
879  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
880 
881  if (stat(BUEoLSFile.c_str(), &buf) == 0) {
882  // recheck that no raw file appeared in the meantime
  // (the input file could have been written between the checks above and the EoLS check)
883  if (stat(nextFileJson.c_str(), &buf) == 0) {
884  fsize = previousFileSize_ = buf.st_size;
885  nextFile = nextFileJson;
886  return true;
887  }
888  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
889  fsize = previousFileSize_ = buf.st_size;
890  nextFile = nextFileRaw;
891  return true;
892  }
893 
894  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
895  if (indexFilesInLS < 0)
896  //parsing failed
897  return false;
898  else {
899  //check index
900  if ((int)index < indexFilesInLS) {
901  //we have 2 files, and check for 1 failed... retry (2 will never be here)
  // NOTE(review): nextFile is always empty at this point (cleared on entry and only assigned right
  // before a `return true`), so the message prints no file name; nextFileJson is presumably what was meant.
902  edm::LogError("EvFDaqDirector")
903  << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
904  << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
905  setExceptionState = true;
906  return false;
907  }
908  }
909  // this lumi ended, check for files
910  ++ls;
911  index = 0;
912 
913  //reached limit
914  if (maxLS >= 0 && ls > (unsigned int)maxLS)
915  return false;
916 
917  nextFileJson = getInputJsonFilePath(ls, 0);
918  nextFileRaw = getRawFilePath(ls, 0);
919  if (stat(nextFileJson.c_str(), &buf) == 0) {
920  // a new file was found at new lumisection, index 0
921  fsize = previousFileSize_ = buf.st_size;
922  nextFile = nextFileJson;
923  return true;
924  }
925  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
926  fsize = previousFileSize_ = buf.st_size;
927  nextFile = nextFileRaw;
928  return true;
929  }
  // lumisection was advanced but its first file is not there yet
930  return false;
931  }
932  }
933  // no new file found
934  return false;
935  }
936 
     // NOTE(review): the signature line of this function (listing line 937) is missing from this extract.
     // Visible body: when fu_rw_lock_stream is null, log an error with strerror(errno); otherwise write
     // the initial "<ls> <index>" pair "1 0" to the stream.
938  if (fu_rw_lock_stream == nullptr)
939  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
940  else {
941  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
942  unsigned int readLs = 1, readIndex = 0;
     // The stream is not flushed here; the return value of fprintf is not checked.
943  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
944  }
945  }
946 
     // NOTE(review): the signature line (listing line 947) and listing line 949 are missing from this
     // extract; line 949 presumably assigns the open() result below to fu_readwritelock_fd_ - confirm.
     // Visible body: open fulockfile_ read/write (creating it and then forcing mode 0766 when `create`
     // is true) and wrap the descriptor in the stdio stream fu_rw_lock_stream. Failures are only logged.
948  if (create) {
950  open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
     // chmod is applied after creation; its return value is not checked
951  chmod(fulockfile_.c_str(), 0766);
952  } else {
     // the mode argument is passed although O_CREAT is not set
953  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
954  }
955  if (fu_readwritelock_fd_ == -1)
956  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
957  << " create:" << create << " error:" << strerror(errno);
958  else
959  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
960 
     // fdopen is attempted even when the descriptor is -1; that failure is reported just below
961  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
962  if (fu_rw_lock_stream == nullptr)
963  edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
964  }
965 
966  void EvFDaqDirector::lockInitLock() { pthread_mutex_lock(&init_lock_); }
967 
968  void EvFDaqDirector::unlockInitLock() { pthread_mutex_unlock(&init_lock_); }
969 
     // NOTE(review): the signature line (listing line 970) is missing from this extract.
     // Visible body: take a shared (LOCK_SH) flock on fulocal_rwlock_fd_; the result is not checked.
     // The commented-out fcntl call is an earlier locking variant left in place.
971  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
972  flock(fulocal_rwlock_fd_, LOCK_SH);
973  }
974 
     // NOTE(review): the signature line (listing line 975) is missing from this extract.
     // Visible body: release (LOCK_UN) the flock held on fulocal_rwlock_fd_; the result is not checked.
976  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
977  flock(fulocal_rwlock_fd_, LOCK_UN);
978  }
979 
     // NOTE(review): the signature line (listing line 980) is missing from this extract.
     // Visible body: take an exclusive (LOCK_EX) flock on fulocal_rwlock_fd2_; the result is not checked.
981  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
982  flock(fulocal_rwlock_fd2_, LOCK_EX);
983  }
984 
     // NOTE(review): the signature line (listing line 985) is missing from this extract.
     // Visible body: release (LOCK_UN) the flock held on fulocal_rwlock_fd2_; the result is not checked.
986  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
987  flock(fulocal_rwlock_fd2_, LOCK_UN);
988  }
989 
990  void EvFDaqDirector::createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const {
991  //used for backpressure mechanisms and monitoring
992  const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
993  struct stat buf;
994  if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
995  int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
996  close(bol_fd);
997  }
998  }
999 
1000  void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection,
1001  const uint32_t currentLumiSection,
1002  bool doCreateBoLS,
1003  bool doCreateEoLS) {
1004  if (currentLumiSection > 0) {
1005  const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
1006  struct stat buf;
1007  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
1008  if (!found) {
1009  if (doCreateEoLS) {
1010  int eol_fd =
1011  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1012  close(eol_fd);
1013  }
1014  if (doCreateBoLS)
1015  createBoLSFile(lumiSection, false);
1016  }
1017  } else if (doCreateBoLS) {
1018  createBoLSFile(lumiSection, true); //needed for initial lumisection
1019  }
1020  }
1021 
      // NOTE(review): the first line of the signature (listing line 1022) is missing from this extract;
      // the recursive call below shows that the function is parseFRDFileHeader and that its first
      // argument is rawSourcePath. Listing lines 1065, 1086 and 1105 (presumably the declarations of
      // fileId and fhContent) are missing as well.
      //
      // Opens rawSourcePath, reads the FRD file header identifier and, for header versions 1 and 2, the
      // rest of the header, then copies the header fields into the output arguments.
      //  rawFd               out: the open descriptor, or -1 when closeFile is true (set on success only)
      //  rawHeaderSize       out: headerSize_ from the header; 0 when no header was detected
      //  rawDataType         out: dataType_ for v2, 0 for v1; not written when no header was detected
      //  lsFromHeader        out: lumiSection_ from the header; 0 when no header was detected
      //  eventsFromHeader    out: eventCount_ from the header; -1 when no header was detected
      //  fileSizeFromHeader  out: fileSize_ from the header; -1 when no header was detected
      //  requireHeader       treat a missing or unrecognised header identifier as an error
      //  retry               on open failure, log a warning and call again once with retry=false
      //  closeFile           close the descriptor before returning successfully
      // Returns 0 on success, 1 when the final open attempt failed with ENOENT, -1 on any other failure.
1023  int& rawFd,
1024  uint16_t& rawHeaderSize,
1025  uint16_t& rawDataType,
1026  uint32_t& lsFromHeader,
1027  int32_t& eventsFromHeader,
1028  int64_t& fileSizeFromHeader,
1029  bool requireHeader,
1030  bool retry,
1031  bool closeFile) {
1032  int infile;
1033 
1034  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1035  if (retry) {
1036  edm::LogWarning("EvFDaqDirector")
1037  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
      // second pass with retry disabled; its result is returned as-is
1038  return parseFRDFileHeader(rawSourcePath,
1039  rawFd,
1040  rawHeaderSize,
1041  rawDataType,
1042  lsFromHeader,
1043  eventsFromHeader,
1044  fileSizeFromHeader,
1045  requireHeader,
1046  false,
1047  closeFile);
1048  } else {
      // one more open attempt is made before giving up; on success execution continues below
1049  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1050  edm::LogError("EvFDaqDirector")
1051  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
      // errno is inspected after the logging statement above has run
1052  if (errno == ENOENT)
1053  return 1; // error && file not found
1054  else
1055  return -1;
1056  }
1057  }
1058  }
1059 
1060  //v2 is the largest possible read
1061  char hdr[sizeof(FRDFileHeader_v2)];
      // checkFileRead() closes infile itself when it fails, so the early returns below do not leak it
1062  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1063  return -1;
1064 
1066  uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1067 
1068  if (frd_version == 0) {
1069  //no header (specific sequence not detected)
1070  if (requireHeader) {
1071  edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1072  close(infile);
1073  return -1;
1074  } else {
1075  //no header, but valid file
      // rewind so that the bytes consumed by the identifier probe are read again as data
1076  lseek(infile, 0, SEEK_SET);
1077  rawHeaderSize = 0;
1078  lsFromHeader = 0;
1079  eventsFromHeader = -1;
1080  fileSizeFromHeader = -1;
1081  }
1082  } else if (frd_version == 1) {
1083  //version 1 header
      // this read continues from the current file offset, i.e. right after the identifier
1084  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1085  return -1;
1087  uint32_t headerSizeRaw = fhContent->headerSize_;
1088  if (headerSizeRaw != sizeof(FRDFileHeader_v1)) {
1089  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1090  << " v:" << frd_version;
1091  close(infile);
1092  return -1;
1093  }
1094  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1095  rawDataType = 0;
1096  lsFromHeader = fhContent->lumiSection_;
1097  eventsFromHeader = (int32_t)fhContent->eventCount_;
1098  fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1099  rawHeaderSize = fhContent->headerSize_;
1100 
1101  } else if (frd_version == 2) {
1102  //version 2 header
1103  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1104  return -1;
1106  uint32_t headerSizeRaw = fhContent->headerSize_;
1107  if (headerSizeRaw != sizeof(FRDFileHeader_v2)) {
1108  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1109  << " v:" << frd_version;
1110  close(infile);
1111  return -1;
1112  }
1113  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1114  rawDataType = fhContent->dataType_;
1115  lsFromHeader = fhContent->lumiSection_;
1116  eventsFromHeader = (int32_t)fhContent->eventCount_;
1117  fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1118  rawHeaderSize = fhContent->headerSize_;
1119  }
      // NOTE(review): any other frd_version value falls through to here with the output arguments
      // untouched and the function still returns 0.
1120 
1121  if (closeFile) {
1122  close(infile);
1123  infile = -1;
1124  }
1125 
1126  rawFd = infile;
1127  return 0; //OK
1128  }
1129 
1130  bool EvFDaqDirector::checkFileRead(char* buf, int infile, std::size_t buf_sz, std::string const& path) {
1131  ssize_t sz_read = ::read(infile, buf, buf_sz);
1132  if (sz_read < 0) {
1133  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << path << " : " << strerror(errno);
1134  if (infile != -1)
1135  close(infile);
1136  return false;
1137  }
1138  if ((size_t)sz_read < buf_sz) {
1139  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << path;
1140  if (infile != -1)
1141  close(infile);
1142  return false;
1143  }
1144  return true;
1145  }
1146 
      // Check whether rawSourcePath starts with an FRD file header of version 1 or 2.
      //  rawSourcePath  file to probe
      //  rawHeaderSize  out: headerSize_ taken from the header on success; set to 0 when the version is
      //                 neither 1 nor 2; left untouched when the file cannot be opened or read
      // Returns true only for a version 1 or 2 header.
      // The descriptor is closed on every path (checkFileRead() closes it itself when it fails).
      // NOTE(review): listing lines 1158, 1164 and 1171 (presumably the declarations of fileId and
      // fhContent) are missing from this extract.
1147  bool EvFDaqDirector::rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize) {
1148  int infile;
1149  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1150  edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1151  << strerror(errno);
1152  return false;
1153  }
1154  //try to read FRD header size (v2 is the biggest, use read buffer of that size)
1155  char hdr[sizeof(FRDFileHeader_v2)];
1156  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1157  return false;
1159  uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1160 
1161  if (frd_version == 1) {
      // this read continues from the current file offset, i.e. right after the identifier
1162  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1163  return false;
1165  rawHeaderSize = fhContent->headerSize_;
1166  close(infile);
1167  return true;
1168  } else if (frd_version == 2) {
1169  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1170  return false;
1172  rawHeaderSize = fhContent->headerSize_;
1173  close(infile);
1174  return true;
1175  } else
      // this branch also covers version 0, i.e. a file without a recognised header
1176  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unknown version: " << frd_version;
1177 
1178  close(infile);
1179  rawHeaderSize = 0;
1180  return false;
1181  }
1182 
      // NOTE(review): the first line of the signature (listing line 1183) is missing from this extract;
      // the log messages name the function grabNextJsonFromRaw and the body uses rawSourcePath, which is
      // presumably declared on that line.
      //
      // Parses the FRD header of rawSourcePath and writes a small JSON file
      // {"data":[<events>,<file size>,"<rawSourcePath>"]} into baseRunDir().
      //  rawFd               out: forwarded to parseFRDFileHeader()
      //  rawHeaderSize       out: forwarded to parseFRDFileHeader()
      //  fileSizeFromHeader  out: file size from the raw header (set on success only)
      //  fileFound           out: false only when parseFRDFileHeader() returned 1
      //  serverLS            when non-zero, compared with the header lumisection (mismatch -> warning)
      //  closeFile           forwarded to parseFRDFileHeader()
      //  requireHeader       forwarded to parseFRDFileHeader()
      // Returns the event count from the raw header on success, -1 on failure.
      // NOTE(review): with requireHeader == false and a headerless file, parseFRDFileHeader() reports an
      // event count of -1, which is then indistinguishable from the error return here.
1184  int& rawFd,
1185  uint16_t& rawHeaderSize,
1186  int64_t& fileSizeFromHeader,
1187  bool& fileFound,
1188  uint32_t serverLS,
1189  bool closeFile,
1190  bool requireHeader) {
1191  fileFound = true;
1192 
1193  //take only first three tokens delimited by "_" in the renamed raw file name
1194  std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1195  size_t pos = 0, n_tokens = 0;
      // advance pos to the third '_' ; pos becomes npos when the stem has fewer than three of them,
      // in which case substr() below keeps the whole stem
1196  while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1197  }
1198  std::string reducedJsonStem = jsonStem.substr(0, pos);
1199 
1200  std::ostringstream fileNameWithPID;
1201  //should be ported to use fffnaming
1202  fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1203 
1204  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1205 
1206  LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1207 
1208  //parse RAW file header if it exists
1209  uint32_t lsFromRaw;
1210  int32_t nbEventsWrittenRaw;
1211  int64_t fileSizeFromRaw;
1212  uint16_t rawDataType;
1213  auto ret = parseFRDFileHeader(rawSourcePath,
1214  rawFd,
1215  rawHeaderSize,
1216  rawDataType,
1217  lsFromRaw,
1218  nbEventsWrittenRaw,
1219  fileSizeFromRaw,
1220  requireHeader,
1221  true,
1222  closeFile);
1223  if (ret != 0) {
1224  if (ret == 1)
1225  fileFound = false;
1226  return -1;
1227  }
1228 
      // NOTE(review): none of the error returns below close rawFd, which is still open when closeFile
      // is false - confirm that the caller takes care of it.
1229  int outfile;
1230  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1231  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1232  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1233  if (errno == EEXIST) {
1234  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1235  << " : ";
1236  return -1;
1237  }
1238  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1239  << strerror(errno);
      // the open failed for a reason other than EEXIST: remove a possibly half-created file and try once more
1240  struct stat out_stat;
1241  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1242  edm::LogWarning("EvFDaqDirector")
1243  << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1244  << jsonDestPath;
1245  if (unlink(jsonDestPath.c_str()) == -1) {
1246  edm::LogWarning("EvFDaqDirector")
1247  << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1248  }
1249  }
1250  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1251  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1252  << jsonDestPath << " : " << strerror(errno);
1253  return -1;
1254  }
1255  }
1256  //write JSON file (TODO: use jsoncpp)
1257  std::stringstream ss;
1258  ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1259  std::string sstr = ss.str();
1260 
      // NOTE(review): a single write() is issued (a short write is not detected), and outfile is not
      // closed on the failure return below.
1261  if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1262  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1263  << " : " << strerror(errno);
1264  return -1;
1265  }
1266  close(outfile);
1267  if (serverLS && serverLS != lsFromRaw)
1268  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1269  << " and raw file header LS " << lsFromRaw;
1270 
1271  fileSizeFromHeader = fileSizeFromRaw;
1272  return nbEventsWrittenRaw;
1273  }
1274 
      // NOTE(review): the first line of the signature (listing line 1275) is missing from this extract;
      // the log messages name the function grabNextJsonFile and the body uses jsonSourcePath, which is
      // presumably declared on that line. Listing line 1368 (presumably the declaration of `reader`) is
      // missing as well.
      //
      // Copies jsonSourcePath to baseRunDir()/<stem of rawSourcePath>_pid<pid_>.jsn, removes the source,
      // parses the JSON and extracts the event count.
      //  rawSourcePath     only its stem is used, to build the destination file name
      //  fileSizeFromJson  out: value of the "NBytes" field, or -1 when absent or unparsable
      //  fileFound         out: false only when the input could not be opened because of ENOENT
      // Returns the event count ("NEvents" field, or the first data field as fallback) on success,
      // -1 on any failure. Exceptions raised while parsing are caught and logged here.
1276  std::string const& rawSourcePath,
1277  int64_t& fileSizeFromJson,
1278  bool& fileFound) {
1279  fileFound = true;
1280 
1281  //should be ported to use fffnaming
1282  std::ostringstream fileNameWithPID;
1283  fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1284  << std::setw(5) << pid_ << ".jsn";
1285 
1286  // assemble json destination path
1287  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1288 
1289  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1290 
1291  int infile = -1, outfile = -1;
1292 
      // the open is attempted twice before giving up
1293  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1294  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1295  << strerror(errno);
1296  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1297  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1298  << jsonSourcePath << " : " << strerror(errno);
      // errno is inspected after the logging statement above has run
1299  if (errno == ENOENT)
1300  fileFound = false;
1301  return -1;
1302  }
1303  }
1304 
1305  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1306  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1307  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1308  if (errno == EEXIST) {
1309  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1310  << " : ";
1311  ::close(infile);
1312  return -1;
1313  }
1314  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1315  << strerror(errno);
      // the open failed for a reason other than EEXIST: remove a possibly half-created file and try once more
1316  struct stat out_stat;
1317  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1318  edm::LogWarning("EvFDaqDirector")
1319  << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1320  if (unlink(jsonDestPath.c_str()) == -1) {
1321  edm::LogWarning("EvFDaqDirector")
1322  << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1323  }
1324  }
1325  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1326  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1327  << jsonDestPath << " : " << strerror(errno);
1328  ::close(infile);
1329  return -1;
1330  }
1331  }
1332  //copy contents
1333  const std::size_t buf_sz = 512;
1334  std::size_t tot_written = 0;
1335  std::unique_ptr<char[]> buf(new char[buf_sz]);
1336 
      // outer loop: read up to buf_sz bytes per pass; inner loop: keep writing until the whole chunk
      // has been written (write() may accept fewer bytes than requested)
1337  ssize_t sz, sz_read = 1, sz_write;
1338  while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1339  sz_write = 0;
1340  do {
1341  assert(sz_read - sz_write > 0);
1342  if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1343  sz_read = sz; // cause read loop termination
1344  break;
1345  }
1346  assert(sz > 0);
1347  sz_write += sz;
1348  tot_written += sz;
1349  } while (sz_write < sz_read);
1350  }
1351  close(infile);
1352  close(outfile);
1353 
      // NOTE(review): a read or write error after some bytes were already copied still leaves
      // tot_written > 0, so the source file is removed although the copy may be incomplete.
1354  if (tot_written > 0) {
1355  //leave file if it was empty for diagnosis
1356  if (unlink(jsonSourcePath.c_str()) == -1) {
1357  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1358  << strerror(errno);
1359  return -1;
1360  }
1361  } else {
1362  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1363  << jsonSourcePath;
1364  return -1;
1365  }
1366 
1367  Json::Value deserializeRoot;
1369 
1370  std::string data;
1371  std::stringstream ss;
1372  bool result;
1373  try {
      // small files are parsed straight from the copy buffer, larger ones are re-read from the copy
      // NOTE(review): the copy loop never NUL-terminates buf (and the buffer is completely full when
      // tot_written == buf_sz); if this parse overload treats the pointer as a C string, termination
      // is not guaranteed - confirm.
1374  if (tot_written <= buf_sz) {
1375  result = reader.parse(buf.get(), deserializeRoot);
1376  } else {
1377  //json will normally not be bigger than buf_sz bytes
1378  try {
1379  std::ifstream ij(jsonDestPath);
1380  ss << ij.rdbuf();
1381  } catch (std::filesystem::filesystem_error const& ex) {
1382  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1383  return -1;
1384  }
1385  result = reader.parse(ss.str(), deserializeRoot);
1386  }
1387  if (!result) {
      // in the small-file case ss is still empty, so the buffer content is added for the log message
1388  if (tot_written <= buf_sz)
1389  ss << buf.get();
1390  edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1391  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1392  << ss.str() << ".";
1393  return -1;
1394  }
1395 
1396  //read BU JSON
1397  DataPoint dp;
1398  dp.deserialize(deserializeRoot);
1399  bool success = false;
      // take the data field at the position where dpd_ lists the name "NEvents"
1400  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1401  if (dpd_->getNames().at(i) == "NEvents")
1402  if (i < dp.getData().size()) {
1403  data = dp.getData()[i];
1404  success = true;
1405  break;
1406  }
1407  }
      // fallback: use the first data field when "NEvents" could not be located
1408  if (!success) {
1409  if (!dp.getData().empty())
1410  data = dp.getData()[0];
1411  else {
1412  edm::LogError("EvFDaqDirector::grabNextJsonFile")
1413  << "grabNextJsonFile - "
1414  << " error reading number of events from BU JSON; No input value. data -: " << data;
1415  return -1;
1416  }
1417  }
1418 
1419  //try to read raw file size
1420  fileSizeFromJson = -1;
1421  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1422  if (dpd_->getNames().at(i) == "NBytes") {
1423  if (i < dp.getData().size()) {
1424  std::string dataSize = dp.getData()[i];
1425  try {
1426  fileSizeFromJson = std::stol(dataSize);
1427  } catch (const std::exception&) {
1428  //non-fatal currently, processing can continue without this value
1429  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1430  << "Input value is -: " << dataSize;
1431  }
1432  break;
1433  }
1434  }
1435  }
      // std::stoi throws std::invalid_argument / std::out_of_range on bad input; both are handled below
1436  return std::stoi(data);
1437  } catch (const std::out_of_range& e) {
1438  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1439  << "Input value is -: " << data;
1440  } catch (const std::invalid_argument& e) {
1441  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1442  << "Input value is -: " << data;
1443  } catch (std::runtime_error const& e) {
1444  //Can be thrown by Json parser
1445  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1446  }
1447 
1448  catch (std::exception const& e) {
1449  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1450  } catch (...) {
1451  //unknown exception
1452  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1453  }
1454 
      // reached only after one of the handlers above has run
1455  return -1;
1456  }
1457 
      // NOTE(review): the signature line (listing line 1458) is missing from this extract; the body uses
      // jsonSourcePath as a std::filesystem::path, so it is presumably the parameter declared there.
      // Listing line 1495 (presumably the declaration of `reader`) is missing as well.
      //
      // Visible body: copy jsonSourcePath to baseRunDir()/<stem>_pid<getpid()>.jsn, call unlockFULocal(),
      // remove the source, parse the copy and return the event count ("NEvents" field, or the first data
      // field as fallback). Returns -1 when an exception was caught.
      // NOTE(review): the `data` declared here is shadowed by the one declared inside the try block, so
      // the out_of_range / invalid_argument handlers below always log this (empty) outer string.
1459  std::string data;
1460  try {
1461  // assemble json destination path
1462  std::filesystem::path jsonDestPath(baseRunDir());
1463 
1464  //should be ported to use fffnaming
1465  std::ostringstream fileNameWithPID;
1466  fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1467  << ".jsn";
1468  jsonDestPath /= fileNameWithPID.str();
1469 
1470  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1471  try {
1472  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1473  } catch (std::filesystem::filesystem_error const& ex) {
1474  // Input dir gone?
1475  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1476  // << " Maybe the file is not yet visible by FU. Trying again in one second";
      // one retry after a second; a failure of this second copy propagates to the outer handlers
1477  sleep(1);
1478  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1479  }
      // the lock is released as soon as the copy exists, before the source is removed and parsed
1480  unlockFULocal();
1481 
1482  try {
1483  //sometimes this fails but file gets deleted
1484  std::filesystem::remove(jsonSourcePath);
1485  } catch (std::filesystem::filesystem_error const& ex) {
1486  // Input dir gone?
1487  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1488  } catch (std::exception const& ex) {
1489  // Input dir gone?
1490  edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1491  }
1492 
1493  std::ifstream ij(jsonDestPath);
1494  Json::Value deserializeRoot;
1496 
1497  std::stringstream ss;
1498  ss << ij.rdbuf();
1499  if (!reader.parse(ss.str(), deserializeRoot)) {
1500  edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1501  << "\nERROR:\n"
1502  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1503  << ss.str() << ".";
1504  throw std::runtime_error("Cannot deserialize input JSON file");
1505  }
1506 
1507  //read BU JSON
1508  std::string data;
1509  DataPoint dp;
1510  dp.deserialize(deserializeRoot);
1511  bool success = false;
      // take the data field at the position where dpd_ lists the name "NEvents" (no early exit here)
1512  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1513  if (dpd_->getNames().at(i) == "NEvents")
1514  if (i < dp.getData().size()) {
1515  data = dp.getData()[i];
1516  success = true;
1517  }
1518  }
      // fallback: use the first data field when "NEvents" could not be located
1519  if (!success) {
1520  if (!dp.getData().empty())
1521  data = dp.getData()[0];
1522  else
1523  throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1524  << " error reading number of events from BU JSON -: No input value " << data;
1525  }
1526  return std::stoi(data);
      // NOTE(review): the three handlers that call unlockFULocal() do so even when the exception was
      // raised after the unlockFULocal() call inside the try block - confirm a repeated unlock is harmless.
1527  } catch (std::filesystem::filesystem_error const& ex) {
1528  // Input dir gone?
1529  unlockFULocal();
1530  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1531  } catch (std::runtime_error const& e) {
1532  // Another process grabbed the file and NFS did not register this
1533  unlockFULocal();
1534  edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1535  } catch (const std::out_of_range&) {
1536  edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1537  << "Input value is -: " << data;
1538  } catch (const std::invalid_argument&) {
1539  edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1540  << "Input value is -: " << data;
1541  } catch (std::exception const& e) {
1542  // BU run directory disappeared?
1543  unlockFULocal();
1544  edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1545  }
1546 
1547  return -1;
1548  }
1549 
      // NOTE(review): the first line of the signature (listing line 1550) is missing from this extract,
      // as are listing lines 1566, 1594 and 1636 inside the body. serverHttpStatus is written below
      // without a visible declaration, so it is presumably the parameter declared on the missing line.
      //
      // Sends one HTTP GET "/popfile?runnumber=<run>&pid=<pid>[&stopls=<maxLS>]" over socket_ and
      // interprets the "key=value" lines of the reply.
      //  serverError     out: true on any socket, HTTP or server-state error
      //  serverLS        out: "lumisection" from the reply (at least 1), else closedServerLS + 1
      //  closedServerLS  out: "lasteols" from the reply (at least 0)
      //  nextFileJson    out: written only when a file is handed out: the .jsn path for server major
      //                  version <= 1, "" for newer versions
      //  nextFileRaw     out: written only when a file is handed out: the .raw path
      //  rawHeader       out: written only when a file is handed out: true for server major version > 1
      //  maxLS           when >= 0, forwarded to the server as stopls
      // Returns newFile, noFile or runEnded; always noFile when serverError is set.
1551  bool& serverError,
1552  uint32_t& serverLS,
1553  uint32_t& closedServerLS,
1554  std::string& nextFileJson,
1555  std::string& nextFileRaw,
1556  bool& rawHeader,
1557  int maxLS) {
1558  EvFDaqDirector::FileStatus fileStatus = noFile;
1559  serverError = false;
1560 
1561  boost::system::error_code ec;
1562  try {
      // the loop body runs once; `continue` is used only to resend the request after a reconnect
1563  while (true) {
1564  //socket connect
      // (the connect statement itself, listing line 1566, is not visible in this extract)
1565  if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1567 
1568  if (ec) {
1569  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1570  serverError = true;
1571  break;
1572  }
1573  }
1574 
1575  boost::asio::streambuf request;
1576  std::ostream request_stream(&request);
1577  std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1578  if (maxLS >= 0) {
1579  std::stringstream spath;
1580  spath << path << "&stopls=" << maxLS;
1581  path = spath.str();
1582  edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1583  }
1584  request_stream << "GET " << path << " HTTP/1.1\r\n";
1585  request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1586  request_stream << "Accept: */*\r\n";
      // keep-alive is requested regardless of the fileBrokerKeepAlive_ setting
1587  request_stream << "Connection: keep-alive\r\n\r\n";
1588 
1589  boost::asio::write(*socket_, request, ec);
1590  if (ec) {
1591  if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1592  edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1593  //we got disconnected, try to reconnect to the server before writing the request
      // (the reconnect statement itself, listing line 1594, is not visible in this extract)
1595  if (ec) {
1596  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1597  serverError = true;
1598  break;
1599  }
1600  continue;
1601  }
1602  edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1603  serverError = true;
1604  break;
1605  }
1606 
1607  boost::asio::streambuf response;
1608  boost::asio::read_until(*socket_, response, "\r\n", ec);
1609  if (ec) {
1610  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1611  serverError = true;
1612  break;
1613  }
1614 
1615  std::istream response_stream(&response);
1616 
      // status line: "<HTTP version> <status code> <message>"
1617  std::string http_version;
1618  response_stream >> http_version;
1619 
1620  response_stream >> serverHttpStatus;
1621 
1622  std::string status_message;
1623  std::getline(response_stream, status_message);
1624  if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1625  edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1626  serverError = true;
1627  break;
1628  }
1629  if (serverHttpStatus != 200) {
1630  edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1631  serverError = true;
1632  break;
1633  }
1634 
1635  // Process the response headers.
      // (header lines are skipped up to the blank line; the declaration of `header`, listing line 1636,
      // is not visible in this extract)
      // NOTE(review): only the data already buffered by the single read_until above is parsed here and
      // below; no further socket read is issued for the headers or the body in the visible code.
1637  while (std::getline(response_stream, header) && header != "\r") {
1638  }
1639 
      // body: one "key=value" entry per line; lines without '=' are ignored
1640  std::string fileInfo;
1641  std::map<std::string, std::string> serverMap;
1642  while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1643  auto pos = fileInfo.find('=');
1644  if (pos == std::string::npos)
1645  continue;
1646  auto stitle = fileInfo.substr(0, pos);
1647  auto svalue = fileInfo.substr(pos + 1);
1648  serverMap[stitle] = svalue;
1649  }
1650 
1651  //check that response run number is correct
      // NOTE(review): a reply lacking one of these keys, or carrying a different run number, trips an
      // assert (when asserts are enabled) instead of setting serverError.
1652  auto server_version = serverMap.find("version");
1653  assert(server_version != serverMap.end());
1654 
1655  auto server_run = serverMap.find("runnumber");
1656  assert(server_run != serverMap.end());
1657  assert(run_nstring_ == server_run->second);
1658 
1659  auto server_state = serverMap.find("state");
1660  assert(server_state != serverMap.end());
1661 
1662  auto server_eols = serverMap.find("lasteols");
1663  assert(server_eols != serverMap.end());
1664 
      // "lumisection" is optional, see the fallback further down
1665  auto server_ls = serverMap.find("lumisection");
1666 
      // parse "maj.min.rev", "maj.min" or "maj", skipping a leading double quote; the defaults
      // 1.0.0 remain when nothing can be parsed
1667  int version_maj = 1;
1668  int version_min = 0;
1669  int version_rev = 0;
1670  {
1671  auto* s_ptr = server_version->second.c_str();
1672  if (!server_version->second.empty() && server_version->second[0] == '"')
1673  s_ptr++;
1674  auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1675  if (res < 3) {
1676  res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1677  if (res < 2) {
1678  res = sscanf(s_ptr, "%d", &version_maj);
1679  if (res < 1) {
1680  //expecting at least 1 number (major version)
1681  edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1682  }
1683  }
1684  }
1685  }
1686 
      // the values are clamped to >= 0 / >= 1 and then stored into the 32-bit output arguments
1687  closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1688  if (server_ls != serverMap.end())
1689  serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1690  else
1691  serverLS = closedServerLS + 1;
1692 
      // dispatch on the server state
1693  std::string s_state = server_state->second;
1694  if (s_state == "STARTING") //initial, always empty starting with LS 1
1695  {
1696  auto server_file = serverMap.find("file");
1697  assert(server_file == serverMap.end()); //no file with starting state
1698  fileStatus = noFile;
1699  edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1700  } else if (s_state == "READY") {
1701  auto server_file = serverMap.find("file");
1702  if (server_file == serverMap.end()) {
1703  //can be returned by server if files from new LS already appeared but LS is not yet closed
1704  if (serverLS <= closedServerLS)
1705  serverLS = closedServerLS + 1;
1706  fileStatus = noFile;
1707  edm::LogInfo("EvFDaqDirector")
1708  << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1709  } else {
1710  std::string filestem;
1711  std::string fileprefix;
1712  auto server_fileprefix = serverMap.find("fileprefix");
1713 
      // optional prefix, with surrounding double quotes removed when present
1714  if (server_fileprefix != serverMap.end()) {
1715  auto pssize = server_fileprefix->second.size();
1716  if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1717  fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1718  else
1719  fileprefix = server_fileprefix->second;
1720  }
1721 
1722  //remove string literals
1723  auto ssize = server_file->second.size();
1724  if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1725  filestem = server_file->second.substr(1, ssize - 2);
1726  else
1727  filestem = server_file->second;
1728  assert(!filestem.empty());
      // major version > 1: raw file only (the prefix applies to it); otherwise a .jsn file under the
      // prefix plus a .raw file directly under bu_run_dir_
1729  if (version_maj > 1) {
1730  nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw"; //filestem should be raw
1731  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1732  nextFileJson = "";
1733  rawHeader = true;
1734  } else {
1735  nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw"; //raw files are not moved
1736  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1737  nextFileJson = filestem + ".jsn";
1738  rawHeader = false;
1739  }
1740  fileStatus = newFile;
1741  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1742  << serverLS << " file:" << filestem;
1743  }
1744  } else if (s_state == "EOLS") {
1745  serverLS = closedServerLS + 1;
1746  edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1747  fileStatus = noFile;
1748  } else if (s_state == "EOR") {
1749  //server_eor = serverMap.find("iseor");
1750  edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1751  fileStatus = runEnded;
1752  } else if (s_state == "NORUN") {
      // NORUN ends the run like EOR, without raising serverError
1753  auto err_msg = serverMap.find("errormessage");
1754  if (err_msg != serverMap.end())
1755  edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1756  else
1757  edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1758  edm::LogWarning("EvFDaqDirector") << "executing run end";
1759  fileStatus = runEnded;
1760  } else if (s_state == "ERROR") {
1761  auto err_msg = serverMap.find("errormessage");
1762  if (err_msg != serverMap.end())
1763  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1764  else
1765  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1766  fileStatus = noFile;
1767  serverError = true;
1768  } else {
1769  edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1770  fileStatus = noFile;
1771  serverError = true;
1772  }
1773 
1774  // Read until EOF, discarding whatever else the server sends.
1775  if (!fileBrokerKeepAlive_) {
1776  while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1777  }
1778  if (ec != boost::asio::error::eof) {
1779  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1780  serverError = true;
1781  }
1782  }
1783 
1784  break;
1785  }
1786 
1787  } catch (std::exception const& e) {
1788  edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1789  serverError = true;
1790  }
1791 
      // without keep-alive the connection is torn down after every request
1792  if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1793  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1794  if (ec) {
1795  edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1796  }
1797  socket_->close(ec);
1798  if (ec) {
1799  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1800  }
1801  }
1802 
      // on any error: drop the connection, report noFile and back off for a second
1803  if (serverError) {
1804  if (socket_->is_open())
1805  socket_->close(ec);
1806  if (ec) {
1807  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1808  }
1809  fileStatus = noFile;
1810  sleep(1); //back-off if error detected
1811  }
1812 
1813  return fileStatus;
1814  }
1815 
// Body of the file-broker polling routine. The index at the end of this listing names it
// getNextFromFileBroker(currentLumiSection, ls, nextFile, rawFd, ...); the opening line of the
// definition (listing line 1816) is not present in this copy.
//
// Flow, as far as the visible lines show:
//   1. stat() both stop-file paths and derive stopFileLS / stop_ls_override_.
//   2. Call lockFULocal(), then contactFileBroker(); on serverError unlock and return noFile.
//   3. Call createLumiSectionFiles(..., true, false) for the lumisections the server reports.
//   4. If the broker returned newFile, obtain the event count from grabNextJsonFromRaw() or
//      grabNextJsonFile(), closing rawFd when that count is negative.
//   5. Call unlockFULocal(); downgrade to noFile (or runEnded if bu_run_dir_ is gone) when the
//      file was not found.
//   6. Call createLumiSectionFiles(..., false, true) between lockFULocal2()/unlockFULocal2().
//   7. Update `ls` from serverLS according to the final status and return that status.
//
// NOTE(review): thisLockWaitTimeUs is never assigned in the visible body, and ts_lockbegin is
// filled by gettimeofday() but not read afterwards - confirm this is intended.
1817  unsigned int& ls,
1818  std::string& nextFileRaw,
1819  int& rawFd,
1820  uint16_t& rawHeaderSize,
1821  int32_t& serverEventsInNewFile,
1822  int64_t& fileSizeFromMetadata,
1823  uint64_t& thisLockWaitTimeUs,
1824  bool requireHeader) {
1825  EvFDaqDirector::FileStatus fileStatus = noFile;
1826 
1827  //int retval = -1;
1828  //int lock_attempts = 0;
1829  //long total_lock_attempts = 0;
1830 
// Stop-request detection: either stop file existing (stat() == 0) counts as a request.
1831  struct stat buf;
1832  int stopFileLS = -1;
1833  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1834  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1835  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1836  if (stopFileCheck == 0)
1837  stopFileLS = readLastLSEntry(stopFilePath_);
1838  else
1839  stopFileLS = 1; //stop without drain if only pid is stopped
// stop_ls_override_ remembers the LS chosen on an earlier call; once set it takes precedence
// over the value read from the stop file.
1840  if (!stop_ls_override_) {
1841  //if lumisection is higher than in stop file, should quit at next from current
1842  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1843  stopFileLS = stop_ls_override_ = ls;
1844  } else
1845  stopFileLS = stop_ls_override_;
1846  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1847  << stopFileLS;
1848  //return runEnded;
1849  } else //if file was removed before reaching stop condition, reset this
1850  stop_ls_override_ = 0;
1851 
1852  /* look for EoLS
1853  if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1854  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ;
1855  ls++;
1856  return noFile;
1857  }
1858  */
1859 
1860  timeval ts_lockbegin;
1861  gettimeofday(&ts_lockbegin, nullptr);
1862 
// Out-values filled by contactFileBroker() below; they are declared without initialisers.
1863  std::string nextFileJson;
1864  uint32_t serverLS, closedServerLS;
1865  unsigned int serverHttpStatus;
1866  bool serverError;
1867 
1868  //local lock to force index json and EoLS files to appear in order
// NOTE(review): listing line 1869 is absent from this copy - check the upstream file for
// whether this call is guarded by a condition.
1870  lockFULocal();
1871 
// maxLS handed to the broker: -1 when no stop request is active, otherwise the larger of the
// stop LS and the caller's current lumisection.
1872  int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1873  bool rawHeader = false;
1874  fileStatus = contactFileBroker(
1875  serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1876 
1877  if (serverError) {
1878  //do not update anything
// NOTE(review): listing line 1879 is absent from this copy (see the note at the lock call).
1880  unlockFULocal();
1881  return noFile;
1882  }
1883 
1884  //handle creation of BoLS files if lumisection has changed
// First call (currentLumiSection == 0): one createLumiSectionFiles() call for the closed LS on
// run end, otherwise for the server's LS. Later calls: one call per LS from
// max(currentLumiSection, 1) up to and including closedServerLS.
1885  if (currentLumiSection == 0) {
1886  if (fileStatus == runEnded)
1887  createLumiSectionFiles(closedServerLS, 0, true, false);
1888  else
1889  createLumiSectionFiles(serverLS, 0, true, false);
1890  } else {
1891  if (closedServerLS >= currentLumiSection) {
1892  //only BoLS files
1893  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1894  createLumiSectionFiles(i + 1, i, true, false);
1895  }
1896  }
1897 
1898  bool fileFound = true;
1899 
1900  if (fileStatus == newFile) {
// rawHeader is a bool, so "> 0" is simply a test for true.
1901  if (rawHeader > 0)
1902  serverEventsInNewFile = grabNextJsonFromRaw(
1903  nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false, requireHeader);
1904  else
1905  serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1906  }
1907  //closing file in case of any error
// NOTE(review): serverEventsInNewFile is only assigned above when fileStatus == newFile; for
// other statuses this condition tests whatever value the caller passed in.
1908  if (serverEventsInNewFile < 0 && rawFd != -1) {
1909  close(rawFd);
1910  rawFd = -1;
1911  }
1912 
1913  //can unlock because all files have been created locally
// NOTE(review): listing line 1914 is absent from this copy (see the note at the lock call).
1915  unlockFULocal();
1916 
1917  if (!fileFound) {
1918  //catch condition where directory got deleted
1919  fileStatus = noFile;
// This declaration shadows the function-level `buf` used for the stop-file checks.
1920  struct stat buf;
1921  if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1922  edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1923  fileStatus = runEnded;
1924  }
1925  }
1926 
1927  //handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
1928  //so that EoLS files can not appear locally before index files
// Same LS ranges as the earlier createLumiSectionFiles() calls, now with the last two
// arguments swapped to (false, true) and wrapped in lockFULocal2()/unlockFULocal2().
1929  if (currentLumiSection == 0) {
1930  lockFULocal2();
1931  if (fileStatus == runEnded) {
1932  createLumiSectionFiles(closedServerLS, 0, false, true);
1933  createLumiSectionFiles(serverLS, closedServerLS, false, true); // +1
1934  } else {
1935  createLumiSectionFiles(serverLS, 0, false, true);
1936  }
1937  unlockFULocal2();
1938  } else {
1939  if (closedServerLS >= currentLumiSection) {
1940  //lock exclusive to create EoLS files
1941  lockFULocal2();
1942  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1943  createLumiSectionFiles(i + 1, i, false, true);
1944  unlockFULocal2();
1945  }
1946  }
1947 
// Publish the lumisection back to the caller. For newFile the server LS must not be behind the
// caller's LS (enforced by assert); for noFile a server LS that is behind is logged and
// ignored, followed by a one second sleep.
1948  if (fileStatus == runEnded)
1949  ls = std::max(currentLumiSection, serverLS);
1950  else if (fileStatus == newFile) {
1951  assert(serverLS >= ls);
1952  ls = serverLS;
1953  } else if (fileStatus == noFile) {
1954  if (serverLS >= ls)
1955  ls = serverLS;
1956  else {
1957  edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1958  << " which is smaller than currently open LS " << ls << ". Ignoring response";
1959  sleep(1);
1960  }
1961  }
1962 
1963  return fileStatus;
1964  }
1965 
// Makes sure the directory named by openPath exists: when std::filesystem::is_directory()
// reports that it does not, a debug message is logged and the directory is created together
// with any missing parent directories (std::filesystem::create_directories).
// NOTE(review): the opening line of this definition and the declaration of openPath (listing
// lines 1966 and 1969) are absent from this copy - take them from the upstream file.
1967  // create open dir if not already there
1968 
1970  if (!std::filesystem::is_directory(openPath)) {
1971  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1972  std::filesystem::create_directories(openPath);
1973  }
1974  }
1975 
1977  std::ifstream ij(file);
1978  Json::Value deserializeRoot;
1980 
1981  if (!reader.parse(ij, deserializeRoot)) {
1982  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1983  return -1;
1984  }
1985 
1986  int ret = deserializeRoot.get("lastLS", "").asInt();
1987  return ret;
1988  }
1989 
// Probes for consecutive end-of-lumisection marker files named
// "<run_dir_>/<run_string_>_ls<NNNN>_EoLS.jsn" (NNNN zero-padded to four digits), starting at
// 1, and returns the first number for which stat() fails, i.e. the lowest lumisection that has
// no such file yet.
// NOTE(review): the opening line of this definition and the declaration of fullpath (listing
// lines 1990 and 1992) are absent from this copy; the index at the end of the listing suggests
// this is getLumisectionToStart() const - confirm against the upstream file.
1991  std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1993  struct stat buf;
1994  unsigned int lscount = 1;
1995  do {
1996  std::stringstream ss;
1997  ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1998  fullpath = ss.str();
// incremented before the existence test, hence the "- 1" in the return statement
1999  lscount++;
2000  } while (stat(fullpath.c_str(), &buf) == 0);
2001  return lscount - 1;
2002  }
2003 
// Builds *transferSystemJson_ from the "transferSystem" PSet of the top-level parameter set
// looked up through pc.parameterSetID(). Does nothing when transferSystemJson_ is already set.
//
// Resulting JSON layout, as assembled below:
//   "destinations"  -> array copied from the vstring parameter "destinations"
//   "transferModes" -> array copied from the vstring parameter "transferModes"
//   <nested PSet name> -> object mapping every transfer mode to its array of destinations
//
// Throws cms::Exception when a nested PSet lacks the vstring for one of the modes, when such a
// vstring is empty, when it names a destination missing from "destinations", or when the
// "transferSystem" PSet does not exist while requireTSPSet_ is set.
// NOTE(review): the opening line of this definition (listing line 2005) is absent from this
// copy; the index at the end of the listing gives checkTransferSystemPSet(edm::ProcessContext const &pc).
// NOTE(review): two of the exception messages below spell "transfer" as "transter".
2004  //if transferSystem PSet is present in the menu, we require it to be complete and consistent for all specified streams
2006  if (transferSystemJson_)
2007  return;
2008 
// The JSON value is allocated even when the PSet turns out to be missing, so later calls take
// the early return above.
2009  transferSystemJson_.reset(new Json::Value);
2010  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
2011  if (topPset.existsAs<edm::ParameterSet>("transferSystem", true)) {
2012  const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));
2013 
2014  Json::Value destinationsVal(Json::arrayValue);
2015  std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
2016  for (auto& dest : destinations)
2017  destinationsVal.append(dest);
2018  (*transferSystemJson_)["destinations"] = destinationsVal;
2019 
2020  Json::Value modesVal(Json::arrayValue);
2021  std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>("transferModes");
2022  for (auto& mode : modes)
2023  modesVal.append(mode);
2024  (*transferSystemJson_)["transferModes"] = modesVal;
2025 
// Every remaining entry of the PSet table is fetched as a nested ParameterSet and validated
// mode by mode.
2026  for (auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
2027  if (psKeyItr->first != "destinations" && psKeyItr->first != "transferModes") {
2028  const edm::ParameterSet& streamDef = tsPset.getParameterSet(psKeyItr->first);
2029  Json::Value streamVal;
2030  for (auto& mode : modes) {
2031  //validation
2032  if (!streamDef.existsAs<std::vector<std::string>>(mode, true))
2033  throw cms::Exception("EvFDaqDirector")
2034  << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode
2035  << ")";
2036  std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);
2037 
2038  Json::Value sDestsValue(Json::arrayValue);
2039 
2040  if (streamDestinations.empty())
2041  throw cms::Exception("EvFDaqDirector")
2042  << " Missing transter system destination(s) for -: " << psKeyItr->first << ", mode:" << mode;
2043 
// Each listed destination has to appear in the global "destinations" list (linear search).
2044  for (auto& sdest : streamDestinations) {
2045  bool sDestValid = false;
2046  sDestsValue.append(sdest);
2047  for (auto& dest : destinations) {
2048  if (dest == sdest)
2049  sDestValid = true;
2050  }
2051  if (!sDestValid)
2052  throw cms::Exception("EvFDaqDirector")
2053  << " Invalid transter system destination specified for -: " << psKeyItr->first << ", mode:" << mode
2054  << ", dest:" << sdest;
2055  }
2056  streamVal[mode] = sDestsValue;
2057  }
2058  (*transferSystemJson_)[psKeyItr->first] = streamVal;
2059  }
2060  }
2061  } else {
2062  if (requireTSPSet_)
2063  throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
2064  }
2065  }
2066 
2068  std::string streamRequestName;
2069  if (transferSystemJson_->isMember(stream.c_str()))
2070  streamRequestName = stream;
2071  else {
2072  std::stringstream msg;
2073  msg << "Transfer system mode definitions missing for -: " << stream;
2074  if (requireTSPSet_)
2075  throw cms::Exception("EvFDaqDirector") << msg.str();
2076  else {
2077  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
2078  return std::string("Failsafe");
2079  }
2080  }
2081  //return empty if strict check parameter is not on
2082  if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
2083  edm::LogWarning("EvFDaqDirector")
2084  << "Selected mode string is not provided as DaqDirector parameter."
2085  << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
2086  return std::string("Failsafe");
2087  }
2088  if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
2089  throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
2090  }
2091  //check if stream has properly listed transfer stream
2092  if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str())) {
2093  std::stringstream msg;
2094  msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
2095  if (requireTSPSet_)
2096  throw cms::Exception("EvFDaqDirector") << msg.str();
2097  else
2098  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
2099  return std::string("Failsafe");
2100  }
2101  Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_, "");
2102 
2103  //flatten string json::Array into CSV std::string
2104  std::string ret;
2105  for (Json::Value::iterator it = destsVec.begin(); it != destsVec.end(); it++) {
2106  if (!ret.empty())
2107  ret += ",";
2108  ret += (*it).asString();
2109  }
2110  return ret;
2111  }
2112 
// Fills mergeTypeMap_ from the PSet whose name is stored in mergeTypePset_, looked up in the
// top-level parameter set obtained through pc.parameterSetID(). Each parameter of that PSet is
// read as a string and stored as (parameter name -> string value).
// Returns without doing anything when mergeTypePset_ is empty, when mergeTypeMap_ already has
// entries, or when the named PSet does not exist.
// NOTE(review): the opening line of this definition (listing line 2113) is absent from this
// copy; the index at the end of the listing gives checkMergeTypePSet(edm::ProcessContext const &pc).
2114  if (mergeTypePset_.empty())
2115  return;
2116  if (!mergeTypeMap_.empty())
2117  return;
2118  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
2119  if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_, true)) {
2120  const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
2121  for (const std::string& pname : tsPset.getParameterNames()) {
2122  std::string streamType = tsPset.getParameter<std::string>(pname);
// the accessor is released explicitly once the value has been written
2123  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2124  mergeTypeMap_.insert(ac, pname);
2125  ac->second = streamType;
2126  ac.release();
2127  }
2128  }
2129  }
2130 
// Looks up the merge type string recorded for `stream` in mergeTypeMap_ and returns it when
// present. Otherwise logs an info message, stores MergeTypeNames_[defaultType] under `stream`
// in the map and returns that default name.
// NOTE(review): the opening line of this definition (listing line 2131) is absent from this
// copy; the index at the end of the listing gives
// getStreamMergeType(std::string const &stream, MergeType defaultType).
// NOTE(review): defaultType indexes MergeTypeNames_ without a range check.
2132  tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
2133  if (mergeTypeMap_.find(search_ac, stream))
2134  return search_ac->second;
2135 
2136  edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
2137  std::string defaultName = MergeTypeNames_[defaultType];
// the result of insert() is not inspected; the entry for `stream` is set to the default name
2138  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2139  mergeTypeMap_.insert(ac, stream);
2140  ac->second = defaultName;
2141  ac.release();
2142  return defaultName;
2143  }
2144 
2146  std::string proc_flag = run_dir_ + "/processing";
2147  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2148  close(proc_flag_fd);
2149  }
2150 
2151  struct flock EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid) {
2152 #ifdef __APPLE__
2153  return {start, len, pid, type, whence};
2154 #else
2155  return {type, whence, start, len, pid};
2156 #endif
2157  }
2158 
// Returns true when stat() succeeds on the path held in input_throttled_file_, i.e. when that
// file currently exists and is reachable.
// NOTE(review): the opening line of this definition (listing line 2159) is absent from this copy.
2160  struct stat buf;
2161  return (stat(input_throttled_file_.c_str(), &buf) == 0);
2162  }
2163 
// Returns true when stat() succeeds on the path formed by appending the decimal lumisection
// number `ls` to discard_ls_filestem_.
// NOTE(review): the opening line of this definition (listing line 2164) is absent from this
// copy; the index at the end of the listing gives lumisectionDiscarded(unsigned int ls).
2165  struct stat buf;
2166  return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0);
2167  }
2168 
2169 } // namespace evf
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:80
unsigned int nThreads_
Definition: fillJson.h:27
int def(FILE *, FILE *, int)
struct flock fu_rw_flk
unsigned int nConcurrentLumis_
const_iterator end() const
std::vector< std::string > bu_base_dirs_all_
std::string run_string_
LuminosityBlockNumber_t luminosityBlock() const
T getParameter(std::string const &) const
Definition: ParameterSet.h:307
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
std::string bolsFileName(const unsigned int run, const unsigned int ls)
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
void watchPreallocate(Preallocate::slot_type const &iSlot)
boost::asio::io_service io_service_
const_iterator begin() const
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::string fulockfile_
def create(alignables, pedeDump, additionalData, outputFile, config)
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
std::shared_ptr< Json::Value > transferSystemJson_
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
static bool checkFileRead(char *buf, int infile, std::size_t buf_sz, std::string const &path)
jsoncollector::DataPointDefinition * dpd_
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::string getFFFParamsFilePathOnBU() const
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
Int asInt() const
ret
prodAgent to be discontinued
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
volatile std::atomic< bool > shutdown_flag
pthread_mutex_t init_lock_
ParameterSet const & getParameterSet(std::string const &) const
Value get(UInt index, const Value &defaultValue) const
std::string getEoLSFilePathOnFU(const unsigned int ls) const
Value & append(const Value &value)
Append value to array at the end.
void removeFile(std::string)
bool lumisectionDiscarded(unsigned int ls)
reader
Definition: DQM.py:105
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:172
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string getOpenInitFilePath(std::string const &stream) const
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
assert(be >=bs)
bool isExceptionOnData(unsigned int ls)
std::string getStreamDestinations(std::string const &stream) const
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: Electron.h:6
unsigned long previousFileSize_
static std::string to_string(const XMLCh *ch)
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
std::string getInitTempFilePath(std::string const &stream) const
std::string getEoRFilePathOnFU() const
struct flock fu_rw_fulk
std::string hltSourceDirectory_
unsigned int startFromLS_
std::array< uint8_t, 4 > id_
Definition: FRDFileHeader.h:29
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::string getEoLSFilePathOnBU(const unsigned int ls) const
std::mutex * fileDeleteLockPtr_
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
std::string mergeTypePset_
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
def chmod(path, mode)
Definition: eostools.py:294
std::string getRunOpenDirPath() const
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string getStreamMergeType(std::string const &stream, MergeType defaultType)
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
std::string stopFilePath_
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string bu_base_dir_
std::array< uint8_t, 4 > version_
Definition: FRDFileHeader.h:30
unsigned int stop_ls_override_
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
std::string selectedTransferMode_
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string input_throttled_file_
std::string getBoLSFilePathOnFU(const unsigned int ls) const
std::string eorFileName(const unsigned int run)
int readLastLSEntry(std::string const &file)
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string stopFilePathPid_
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:62
unsigned int eolsNFilesIndex_
std::string & baseRunDir()
std::string run_nstring_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
std::string getInitFilePath(std::string const &stream) const
void createProcessingNotificationMaybe() const
Log< level::Info, false > LogInfo
void openFULockfileStream(bool create)
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
int getNFilesFromEoLS(std::string BUEoLSFile)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
def ls(path, rec=False)
Definition: eostools.py:349
void preBeginRun(edm::GlobalContext const &globalContext)
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned long long uint64_t
Definition: Time.h:13
void checkMergeTypePSet(edm::ProcessContext const &pc)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
std::vector< std::string > const & getNames() const
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
def load(fileName)
Definition: svgfig.py:547
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
std::string getEoRFileName() const
void add(std::string const &label, ParameterSetDescription const &psetDescription)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:223
tuple msg
Definition: mps_check.py:286
evf::FastMonitoringService * fms_
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
std::string bu_run_dir_
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
ParameterSet const & getParameterSet(ParameterSetID const &id)
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
std::string getEoRFilePath() const
def mkdir(path)
Definition: eostools.py:251
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
std::string findHighestRunDir()
Definition: DirManager.cc:23
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
void postEndRun(edm::GlobalContext const &globalContext)
std::string discard_ls_filestem_
Unserialize a JSON document into a Value.
Definition: reader.h:16
static const std::vector< std::string > MergeTypeNames_
unsigned int getLumisectionToStart() const
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
void checkTransferSystemPSet(edm::ProcessContext const &pc)
std::string fileBrokerPort_
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
std::string initTempFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
void preallocate(edm::service::SystemBounds const &bounds)
Log< level::Warning, false > LogWarning
Iterator for object and array value.
Definition: value.h:908
std::vector< int > bu_base_dirs_nSources_
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
std::string fileBrokerHost_
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned int fuLockPollInterval_
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)
std::string eolsFileName(const unsigned int run, const unsigned int ls)
unsigned int nStreams_
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
ParameterSetID const & parameterSetID() const
std::string bu_run_open_dir_
#define LogDebug(id)
array value (ordered list)
Definition: value.h:30