CMS 3D CMS Logo

EvFDaqDirector.cc
Go to the documentation of this file.
10 
18 
19 #include <iostream>
20 #include <fstream>
21 #include <sstream>
22 #include <sys/time.h>
23 #include <unistd.h>
24 #include <cstdio>
25 #include <boost/algorithm/string.hpp>
26 
27 //using boost::asio::ip::tcp;
28 
29 //#define DEBUG
30 
31 using namespace jsoncollector;
32 
33 namespace evf {
34 
35  //for enum MergeType
36  const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"};
37 
39  : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
40  bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
41  run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
42  useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
43  fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", true)),
44  fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
45  fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
46  fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
47  fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
48  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
49  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
50  requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet", false)),
51  selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode", "")),
52  mergeTypePset_(pset.getUntrackedParameter<std::string>("mergingPset", "")),
53  directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
54  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
55  hostname_(""),
56  bu_readlock_fd_(-1),
57  bu_writelock_fd_(-1),
58  fu_readwritelock_fd_(-1),
59  fulocal_rwlock_fd_(-1),
60  fulocal_rwlock_fd2_(-1),
61  bu_w_lock_stream(nullptr),
62  bu_r_lock_stream(nullptr),
63  fu_rw_lock_stream(nullptr),
64  dirManager_(base_dir_),
65  previousFileSize_(0),
66  bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
67  bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
68  bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
69  bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
70  fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
71  fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
77 
78  //save hostname for later
79  char hostname[33];
80  gethostname(hostname, 32);
81  hostname_ = hostname;
82 
83  char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
84  if (fuLockPollIntervalPtr) {
85  try {
86  fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
87  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
88  << " us";
89  } catch (const std::exception&) {
90  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
91  }
92  }
93 
94  //override file service parameter if specified by environment
95  char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
96  if (fileBrokerParamPtr) {
97  try {
98  useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
99  edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
100  } catch (const std::exception&) {
101  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
102  }
103  }
104  if (useFileBroker_) {
106  //find BU data address from hltd configuration
108  struct stat buf;
109  if (stat("/etc/appliance/bus.config", &buf) == 0) {
110  std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
111  std::getline(busconfig, fileBrokerHost_);
112  }
113  if (fileBrokerHost_.empty())
114  throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
115  } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
116  throw cms::Exception("EvFDaqDirector")
117  << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
118 
119  resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
120  query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
121  endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
122  socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
123  }
124 
125  char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
126  if (startFromLSPtr) {
127  try {
128  startFromLS_ = std::stoul(std::string(startFromLSPtr));
129  edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
130  } catch (const std::exception&) {
131  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
132  }
133  }
134 
135  //override file service parameter if specified by environment
136  char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
137  if (fileBrokerUseLockParamPtr) {
138  try {
139  fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
140  edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
142  } catch (const std::exception&) {
143  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
144  }
145  }
146 
147  std::stringstream ss;
148  ss << "run" << std::setfill('0') << std::setw(6) << run_;
149  run_string_ = ss.str();
150  ss = std::stringstream();
151  ss << run_;
152  run_nstring_ = ss.str();
153  run_dir_ = base_dir_ + "/" + run_string_;
154  input_throttled_file_ = run_dir_ + "/input_throttle";
155  discard_ls_filestem_ = run_dir_ + "/discard_ls";
156  ss = std::stringstream();
157  ss << getpid();
158  pid_ = ss.str();
159  }
160 
162  // check if base dir exists or create it accordingly
163  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
164  if (retval != 0 && errno != EEXIST) {
165  throw cms::Exception("DaqDirector")
166  << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
167  }
168 
169  //create run dir in base dir
170  umask(0);
171  retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
172  if (retval != 0 && errno != EEXIST) {
173  throw cms::Exception("DaqDirector")
174  << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
175  }
176 
177  //create fu-local.lock in run open dir
178  if (!directorBU_) {
180  std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
182  open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
183  if (fulocal_rwlock_fd_ == -1)
184  throw cms::Exception("DaqDirector")
185  << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
186  chmod(fulocal_lock_.c_str(), 0777);
187  fsync(fulocal_rwlock_fd_);
188  //open second fd for another input source thread
190  open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
191  if (fulocal_rwlock_fd2_ == -1)
192  throw cms::Exception("DaqDirector")
193  << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
194  }
195 
196  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
197  //for BU, it is created at this point
198  if (directorBU_) {
200  std::string bulockfile = bu_run_dir_ + "/bu.lock";
201  fulockfile_ = bu_run_dir_ + "/fu.lock";
202 
203  //make or find bu run dir
204  retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
205  if (retval != 0 && errno != EEXIST) {
206  throw cms::Exception("DaqDirector")
207  << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno) << "\n";
208  }
209  bu_run_open_dir_ = bu_run_dir_ + "/open";
210  retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
211  if (retval != 0 && errno != EEXIST) {
212  throw cms::Exception("DaqDirector")
213  << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno) << "\n";
214  }
215 
216  // the BU director does not need to know about the fu lock
217  bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
218  if (bu_writelock_fd_ == -1)
219  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
220  else
221  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
222  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
223  if (bu_w_lock_stream == nullptr)
224  edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
225 
226  // BU INITIALIZES LOCK FILE
227  // FU LOCK FILE OPEN
228  openFULockfileStream(true);
230  fflush(fu_rw_lock_stream);
231  close(fu_readwritelock_fd_);
232 
233  if (!hltSourceDirectory_.empty()) {
234  struct stat buf;
235  if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
236  std::string hltdir = bu_run_dir_ + "/hlt";
237  std::string tmphltdir = bu_run_open_dir_ + "/hlt";
238  retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
239  if (retval != 0 && errno != EEXIST)
240  throw cms::Exception("DaqDirector")
241  << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno) << "\n";
242 
243  std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
244  std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
245 
246  std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
247  for (auto& optfile : optfiles) {
248  try {
249  std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
250  } catch (...) {
251  }
252  }
253 
254  std::filesystem::rename(tmphltdir, hltdir);
255  } else
256  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
257  }
258  //else{}//no configuration specified
259  } else {
260  // for FU, check if bu base dir exists
261 
262  retval = mkdir(bu_base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
263  if (retval != 0 && errno != EEXIST) {
264  throw cms::Exception("DaqDirector")
265  << " Error checking for bu base dir -: " << bu_base_dir_ << " mkdir error:" << strerror(errno) << "\n";
266  }
267 
269  fulockfile_ = bu_run_dir_ + "/fu.lock";
270  openFULockfileStream(false);
271  }
272 
273  pthread_mutex_init(&init_lock_, nullptr);
274 
275  stopFilePath_ = run_dir_ + "/CMSSW_STOP";
276  std::stringstream sstp;
277  sstp << stopFilePath_ << "_pid" << pid_;
278  stopFilePathPid_ = sstp.str();
279 
280  if (!directorBU_) {
281  std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
282  struct stat statbuf;
283  if (!stat(defPath.c_str(), &statbuf))
284  edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
285  else {
286  //look in source directory if not present in ramdisk
287  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
288  defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
289  if (stat(defPath.c_str(), &statbuf)) {
290  defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
291  if (stat(defPath.c_str(), &statbuf)) {
292  defPath = defPathSuffix;
293  }
294  }
295  }
296  dpd_ = new DataPointDefinition();
297  std::string defLabel = "data";
298  DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
299  }
300  }
301 
303  //close server connection
304  if (socket_.get() && socket_->is_open()) {
305  boost::system::error_code ec;
306  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
307  socket_->close(ec);
308  }
309 
310  if (fulocal_rwlock_fd_ != -1) {
311  unlockFULocal();
312  close(fulocal_rwlock_fd_);
313  }
314 
315  if (fulocal_rwlock_fd2_ != -1) {
316  unlockFULocal2();
317  close(fulocal_rwlock_fd2_);
318  }
319  }
320 
322  initRun();
323 
324  nThreads_ = bounds.maxNumberOfStreams();
325  nStreams_ = bounds.maxNumberOfThreads();
326  nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks();
327  }
328 
331  desc.setComment(
332  "Service used for file locking arbitration and for propagating information between other EvF components");
333  desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
334  desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
335  desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
336  desc.addUntracked<bool>("useFileBroker", false)
337  ->setComment("Use BU file service to grab input data instead of NFS file locking");
338  desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
339  ->setComment("Allow service to discover BU address from hltd configuration");
340  desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
341  desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
342  desc.addUntracked<bool>("fileBrokerKeepAlive", true)
343  ->setComment("Use keep alive to avoid using large number of sockets");
344  desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
345  ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
346  desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
347  ->setComment("Lock polling interval in microseconds for the input directory file lock");
348  desc.addUntracked<bool>("outputAdler32Recheck", false)
349  ->setComment("Check Adler32 of per-process output files while micro-merging");
350  desc.addUntracked<bool>("requireTransfersPSet", false)
351  ->setComment("Require complete transferSystem PSet in the process configuration");
352  desc.addUntracked<std::string>("selectedTransferMode", "")
353  ->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
354  desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
355  desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
356  desc.addUntracked<std::string>("mergingPset", "")
357  ->setComment("Name of merging PSet to look for merging type definitions for streams");
358  descriptions.add("EvFDaqDirector", desc);
359  }
360 
363  checkMergeTypePSet(pc);
364  }
365 
366  void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) {
367  //assert(run_ == id.run());
368 
369  // check if the requested run is the latest one - issue a warning if it isn't
371  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
372  << ". This is not the highest run " << dirManager_.findHighestRunDir();
373  }
374  }
375 
376  void EvFDaqDirector::postEndRun(edm::GlobalContext const& globalContext) {
377  close(bu_readlock_fd_);
378  close(bu_writelock_fd_);
379  if (directorBU_) {
380  std::string filename = bu_run_dir_ + "/bu.lock";
382  }
383  }
384 
386  //delete all files belonging to just closed lumi
387  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
389  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
390  return;
391  }
392 
393  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
394  auto it = filesToDeletePtr_->begin();
395  while (it != filesToDeletePtr_->end()) {
396  if (it->second->lumi_ == ls && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
397  it = filesToDeletePtr_->erase(it);
398  } else
399  it++;
400  }
401  }
402 
403  std::string EvFDaqDirector::getInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
405  }
406 
407  std::string EvFDaqDirector::getRawFilePath(const unsigned int ls, const unsigned int index) const {
409  }
410 
411  std::string EvFDaqDirector::getOpenRawFilePath(const unsigned int ls, const unsigned int index) const {
412  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
413  }
414 
415  std::string EvFDaqDirector::getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
416  return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
417  }
418 
419  std::string EvFDaqDirector::getDatFilePath(const unsigned int ls, std::string const& stream) const {
421  }
422 
425  }
426 
429  }
430 
433  }
434 
437  }
438 
441  }
442 
444  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
445  }
446 
449  }
450 
453  }
454 
456  std::string const& stream) const {
458  }
459 
461  std::string const& stream) const {
463  }
464 
466  std::string const& stream) const {
468  }
469 
472  }
473 
476  }
477 
480  }
481 
483  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
484  }
485 
487  return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
488  }
489 
491  return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
492  }
493 
495 
497 
498  std::string EvFDaqDirector::getFFFParamsFilePathOnBU() const { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
499 
501  int retval = remove(filename.c_str());
502  if (retval != 0)
503  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
504  << ". error = " << strerror(errno);
505  }
506 
507  void EvFDaqDirector::removeFile(unsigned int ls, unsigned int index) { removeFile(getRawFilePath(ls, index)); }
508 
510  std::string& nextFile,
511  uint32_t& fsize,
512  uint16_t& rawHeaderSize,
513  uint64_t& lockWaitTime,
514  bool& setExceptionState) {
515  EvFDaqDirector::FileStatus fileStatus = noFile;
516  rawHeaderSize = 0;
517 
518  int retval = -1;
519  int lock_attempts = 0;
520  long total_lock_attempts = 0;
521 
522  struct stat buf;
523  int stopFileLS = -1;
524  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
525  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
526  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
527  if (stopFileCheck == 0)
528  stopFileLS = readLastLSEntry(stopFilePath_);
529  else
530  stopFileLS = 1; //stop without drain if only pid is stopped
531  if (!stop_ls_override_) {
532  //if lumisection is higher than in stop file, should quit at next from current
533  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
534  stopFileLS = stop_ls_override_ = ls;
535  } else
536  stopFileLS = stop_ls_override_;
537  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
538  << stopFileLS;
539  //return runEnded;
540  } else //if file was removed before reaching stop condition, reset this
541  stop_ls_override_ = 0;
542 
543  timeval ts_lockbegin;
544  gettimeofday(&ts_lockbegin, nullptr);
545 
546  while (retval == -1) {
547  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
548  if (retval == -1)
549  usleep(fuLockPollInterval_);
550  else
551  continue;
552 
553  lock_attempts += fuLockPollInterval_;
554  total_lock_attempts += fuLockPollInterval_;
555  if (lock_attempts > 5000000 || errno == 116) {
556  if (errno == 116)
557  edm::LogWarning("EvFDaqDirector")
558  << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
559  else
560  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
561  "fu.lock file are present -: errno "
562  << errno << ":" << strerror(errno) << std::endl;
563 
564  if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
565  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
566  ls++;
567  return noFile;
568  }
569 
570  if (stat(bu_run_dir_.c_str(), &buf) != 0)
571  return runEnded;
572  if (stat(fulockfile_.c_str(), &buf) != 0)
573  return runEnded;
574 
575  lock_attempts = 0;
576  }
577  if (total_lock_attempts > 5 * 60000000) {
578  edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
579  return runAbort;
580  }
581  }
582 
583  timeval ts_lockend;
584  gettimeofday(&ts_lockend, nullptr);
585  long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
586  if (deltat > 0.)
587  lockWaitTime = deltat;
588 
589  if (retval != 0)
590  return fileStatus;
591 
592 #ifdef DEBUG
593  timeval ts_lockend;
594  gettimeofday(&ts_lockend, 0);
595 #endif
596 
597  //open another lock file FD after the lock using main fd has been acquired
598  int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
599  if (fu_readwritelock_fd2 == -1)
600  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
601  << " create. error:" << strerror(errno);
602 
603  FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
604 
605  // if the stream is readable
606  if (fu_rw_lock_stream2 != nullptr) {
607  unsigned int readLs, readIndex;
608  int check = 0;
609  // rewind the stream
610  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
611  // if rewinded ok
612  if (check == 0) {
613  // read its' values
614  fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
615  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
616 
617  unsigned int currentLs = readLs;
618  bool bumpedOk = false;
619  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
620  //no lock file write in this case
621  if (ls && ls + 1 < currentLs)
622  ls++;
623  else {
624  // try to bump (look for new index or EoLS file)
625  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
626  //avoid 2 lumisections jump
627  if (ls && readLs > currentLs && currentLs > ls) {
628  ls++;
629  readLs = currentLs = ls;
630  readIndex = 0;
631  bumpedOk = false;
632  //no write to lock file
633  } else {
634  if (ls == 0 && readLs > currentLs) {
635  //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
636  //in this case there is no new file in the same LS
637  //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
638  readLs = currentLs;
639  readIndex = 0;
640  bumpedOk = false;
641  //no write to lock file
642  }
643  //update return LS value
644  ls = readLs;
645  }
646  }
647  if (bumpedOk) {
648  // there is a new index file to grab, lock file needs to be updated
649  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
650  if (check == 0) {
651  ftruncate(fu_readwritelock_fd2, 0);
652  // write next index in the file, which is the file the next process should take
653  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
654  fflush(fu_rw_lock_stream2);
655  fsync(fu_readwritelock_fd2);
656  fileStatus = newFile;
657  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
658  } else {
659  edm::LogError("EvFDaqDirector")
660  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
661  setExceptionState = true;
662  return noFile;
663  }
664  } else if (currentLs < readLs) {
665  //there is no new file in next LS (yet), but lock file can be updated to the next LS
666  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
667  if (check == 0) {
668  ftruncate(fu_readwritelock_fd2, 0);
669  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
670  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
671  fflush(fu_rw_lock_stream2);
672  fsync(fu_readwritelock_fd2);
673  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
674  } else {
675  edm::LogError("EvFDaqDirector")
676  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
677  setExceptionState = true;
678  return noFile;
679  }
680  }
681  } else {
682  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
683  << strerror(errno);
684  }
685  } else {
686  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
687  }
688  fclose(fu_rw_lock_stream2); // = fdopen(fu_readwritelock_fd2, "r+");
689 
690 #ifdef DEBUG
691  timeval ts_preunlock;
692  gettimeofday(&ts_preunlock, 0);
693  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
694  double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
695 #endif
696 
697  //if new json is present, lock file which FedRawDataInputSource will later unlock
698  if (fileStatus == newFile)
699  lockFULocal();
700 
701  //release lock at this point
702  int retvalu = -1;
703  retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
704  if (retvalu == -1)
705  edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
706 
707 #ifdef DEBUG
708  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
709 #endif
710 
711  if (fileStatus == noFile) {
712  struct stat buf;
713  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
714  if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
715  fileStatus = runEnded;
716  if (stopFileLS >= 0 && (int)ls > stopFileLS) {
717  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
718  fileStatus = runEnded;
719  }
720  }
721  return fileStatus;
722  }
723 
725  std::ifstream ij(BUEoLSFile);
726  Json::Value deserializeRoot;
728 
729  if (!reader.parse(ij, deserializeRoot)) {
730  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
731  return -1;
732  }
733 
735  DataPoint dp;
736  dp.deserialize(deserializeRoot);
737 
738  //read definition
739  if (readEolsDefinition_) {
740  //std::string def = boost::algorithm::trim(dp.getDefinition());
741  std::string def = dp.getDefinition();
742  if (def.empty())
743  readEolsDefinition_ = false;
744  while (!def.empty()) {
746  if (def.find('/') == 0)
747  fullpath = def;
748  else
749  fullpath = bu_run_dir_ + '/' + def;
750  struct stat buf;
751  if (stat(fullpath.c_str(), &buf) == 0) {
752  DataPointDefinition eolsDpd;
753  std::string defLabel = "legend";
754  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
755  if (eolsDpd.getNames().empty()) {
756  //try with "data" label if "legend" format is not used
757  eolsDpd = DataPointDefinition();
758  defLabel = "data";
759  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
760  }
761  for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
762  if (eolsDpd.getNames().at(i) == "NFiles")
764  readEolsDefinition_ = false;
765  break;
766  }
767  //check if we can still find definition
768  if (def.size() <= 1 || def.find('/') == std::string::npos) {
769  readEolsDefinition_ = false;
770  break;
771  }
772  def = def.substr(def.find('/') + 1);
773  }
774  }
775 
776  if (dp.getData().size() > eolsNFilesIndex_)
777  data = dp.getData()[eolsNFilesIndex_];
778  else {
779  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
780  return -1;
781  }
782  return std::stoi(data);
783  }
784 
785  bool EvFDaqDirector::bumpFile(unsigned int& ls,
786  unsigned int& index,
787  std::string& nextFile,
788  uint32_t& fsize,
789  uint16_t& rawHeaderSize,
790  int maxLS,
791  bool& setExceptionState) {
792  if (previousFileSize_ != 0) {
793  if (!fms_) {
795  }
796  if (fms_)
798  previousFileSize_ = 0;
799  }
800  nextFile = "";
801 
802  //reached limit
803  if (maxLS >= 0 && ls > (unsigned int)maxLS)
804  return false;
805 
806  struct stat buf;
807  std::stringstream ss;
808  unsigned int nextIndex = index;
809  nextIndex++;
810 
811  // 1. Check suggested file
812  std::string nextFileJson = getInputJsonFilePath(ls, index);
813  if (stat(nextFileJson.c_str(), &buf) == 0) {
814  fsize = previousFileSize_ = buf.st_size;
815  nextFile = nextFileJson;
816  return true;
817  }
818  // 2. No file -> lumi ended? (and how many?)
819  else {
820  // 3. No file -> check for standalone raw file
821  std::string nextFileRaw = getRawFilePath(ls, index);
822  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
823  fsize = previousFileSize_ = buf.st_size;
824  nextFile = nextFileRaw;
825  return true;
826  }
827 
828  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
829 
830  if (stat(BUEoLSFile.c_str(), &buf) == 0) {
831  // recheck that no raw file appeared in the meantime
832  if (stat(nextFileJson.c_str(), &buf) == 0) {
833  fsize = previousFileSize_ = buf.st_size;
834  nextFile = nextFileJson;
835  return true;
836  }
837  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
838  fsize = previousFileSize_ = buf.st_size;
839  nextFile = nextFileRaw;
840  return true;
841  }
842 
843  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
844  if (indexFilesInLS < 0)
845  //parsing failed
846  return false;
847  else {
848  //check index
849  if ((int)index < indexFilesInLS) {
850  //we have 2 files, and check for 1 failed... retry (2 will never be here)
851  edm::LogError("EvFDaqDirector")
852  << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
853  << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
854  setExceptionState = true;
855  return false;
856  }
857  }
858  // this lumi ended, check for files
859  ++ls;
860  index = 0;
861 
862  //reached limit
863  if (maxLS >= 0 && ls > (unsigned int)maxLS)
864  return false;
865 
866  nextFileJson = getInputJsonFilePath(ls, 0);
867  nextFileRaw = getRawFilePath(ls, 0);
868  if (stat(nextFileJson.c_str(), &buf) == 0) {
869  // a new file was found at new lumisection, index 0
870  fsize = previousFileSize_ = buf.st_size;
871  nextFile = nextFileJson;
872  return true;
873  }
874  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
875  fsize = previousFileSize_ = buf.st_size;
876  nextFile = nextFileRaw;
877  return true;
878  }
879  return false;
880  }
881  }
882  // no new file found
883  return false;
884  }
885 
887  if (fu_rw_lock_stream == nullptr)
888  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
889  else {
890  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
891  unsigned int readLs = 1, readIndex = 0;
892  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
893  }
894  }
895 
897  if (create) {
899  open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
900  chmod(fulockfile_.c_str(), 0766);
901  } else {
902  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
903  }
904  if (fu_readwritelock_fd_ == -1)
905  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
906  << " create:" << create << " error:" << strerror(errno);
907  else
908  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
909 
910  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
911  if (fu_rw_lock_stream == nullptr)
912  edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
913  }
914 
915  void EvFDaqDirector::lockInitLock() { pthread_mutex_lock(&init_lock_); }
916 
917  void EvFDaqDirector::unlockInitLock() { pthread_mutex_unlock(&init_lock_); }
918 
920  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
921  flock(fulocal_rwlock_fd_, LOCK_SH);
922  }
923 
925  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
926  flock(fulocal_rwlock_fd_, LOCK_UN);
927  }
928 
930  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
931  flock(fulocal_rwlock_fd2_, LOCK_EX);
932  }
933 
935  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
936  flock(fulocal_rwlock_fd2_, LOCK_UN);
937  }
938 
939  void EvFDaqDirector::createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const {
940  //used for backpressure mechanisms and monitoring
941  const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
942  struct stat buf;
943  if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
944  int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
945  close(bol_fd);
946  }
947  }
948 
949  void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection,
950  const uint32_t currentLumiSection,
951  bool doCreateBoLS,
952  bool doCreateEoLS) {
953  if (currentLumiSection > 0) {
954  const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
955  struct stat buf;
956  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
957  if (!found) {
958  if (doCreateEoLS) {
959  int eol_fd =
960  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
961  close(eol_fd);
962  }
963  if (doCreateBoLS)
964  createBoLSFile(lumiSection, false);
965  }
966  } else if (doCreateBoLS) {
967  createBoLSFile(lumiSection, true); //needed for initial lumisection
968  }
969  }
970 
972  int& rawFd,
973  uint16_t& rawHeaderSize,
974  uint32_t& lsFromHeader,
975  int32_t& eventsFromHeader,
976  int64_t& fileSizeFromHeader,
977  bool requireHeader,
978  bool retry,
979  bool closeFile) {
980  int infile;
981 
982  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
983  if (retry) {
984  edm::LogWarning("EvFDaqDirector")
985  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
986  return parseFRDFileHeader(rawSourcePath,
987  rawFd,
988  rawHeaderSize,
989  lsFromHeader,
990  eventsFromHeader,
991  fileSizeFromHeader,
992  requireHeader,
993  false,
994  closeFile);
995  } else {
996  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
997  edm::LogError("EvFDaqDirector")
998  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
999  if (errno == ENOENT)
1000  return 1; // error && file not found
1001  else
1002  return -1;
1003  }
1004  }
1005  }
1006 
1007  constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1); //try to read v1 FRD header size
1008  FRDFileHeader_v1 fileHead;
1009 
1010  ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
1011  if (closeFile) {
1012  close(infile);
1013  infile = -1;
1014  }
1015 
1016  if (sz_read < 0) {
1017  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - unable to read " << rawSourcePath << " : "
1018  << strerror(errno);
1019  if (infile != -1)
1020  close(infile);
1021  return -1;
1022  }
1023  if ((size_t)sz_read < buf_sz) {
1024  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - file smaller than header: " << rawSourcePath;
1025  if (infile != -1)
1026  close(infile);
1027  return -1;
1028  }
1029 
1030  uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1031 
1032  if (frd_version == 0) {
1033  //no header (specific sequence not detected)
1034  if (requireHeader) {
1035  edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1036  if (infile != -1)
1037  close(infile);
1038  return -1;
1039  } else {
1040  //no header, but valid file
1041  lseek(infile, 0, SEEK_SET);
1042  rawHeaderSize = 0;
1043  lsFromHeader = 0;
1044  eventsFromHeader = -1;
1045  fileSizeFromHeader = -1;
1046  }
1047  } else {
1048  //version 1 header
1049  uint32_t headerSizeRaw = fileHead.headerSize_;
1050  if (headerSizeRaw < buf_sz) {
1051  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1052  << " v:" << frd_version;
1053  if (infile != -1)
1054  close(infile);
1055  return -1;
1056  }
1057  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1058  lsFromHeader = fileHead.lumiSection_;
1059  eventsFromHeader = (int32_t)fileHead.eventCount_;
1060  fileSizeFromHeader = (int64_t)fileHead.fileSize_;
1061  rawHeaderSize = fileHead.headerSize_;
1062  }
1063  rawFd = infile;
1064  return 0; //OK
1065  }
1066 
1067  bool EvFDaqDirector::rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize) {
1068  int infile;
1069  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1070  edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1071  << strerror(errno);
1072  return false;
1073  }
1074  constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1); //try to read v1 FRD header size
1075  FRDFileHeader_v1 fileHead;
1076 
1077  ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
1078 
1079  if (sz_read < 0) {
1080  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << rawSourcePath << " : "
1081  << strerror(errno);
1082  if (infile != -1)
1083  close(infile);
1084  return false;
1085  }
1086  if ((size_t)sz_read < buf_sz) {
1087  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << rawSourcePath;
1088  if (infile != -1)
1089  close(infile);
1090  return false;
1091  }
1092 
1093  uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1094 
1095  close(infile);
1096 
1097  if (frd_version > 0) {
1098  rawHeaderSize = fileHead.headerSize_;
1099  return true;
1100  }
1101 
1102  rawHeaderSize = 0;
1103  return false;
1104  }
1105 
1107  int& rawFd,
1108  uint16_t& rawHeaderSize,
1109  int64_t& fileSizeFromHeader,
1110  bool& fileFound,
1111  uint32_t serverLS,
1112  bool closeFile) {
1113  fileFound = true;
1114 
1115  //take only first three tokens delimited by "_" in the renamed raw file name
1116  std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1117  size_t pos = 0, n_tokens = 0;
1118  while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1119  }
1120  std::string reducedJsonStem = jsonStem.substr(0, pos);
1121 
1122  std::ostringstream fileNameWithPID;
1123  //should be ported to use fffnaming
1124  fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1125 
1126  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1127 
1128  LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1129 
1130  //parse RAW file header if it exists
1131  uint32_t lsFromRaw;
1132  int32_t nbEventsWrittenRaw;
1133  int64_t fileSizeFromRaw;
1134  auto ret = parseFRDFileHeader(
1135  rawSourcePath, rawFd, rawHeaderSize, lsFromRaw, nbEventsWrittenRaw, fileSizeFromRaw, true, true, closeFile);
1136  if (ret != 0) {
1137  if (ret == 1)
1138  fileFound = false;
1139  return -1;
1140  }
1141 
1142  int outfile;
1143  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1144  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1145  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1146  if (errno == EEXIST) {
1147  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1148  << " : ";
1149  return -1;
1150  }
1151  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1152  << strerror(errno);
1153  struct stat out_stat;
1154  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1155  edm::LogWarning("EvFDaqDirector")
1156  << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1157  << jsonDestPath;
1158  if (unlink(jsonDestPath.c_str()) == -1) {
1159  edm::LogWarning("EvFDaqDirector")
1160  << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1161  }
1162  }
1163  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1164  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1165  << jsonDestPath << " : " << strerror(errno);
1166  return -1;
1167  }
1168  }
1169  //write JSON file (TODO: use jsoncpp)
1170  std::stringstream ss;
1171  ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1172  std::string sstr = ss.str();
1173 
1174  if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1175  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1176  << " : " << strerror(errno);
1177  return -1;
1178  }
1179  close(outfile);
1180  if (serverLS && serverLS != lsFromRaw)
1181  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1182  << " and raw file header LS " << lsFromRaw;
1183 
1184  fileSizeFromHeader = fileSizeFromRaw;
1185  return nbEventsWrittenRaw;
1186  }
1187 
1189  std::string const& rawSourcePath,
1190  int64_t& fileSizeFromJson,
1191  bool& fileFound) {
1192  fileFound = true;
1193 
1194  //should be ported to use fffnaming
1195  std::ostringstream fileNameWithPID;
1196  fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1197  << std::setw(5) << pid_ << ".jsn";
1198 
1199  // assemble json destination path
1200  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1201 
1202  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1203 
1204  int infile = -1, outfile = -1;
1205 
1206  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1207  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1208  << strerror(errno);
1209  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1210  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1211  << jsonSourcePath << " : " << strerror(errno);
1212  if (errno == ENOENT)
1213  fileFound = false;
1214  return -1;
1215  }
1216  }
1217 
1218  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1219  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1220  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1221  if (errno == EEXIST) {
1222  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1223  << " : ";
1224  ::close(infile);
1225  return -1;
1226  }
1227  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1228  << strerror(errno);
1229  struct stat out_stat;
1230  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1231  edm::LogWarning("EvFDaqDirector")
1232  << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1233  if (unlink(jsonDestPath.c_str()) == -1) {
1234  edm::LogWarning("EvFDaqDirector")
1235  << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1236  }
1237  }
1238  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1239  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1240  << jsonDestPath << " : " << strerror(errno);
1241  ::close(infile);
1242  return -1;
1243  }
1244  }
1245  //copy contents
1246  const std::size_t buf_sz = 512;
1247  std::size_t tot_written = 0;
1248  std::unique_ptr<char[]> buf(new char[buf_sz]);
1249 
1250  ssize_t sz, sz_read = 1, sz_write;
1251  while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1252  sz_write = 0;
1253  do {
1254  assert(sz_read - sz_write > 0);
1255  if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1256  sz_read = sz; // cause read loop termination
1257  break;
1258  }
1259  assert(sz > 0);
1260  sz_write += sz;
1261  tot_written += sz;
1262  } while (sz_write < sz_read);
1263  }
1264  close(infile);
1265  close(outfile);
1266 
1267  if (tot_written > 0) {
1268  //leave file if it was empty for diagnosis
1269  if (unlink(jsonSourcePath.c_str()) == -1) {
1270  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1271  << strerror(errno);
1272  return -1;
1273  }
1274  } else {
1275  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1276  << jsonSourcePath;
1277  return -1;
1278  }
1279 
1280  Json::Value deserializeRoot;
1282 
1283  std::string data;
1284  std::stringstream ss;
1285  bool result;
1286  try {
1287  if (tot_written <= buf_sz) {
1288  result = reader.parse(buf.get(), deserializeRoot);
1289  } else {
1290  //json will normally not be bigger than buf_sz bytes
1291  try {
1292  std::ifstream ij(jsonDestPath);
1293  ss << ij.rdbuf();
1294  } catch (std::filesystem::filesystem_error const& ex) {
1295  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1296  return -1;
1297  }
1298  result = reader.parse(ss.str(), deserializeRoot);
1299  }
1300  if (!result) {
1301  if (tot_written <= buf_sz)
1302  ss << buf.get();
1303  edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1304  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1305  << ss.str() << ".";
1306  return -1;
1307  }
1308 
1309  //read BU JSON
1310  DataPoint dp;
1311  dp.deserialize(deserializeRoot);
1312  bool success = false;
1313  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1314  if (dpd_->getNames().at(i) == "NEvents")
1315  if (i < dp.getData().size()) {
1316  data = dp.getData()[i];
1317  success = true;
1318  break;
1319  }
1320  }
1321  if (!success) {
1322  if (!dp.getData().empty())
1323  data = dp.getData()[0];
1324  else {
1325  edm::LogError("EvFDaqDirector::grabNextJsonFile")
1326  << "grabNextJsonFile - "
1327  << " error reading number of events from BU JSON; No input value. data -: " << data;
1328  return -1;
1329  }
1330  }
1331 
1332  //try to read raw file size
1333  fileSizeFromJson = -1;
1334  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1335  if (dpd_->getNames().at(i) == "NBytes") {
1336  if (i < dp.getData().size()) {
1337  std::string dataSize = dp.getData()[i];
1338  try {
1339  fileSizeFromJson = std::stol(dataSize);
1340  } catch (const std::exception&) {
1341  //non-fatal currently, processing can continue without this value
1342  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1343  << "Input value is -: " << dataSize;
1344  }
1345  break;
1346  }
1347  }
1348  }
1349  return std::stoi(data);
1350  } catch (const std::out_of_range& e) {
1351  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1352  << "Input value is -: " << data;
1353  } catch (const std::invalid_argument& e) {
1354  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1355  << "Input value is -: " << data;
1356  } catch (std::runtime_error const& e) {
1357  //Can be thrown by Json parser
1358  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1359  }
1360 
1361  catch (std::exception const& e) {
1362  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1363  } catch (...) {
1364  //unknown exception
1365  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1366  }
1367 
1368  return -1;
1369  }
1370 
1372  std::string data;
1373  try {
1374  // assemble json destination path
1375  std::filesystem::path jsonDestPath(baseRunDir());
1376 
1377  //should be ported to use fffnaming
1378  std::ostringstream fileNameWithPID;
1379  fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1380  << ".jsn";
1381  jsonDestPath /= fileNameWithPID.str();
1382 
1383  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1384  try {
1385  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1386  } catch (std::filesystem::filesystem_error const& ex) {
1387  // Input dir gone?
1388  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1389  // << " Maybe the file is not yet visible by FU. Trying again in one second";
1390  sleep(1);
1391  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1392  }
1393  unlockFULocal();
1394 
1395  try {
1396  //sometimes this fails but file gets deleted
1397  std::filesystem::remove(jsonSourcePath);
1398  } catch (std::filesystem::filesystem_error const& ex) {
1399  // Input dir gone?
1400  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1401  } catch (std::exception const& ex) {
1402  // Input dir gone?
1403  edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1404  }
1405 
1406  std::ifstream ij(jsonDestPath);
1407  Json::Value deserializeRoot;
1409 
1410  std::stringstream ss;
1411  ss << ij.rdbuf();
1412  if (!reader.parse(ss.str(), deserializeRoot)) {
1413  edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1414  << "\nERROR:\n"
1415  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1416  << ss.str() << ".";
1417  throw std::runtime_error("Cannot deserialize input JSON file");
1418  }
1419 
1420  //read BU JSON
1421  std::string data;
1422  DataPoint dp;
1423  dp.deserialize(deserializeRoot);
1424  bool success = false;
1425  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1426  if (dpd_->getNames().at(i) == "NEvents")
1427  if (i < dp.getData().size()) {
1428  data = dp.getData()[i];
1429  success = true;
1430  }
1431  }
1432  if (!success) {
1433  if (!dp.getData().empty())
1434  data = dp.getData()[0];
1435  else
1436  throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1437  << " error reading number of events from BU JSON -: No input value " << data;
1438  }
1439  return std::stoi(data);
1440  } catch (std::filesystem::filesystem_error const& ex) {
1441  // Input dir gone?
1442  unlockFULocal();
1443  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1444  } catch (std::runtime_error const& e) {
1445  // Another process grabbed the file and NFS did not register this
1446  unlockFULocal();
1447  edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1448  } catch (const std::out_of_range&) {
1449  edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1450  << "Input value is -: " << data;
1451  } catch (const std::invalid_argument&) {
1452  edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1453  << "Input value is -: " << data;
1454  } catch (std::exception const& e) {
1455  // BU run directory disappeared?
1456  unlockFULocal();
1457  edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1458  }
1459 
1460  return -1;
1461  }
1462 
1464  bool& serverError,
1465  uint32_t& serverLS,
1466  uint32_t& closedServerLS,
1467  std::string& nextFileJson,
1468  std::string& nextFileRaw,
1469  bool& rawHeader,
1470  int maxLS) {
1471  EvFDaqDirector::FileStatus fileStatus = noFile;
1472  serverError = false;
1473 
1474  boost::system::error_code ec;
1475  try {
1476  while (true) {
1477  //socket connect
1478  if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1480 
1481  if (ec) {
1482  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1483  serverError = true;
1484  break;
1485  }
1486  }
1487 
1488  boost::asio::streambuf request;
1489  std::ostream request_stream(&request);
1490  std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1491  if (maxLS >= 0) {
1492  std::stringstream spath;
1493  spath << path << "&stopls=" << maxLS;
1494  path = spath.str();
1495  edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1496  }
1497  request_stream << "GET " << path << " HTTP/1.1\r\n";
1498  request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1499  request_stream << "Accept: */*\r\n";
1500  request_stream << "Connection: keep-alive\r\n\r\n";
1501 
1502  boost::asio::write(*socket_, request, ec);
1503  if (ec) {
1504  if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1505  edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1506  //we got disconnected, try to reconnect to the server before writing the request
1508  if (ec) {
1509  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1510  serverError = true;
1511  break;
1512  }
1513  continue;
1514  }
1515  edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1516  serverError = true;
1517  break;
1518  }
1519 
1520  boost::asio::streambuf response;
1521  boost::asio::read_until(*socket_, response, "\r\n", ec);
1522  if (ec) {
1523  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1524  serverError = true;
1525  break;
1526  }
1527 
1528  std::istream response_stream(&response);
1529 
1530  std::string http_version;
1531  response_stream >> http_version;
1532 
1533  response_stream >> serverHttpStatus;
1534 
1535  std::string status_message;
1536  std::getline(response_stream, status_message);
1537  if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1538  edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1539  serverError = true;
1540  break;
1541  }
1542  if (serverHttpStatus != 200) {
1543  edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1544  serverError = true;
1545  break;
1546  }
1547 
1548  // Process the response headers.
1550  while (std::getline(response_stream, header) && header != "\r") {
1551  }
1552 
1553  std::string fileInfo;
1554  std::map<std::string, std::string> serverMap;
1555  while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1556  auto pos = fileInfo.find('=');
1557  if (pos == std::string::npos)
1558  continue;
1559  auto stitle = fileInfo.substr(0, pos);
1560  auto svalue = fileInfo.substr(pos + 1);
1561  serverMap[stitle] = svalue;
1562  }
1563 
1564  //check that response run number if correct
1565  auto server_version = serverMap.find("version");
1566  assert(server_version != serverMap.end());
1567 
1568  auto server_run = serverMap.find("runnumber");
1569  assert(server_run != serverMap.end());
1570  assert(run_nstring_ == server_run->second);
1571 
1572  auto server_state = serverMap.find("state");
1573  assert(server_state != serverMap.end());
1574 
1575  auto server_eols = serverMap.find("lasteols");
1576  assert(server_eols != serverMap.end());
1577 
1578  auto server_ls = serverMap.find("lumisection");
1579 
1580  int version_maj = 1;
1581  int version_min = 0;
1582  int version_rev = 0;
1583  {
1584  auto* s_ptr = server_version->second.c_str();
1585  if (!server_version->second.empty() && server_version->second[0] == '"')
1586  s_ptr++;
1587  auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1588  if (res < 3) {
1589  res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1590  if (res < 2) {
1591  res = sscanf(s_ptr, "%d", &version_maj);
1592  if (res < 1) {
1593  //expecting at least 1 number (major version)
1594  edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1595  }
1596  }
1597  }
1598  }
1599 
1600  closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1601  if (server_ls != serverMap.end())
1602  serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1603  else
1604  serverLS = closedServerLS + 1;
1605 
1606  std::string s_state = server_state->second;
1607  if (s_state == "STARTING") //initial, always empty starting with LS 1
1608  {
1609  auto server_file = serverMap.find("file");
1610  assert(server_file == serverMap.end()); //no file with starting state
1611  fileStatus = noFile;
1612  edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1613  } else if (s_state == "READY") {
1614  auto server_file = serverMap.find("file");
1615  if (server_file == serverMap.end()) {
1616  //can be returned by server if files from new LS already appeared but LS is not yet closed
1617  if (serverLS <= closedServerLS)
1618  serverLS = closedServerLS + 1;
1619  fileStatus = noFile;
1620  edm::LogInfo("EvFDaqDirector")
1621  << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1622  } else {
1623  std::string filestem;
1624  std::string fileprefix;
1625  auto server_fileprefix = serverMap.find("fileprefix");
1626 
1627  if (server_fileprefix != serverMap.end()) {
1628  auto pssize = server_fileprefix->second.size();
1629  if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1630  fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1631  else
1632  fileprefix = server_fileprefix->second;
1633  }
1634 
1635  //remove string literals
1636  auto ssize = server_file->second.size();
1637  if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1638  filestem = server_file->second.substr(1, ssize - 2);
1639  else
1640  filestem = server_file->second;
1641  assert(!filestem.empty());
1642  if (version_maj > 1) {
1643  nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw"; //filestem should be raw
1644  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1645  nextFileJson = "";
1646  rawHeader = true;
1647  } else {
1648  nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw"; //raw files are not moved
1649  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1650  nextFileJson = filestem + ".jsn";
1651  rawHeader = false;
1652  }
1653  fileStatus = newFile;
1654  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1655  << serverLS << " file:" << filestem;
1656  }
1657  } else if (s_state == "EOLS") {
1658  serverLS = closedServerLS + 1;
1659  edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1660  fileStatus = noFile;
1661  } else if (s_state == "EOR") {
1662  //server_eor = serverMap.find("iseor");
1663  edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1664  fileStatus = runEnded;
1665  } else if (s_state == "NORUN") {
1666  auto err_msg = serverMap.find("errormessage");
1667  if (err_msg != serverMap.end())
1668  edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1669  else
1670  edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1671  edm::LogWarning("EvFDaqDirector") << "executing run end";
1672  fileStatus = runEnded;
1673  } else if (s_state == "ERROR") {
1674  auto err_msg = serverMap.find("errormessage");
1675  if (err_msg != serverMap.end())
1676  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1677  else
1678  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1679  fileStatus = noFile;
1680  serverError = true;
1681  } else {
1682  edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1683  fileStatus = noFile;
1684  serverError = true;
1685  }
1686 
1687  // Read until EOF, writing data to output as we go.
1688  if (!fileBrokerKeepAlive_) {
1689  while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1690  }
1691  if (ec != boost::asio::error::eof) {
1692  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1693  serverError = true;
1694  }
1695  }
1696 
1697  break;
1698  }
1699 
1700  } catch (std::exception const& e) {
1701  edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1702  serverError = true;
1703  }
1704 
1705  if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1706  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1707  if (ec) {
1708  edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1709  }
1710  socket_->close(ec);
1711  if (ec) {
1712  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1713  }
1714  }
1715 
1716  if (serverError) {
1717  if (socket_->is_open())
1718  socket_->close(ec);
1719  if (ec) {
1720  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1721  }
1722  fileStatus = noFile;
1723  sleep(1); //back-off if error detected
1724  }
1725 
1726  return fileStatus;
1727  }
1728 
1730  unsigned int& ls,
1731  std::string& nextFileRaw,
1732  int& rawFd,
1733  uint16_t& rawHeaderSize,
1734  int32_t& serverEventsInNewFile,
1735  int64_t& fileSizeFromMetadata,
1736  uint64_t& thisLockWaitTimeUs) {
1737  EvFDaqDirector::FileStatus fileStatus = noFile;
1738 
1739  //int retval = -1;
1740  //int lock_attempts = 0;
1741  //long total_lock_attempts = 0;
1742 
1743  struct stat buf;
1744  int stopFileLS = -1;
1745  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1746  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1747  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1748  if (stopFileCheck == 0)
1749  stopFileLS = readLastLSEntry(stopFilePath_);
1750  else
1751  stopFileLS = 1; //stop without drain if only pid is stopped
1752  if (!stop_ls_override_) {
1753  //if lumisection is higher than in stop file, should quit at next from current
1754  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1755  stopFileLS = stop_ls_override_ = ls;
1756  } else
1757  stopFileLS = stop_ls_override_;
1758  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1759  << stopFileLS;
1760  //return runEnded;
1761  } else //if file was removed before reaching stop condition, reset this
1762  stop_ls_override_ = 0;
1763 
1764  /* look for EoLS
1765  if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1766  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ;
1767  ls++;
1768  return noFile;
1769  }
1770  */
1771 
1772  timeval ts_lockbegin;
1773  gettimeofday(&ts_lockbegin, nullptr);
1774 
1775  std::string nextFileJson;
1776  uint32_t serverLS, closedServerLS;
1777  unsigned int serverHttpStatus;
1778  bool serverError;
1779 
1780  //local lock to force index json and EoLS files to appear in order
1782  lockFULocal();
1783 
1784  int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1785  bool rawHeader = false;
1786  fileStatus = contactFileBroker(
1787  serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1788 
1789  if (serverError) {
1790  //do not update anything
1792  unlockFULocal();
1793  return noFile;
1794  }
1795 
1796  //handle creation of BoLS files if lumisection has changed
1797  if (currentLumiSection == 0) {
1798  if (fileStatus == runEnded)
1799  createLumiSectionFiles(closedServerLS, 0, true, false);
1800  else
1801  createLumiSectionFiles(serverLS, 0, true, false);
1802  } else {
1803  if (closedServerLS >= currentLumiSection) {
1804  //only BoLS files
1805  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1806  createLumiSectionFiles(i + 1, i, true, false);
1807  }
1808  }
1809 
1810  bool fileFound = true;
1811 
1812  if (fileStatus == newFile) {
1813  if (rawHeader > 0)
1814  serverEventsInNewFile =
1815  grabNextJsonFromRaw(nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false);
1816  else
1817  serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1818  }
1819  //closing file in case of any error
1820  if (serverEventsInNewFile < 0 && rawFd != -1) {
1821  close(rawFd);
1822  rawFd = -1;
1823  }
1824 
1825  //can unlock because all files have been created locally
1827  unlockFULocal();
1828 
1829  if (!fileFound) {
1830  //catch condition where directory got deleted
1831  fileStatus = noFile;
1832  struct stat buf;
1833  if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1834  edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1835  fileStatus = runEnded;
1836  }
1837  }
1838 
1839  //handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
1840  //so that EoLS files can not appear locally before index files
1841  if (currentLumiSection == 0) {
1842  lockFULocal2();
1843  if (fileStatus == runEnded) {
1844  createLumiSectionFiles(closedServerLS, 0, false, true);
1845  createLumiSectionFiles(serverLS, closedServerLS, false, true); // +1
1846  } else {
1847  createLumiSectionFiles(serverLS, 0, false, true);
1848  }
1849  unlockFULocal2();
1850  } else {
1851  if (closedServerLS >= currentLumiSection) {
1852  //lock exclusive to create EoLS files
1853  lockFULocal2();
1854  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1855  createLumiSectionFiles(i + 1, i, false, true);
1856  unlockFULocal2();
1857  }
1858  }
1859 
1860  if (fileStatus == runEnded)
1861  ls = std::max(currentLumiSection, serverLS);
1862  else if (fileStatus == newFile) {
1863  assert(serverLS >= ls);
1864  ls = serverLS;
1865  } else if (fileStatus == noFile) {
1866  if (serverLS >= ls)
1867  ls = serverLS;
1868  else {
1869  edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1870  << " which is smaller than currently open LS " << ls << ". Ignoring response";
1871  sleep(1);
1872  }
1873  }
1874 
1875  return fileStatus;
1876  }
1877 
1879  // create open dir if not already there
1880 
1882  if (!std::filesystem::is_directory(openPath)) {
1883  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1884  std::filesystem::create_directories(openPath);
1885  }
1886  }
1887 
1889  std::ifstream ij(file);
1890  Json::Value deserializeRoot;
1892 
1893  if (!reader.parse(ij, deserializeRoot)) {
1894  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1895  return -1;
1896  }
1897 
1898  int ret = deserializeRoot.get("lastLS", "").asInt();
1899  return ret;
1900  }
1901 
1903  std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1905  struct stat buf;
1906  unsigned int lscount = 1;
1907  do {
1908  std::stringstream ss;
1909  ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1910  fullpath = ss.str();
1911  lscount++;
1912  } while (stat(fullpath.c_str(), &buf) == 0);
1913  return lscount - 1;
1914  }
1915 
1916  //if transferSystem PSet is present in the menu, we require it to be complete and consistent for all specified streams
1918  if (transferSystemJson_)
1919  return;
1920 
1921  transferSystemJson_.reset(new Json::Value);
1922  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1923  if (topPset.existsAs<edm::ParameterSet>("transferSystem", true)) {
1924  const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));
1925 
1926  Json::Value destinationsVal(Json::arrayValue);
1927  std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
1928  for (auto& dest : destinations)
1929  destinationsVal.append(dest);
1930  (*transferSystemJson_)["destinations"] = destinationsVal;
1931 
1932  Json::Value modesVal(Json::arrayValue);
1933  std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>("transferModes");
1934  for (auto& mode : modes)
1935  modesVal.append(mode);
1936  (*transferSystemJson_)["transferModes"] = modesVal;
1937 
1938  for (auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
1939  if (psKeyItr->first != "destinations" && psKeyItr->first != "transferModes") {
1940  const edm::ParameterSet& streamDef = tsPset.getParameterSet(psKeyItr->first);
1941  Json::Value streamVal;
1942  for (auto& mode : modes) {
1943  //validation
1944  if (!streamDef.existsAs<std::vector<std::string>>(mode, true))
1945  throw cms::Exception("EvFDaqDirector")
1946  << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode
1947  << ")";
1948  std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);
1949 
1950  Json::Value sDestsValue(Json::arrayValue);
1951 
1952  if (streamDestinations.empty())
1953  throw cms::Exception("EvFDaqDirector")
1954  << " Missing transter system destination(s) for -: " << psKeyItr->first << ", mode:" << mode;
1955 
1956  for (auto& sdest : streamDestinations) {
1957  bool sDestValid = false;
1958  sDestsValue.append(sdest);
1959  for (auto& dest : destinations) {
1960  if (dest == sdest)
1961  sDestValid = true;
1962  }
1963  if (!sDestValid)
1964  throw cms::Exception("EvFDaqDirector")
1965  << " Invalid transter system destination specified for -: " << psKeyItr->first << ", mode:" << mode
1966  << ", dest:" << sdest;
1967  }
1968  streamVal[mode] = sDestsValue;
1969  }
1970  (*transferSystemJson_)[psKeyItr->first] = streamVal;
1971  }
1972  }
1973  } else {
1974  if (requireTSPSet_)
1975  throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
1976  }
1977  }
1978 
1980  std::string streamRequestName;
1981  if (transferSystemJson_->isMember(stream.c_str()))
1982  streamRequestName = stream;
1983  else {
1984  std::stringstream msg;
1985  msg << "Transfer system mode definitions missing for -: " << stream;
1986  if (requireTSPSet_)
1987  throw cms::Exception("EvFDaqDirector") << msg.str();
1988  else {
1989  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
1990  return std::string("Failsafe");
1991  }
1992  }
1993  //return empty if strict check parameter is not on
1994  if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1995  edm::LogWarning("EvFDaqDirector")
1996  << "Selected mode string is not provided as DaqDirector parameter."
1997  << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
1998  return std::string("Failsafe");
1999  }
2000  if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
2001  throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
2002  }
2003  //check if stream has properly listed transfer stream
2004  if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str())) {
2005  std::stringstream msg;
2006  msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
2007  if (requireTSPSet_)
2008  throw cms::Exception("EvFDaqDirector") << msg.str();
2009  else
2010  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
2011  return std::string("Failsafe");
2012  }
2013  Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_, "");
2014 
2015  //flatten string json::Array into CSV std::string
2016  std::string ret;
2017  for (Json::Value::iterator it = destsVec.begin(); it != destsVec.end(); it++) {
2018  if (!ret.empty())
2019  ret += ",";
2020  ret += (*it).asString();
2021  }
2022  return ret;
2023  }
2024 
2026  if (mergeTypePset_.empty())
2027  return;
2028  if (!mergeTypeMap_.empty())
2029  return;
2030  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
2031  if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_, true)) {
2032  const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
2033  for (const std::string& pname : tsPset.getParameterNames()) {
2034  std::string streamType = tsPset.getParameter<std::string>(pname);
2035  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2036  mergeTypeMap_.insert(ac, pname);
2037  ac->second = streamType;
2038  ac.release();
2039  }
2040  }
2041  }
2042 
2044  tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
2045  if (mergeTypeMap_.find(search_ac, stream))
2046  return search_ac->second;
2047 
2048  edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
2049  std::string defaultName = MergeTypeNames_[defaultType];
2050  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2051  mergeTypeMap_.insert(ac, stream);
2052  ac->second = defaultName;
2053  ac.release();
2054  return defaultName;
2055  }
2056 
2058  std::string proc_flag = run_dir_ + "/processing";
2059  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2060  close(proc_flag_fd);
2061  }
2062 
2063  struct flock EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid) {
2064 #ifdef __APPLE__
2065  return {start, len, pid, type, whence};
2066 #else
2067  return {type, whence, start, len, pid};
2068 #endif
2069  }
2070 
2072  struct stat buf;
2073  return (stat(input_throttled_file_.c_str(), &buf) == 0);
2074  }
2075 
2077  struct stat buf;
2078  return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0);
2079  }
2080 
2081 } // namespace evf
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:43
unsigned int nThreads_
Definition: fillJson.h:27
int def(FILE *, FILE *, int)
struct flock fu_rw_flk
unsigned int nConcurrentLumis_
const_iterator end() const
std::string run_string_
LuminosityBlockNumber_t luminosityBlock() const
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
std::string bolsFileName(const unsigned int run, const unsigned int ls)
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
void watchPreallocate(Preallocate::slot_type const &iSlot)
boost::asio::io_service io_service_
const_iterator begin() const
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::string fulockfile_
def create(alignables, pedeDump, additionalData, outputFile, config)
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
std::shared_ptr< Json::Value > transferSystemJson_
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
jsoncollector::DataPointDefinition * dpd_
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::string getFFFParamsFilePathOnBU() const
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
Int asInt() const
ret
prodAgent to be discontinued
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
pthread_mutex_t init_lock_
ParameterSet const & getParameterSet(std::string const &) const
Value get(UInt index, const Value &defaultValue) const
std::string getEoLSFilePathOnFU(const unsigned int ls) const
Value & append(const Value &value)
Append value to array at the end.
std::string to_string(const V &value)
Definition: OMSAccess.h:77
bool lumisectionDiscarded(unsigned int ls)
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
reader
Definition: DQM.py:105
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:171
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string getOpenInitFilePath(std::string const &stream) const
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
assert(be >=bs)
bool isExceptionOnData(unsigned int ls)
std::string getStreamDestinations(std::string const &stream) const
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: Electron.h:6
unsigned long previousFileSize_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
std::string getInitTempFilePath(std::string const &stream) const
std::string getEoRFilePathOnFU() const
struct flock fu_rw_fulk
std::string hltSourceDirectory_
unsigned int startFromLS_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
std::mutex * fileDeleteLockPtr_
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
std::string mergeTypePset_
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
def chmod(path, mode)
Definition: eostools.py:294
std::string getRunOpenDirPath() const
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string getStreamMergeType(std::string const &stream, MergeType defaultType)
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
std::string stopFilePath_
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string bu_base_dir_
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
void removeFile(unsigned int ls, unsigned int index)
unsigned int stop_ls_override_
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
std::string selectedTransferMode_
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string input_throttled_file_
std::string getBoLSFilePathOnFU(const unsigned int ls) const
std::string eorFileName(const unsigned int run)
int readLastLSEntry(std::string const &file)
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string stopFilePathPid_
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:62
unsigned int eolsNFilesIndex_
std::string & baseRunDir()
std::string run_nstring_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
std::string getInitFilePath(std::string const &stream) const
void createProcessingNotificationMaybe() const
Log< level::Info, false > LogInfo
void openFULockfileStream(bool create)
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
int getNFilesFromEoLS(std::string BUEoLSFile)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
def ls(path, rec=False)
Definition: eostools.py:349
void preBeginRun(edm::GlobalContext const &globalContext)
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned long long uint64_t
Definition: Time.h:13
void checkMergeTypePSet(edm::ProcessContext const &pc)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
std::vector< std::string > const & getNames() const
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:223
tuple msg
Definition: mps_check.py:286
evf::FastMonitoringService * fms_
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
std::string bu_run_dir_
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
ParameterSet const & getParameterSet(ParameterSetID const &id)
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
std::string getEoRFilePath() const
def mkdir(path)
Definition: eostools.py:251
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
std::string findHighestRunDir()
Definition: DirManager.cc:23
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
void postEndRun(edm::GlobalContext const &globalContext)
std::string discard_ls_filestem_
Unserialize a JSON document into a Value.
Definition: reader.h:16
static const std::vector< std::string > MergeTypeNames_
unsigned int getLumisectionToStart() const
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
void checkTransferSystemPSet(edm::ProcessContext const &pc)
std::string fileBrokerPort_
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
std::string initTempFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
void preallocate(edm::service::SystemBounds const &bounds)
Log< level::Warning, false > LogWarning
Iterator for object and array value.
Definition: value.h:908
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
std::string fileBrokerHost_
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned int fuLockPollInterval_
std::string eolsFileName(const unsigned int run, const unsigned int ls)
unsigned int nStreams_
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
ParameterSetID const & parameterSetID() const
std::string bu_run_open_dir_
#define LogDebug(id)
array value (ordered list)
Definition: value.h:30