CMS 3D CMS Logo

EvFDaqDirector.cc
Go to the documentation of this file.
10 
18 
19 #include <iostream>
20 #include <fstream>
21 #include <sstream>
22 #include <sys/time.h>
23 #include <unistd.h>
24 #include <cstdio>
25 #include <boost/algorithm/string.hpp>
26 
27 //using boost::asio::ip::tcp;
28 
29 //#define DEBUG
30 
31 using namespace jsoncollector;
32 
33 namespace evf {
34 
35  //for enum MergeType
36  const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"};
37 
39  : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
40  bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
41  run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
42  useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
43  fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", true)),
44  fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
45  fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
46  fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
47  fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
48  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
49  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
50  requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet", false)),
51  selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode", "")),
52  mergeTypePset_(pset.getUntrackedParameter<std::string>("mergingPset", "")),
53  directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
54  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
55  hostname_(""),
56  bu_readlock_fd_(-1),
57  bu_writelock_fd_(-1),
58  fu_readwritelock_fd_(-1),
59  fulocal_rwlock_fd_(-1),
60  fulocal_rwlock_fd2_(-1),
61  bu_w_lock_stream(nullptr),
62  bu_r_lock_stream(nullptr),
63  fu_rw_lock_stream(nullptr),
64  dirManager_(base_dir_),
65  previousFileSize_(0),
66  bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
67  bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
68  bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
69  bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
70  fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
71  fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
77 
78  //save hostname for later
79  char hostname[33];
80  gethostname(hostname, 32);
81  hostname_ = hostname;
82 
83  char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
84  if (fuLockPollIntervalPtr) {
85  try {
86  fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
87  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
88  << " us";
89  } catch (const std::exception&) {
90  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
91  }
92  }
93 
94  //override file service parameter if specified by environment
95  char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
96  if (fileBrokerParamPtr) {
97  try {
98  useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
99  edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
100  } catch (const std::exception&) {
101  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
102  }
103  }
104  if (useFileBroker_) {
106  //find BU data address from hltd configuration
108  struct stat buf;
109  if (stat("/etc/appliance/bus.config", &buf) == 0) {
110  std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
111  std::getline(busconfig, fileBrokerHost_);
112  }
113  if (fileBrokerHost_.empty())
114  throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
115  } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
116  throw cms::Exception("EvFDaqDirector")
117  << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
118 
119  resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
120  query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
121  endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
122  socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
123  }
124 
125  char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
126  if (startFromLSPtr) {
127  try {
128  startFromLS_ = std::stoul(std::string(startFromLSPtr));
129  edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
130  } catch (const std::exception&) {
131  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
132  }
133  }
134 
135  //override file service parameter if specified by environment
136  char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
137  if (fileBrokerUseLockParamPtr) {
138  try {
139  fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
140  edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
142  } catch (const std::exception&) {
143  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
144  }
145  }
146  }
147 
149  std::stringstream ss;
150  ss << "run" << std::setfill('0') << std::setw(6) << run_;
151  run_string_ = ss.str();
152  ss = std::stringstream();
153  ss << run_;
154  run_nstring_ = ss.str();
155  run_dir_ = base_dir_ + "/" + run_string_;
156  input_throttled_file_ = run_dir_ + "/input_throttle";
157  ss = std::stringstream();
158  ss << getpid();
159  pid_ = ss.str();
160 
161  // check if base dir exists or create it accordingly
162  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
163  if (retval != 0 && errno != EEXIST) {
164  throw cms::Exception("DaqDirector")
165  << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
166  }
167 
168  //create run dir in base dir
169  umask(0);
170  retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
171  if (retval != 0 && errno != EEXIST) {
172  throw cms::Exception("DaqDirector")
173  << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
174  }
175 
176  //create fu-local.lock in run open dir
177  if (!directorBU_) {
179  std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
181  open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
182  if (fulocal_rwlock_fd_ == -1)
183  throw cms::Exception("DaqDirector")
184  << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
185  chmod(fulocal_lock_.c_str(), 0777);
186  fsync(fulocal_rwlock_fd_);
187  //open second fd for another input source thread
189  open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
190  if (fulocal_rwlock_fd2_ == -1)
191  throw cms::Exception("DaqDirector")
192  << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
193  }
194 
195  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
196  //for BU, it is created at this point
197  if (directorBU_) {
199  std::string bulockfile = bu_run_dir_ + "/bu.lock";
200  fulockfile_ = bu_run_dir_ + "/fu.lock";
201 
202  //make or find bu run dir
203  retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
204  if (retval != 0 && errno != EEXIST) {
205  throw cms::Exception("DaqDirector")
206  << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno) << "\n";
207  }
208  bu_run_open_dir_ = bu_run_dir_ + "/open";
209  retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
210  if (retval != 0 && errno != EEXIST) {
211  throw cms::Exception("DaqDirector")
212  << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno) << "\n";
213  }
214 
215  // the BU director does not need to know about the fu lock
216  bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
217  if (bu_writelock_fd_ == -1)
218  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
219  else
220  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
221  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
222  if (bu_w_lock_stream == nullptr)
223  edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
224 
225  // BU INITIALIZES LOCK FILE
226  // FU LOCK FILE OPEN
227  openFULockfileStream(true);
229  fflush(fu_rw_lock_stream);
230  close(fu_readwritelock_fd_);
231 
232  if (!hltSourceDirectory_.empty()) {
233  struct stat buf;
234  if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
235  std::string hltdir = bu_run_dir_ + "/hlt";
236  std::string tmphltdir = bu_run_open_dir_ + "/hlt";
237  retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
238  if (retval != 0 && errno != EEXIST)
239  throw cms::Exception("DaqDirector")
240  << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno) << "\n";
241 
242  std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
243  std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
244 
245  std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
246  for (auto& optfile : optfiles) {
247  try {
248  std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
249  } catch (...) {
250  }
251  }
252 
253  std::filesystem::rename(tmphltdir, hltdir);
254  } else
255  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
256  }
257  //else{}//no configuration specified
258  } else {
259  // for FU, check if bu base dir exists
260 
261  retval = mkdir(bu_base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
262  if (retval != 0 && errno != EEXIST) {
263  throw cms::Exception("DaqDirector")
264  << " Error checking for bu base dir -: " << bu_base_dir_ << " mkdir error:" << strerror(errno) << "\n";
265  }
266 
268  fulockfile_ = bu_run_dir_ + "/fu.lock";
269  openFULockfileStream(false);
270  }
271 
272  pthread_mutex_init(&init_lock_, nullptr);
273 
274  stopFilePath_ = run_dir_ + "/CMSSW_STOP";
275  std::stringstream sstp;
276  sstp << stopFilePath_ << "_pid" << pid_;
277  stopFilePathPid_ = sstp.str();
278 
279  if (!directorBU_) {
280  std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
281  struct stat statbuf;
282  if (!stat(defPath.c_str(), &statbuf))
283  edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
284  else {
285  //look in source directory if not present in ramdisk
286  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
287  defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
288  if (stat(defPath.c_str(), &statbuf)) {
289  defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
290  if (stat(defPath.c_str(), &statbuf)) {
291  defPath = defPathSuffix;
292  }
293  }
294  }
295  dpd_ = new DataPointDefinition();
296  std::string defLabel = "data";
297  DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
298  }
299  }
300 
302  //close server connection
303  if (socket_.get() && socket_->is_open()) {
304  boost::system::error_code ec;
305  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
306  socket_->close(ec);
307  }
308 
309  if (fulocal_rwlock_fd_ != -1) {
310  unlockFULocal();
311  close(fulocal_rwlock_fd_);
312  }
313 
314  if (fulocal_rwlock_fd2_ != -1) {
315  unlockFULocal2();
316  close(fulocal_rwlock_fd2_);
317  }
318  }
319 
321  initRun();
322 
323  nThreads_ = bounds.maxNumberOfStreams();
324  nStreams_ = bounds.maxNumberOfThreads();
325  }
326 
329  desc.setComment(
330  "Service used for file locking arbitration and for propagating information between other EvF components");
331  desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
332  desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
333  desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
334  desc.addUntracked<bool>("useFileBroker", false)
335  ->setComment("Use BU file service to grab input data instead of NFS file locking");
336  desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
337  ->setComment("Allow service to discover BU address from hltd configuration");
338  desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
339  desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
340  desc.addUntracked<bool>("fileBrokerKeepAlive", true)
341  ->setComment("Use keep alive to avoid using large number of sockets");
342  desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
343  ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
344  desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
345  ->setComment("Lock polling interval in microseconds for the input directory file lock");
346  desc.addUntracked<bool>("outputAdler32Recheck", false)
347  ->setComment("Check Adler32 of per-process output files while micro-merging");
348  desc.addUntracked<bool>("requireTransfersPSet", false)
349  ->setComment("Require complete transferSystem PSet in the process configuration");
350  desc.addUntracked<std::string>("selectedTransferMode", "")
351  ->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
352  desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
353  desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
354  desc.addUntracked<std::string>("mergingPset", "")
355  ->setComment("Name of merging PSet to look for merging type definitions for streams");
356  descriptions.add("EvFDaqDirector", desc);
357  }
358 
361  checkMergeTypePSet(pc);
362  }
363 
364  void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) {
365  //assert(run_ == id.run());
366 
367  // check if the requested run is the latest one - issue a warning if it isn't
369  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
370  << ". This is not the highest run " << dirManager_.findHighestRunDir();
371  }
372  }
373 
374  void EvFDaqDirector::postEndRun(edm::GlobalContext const& globalContext) {
375  close(bu_readlock_fd_);
376  close(bu_writelock_fd_);
377  if (directorBU_) {
378  std::string filename = bu_run_dir_ + "/bu.lock";
380  }
381  }
382 
384  //delete all files belonging to just closed lumi
385  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
387  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
388  return;
389  }
390 
391  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
392  auto it = filesToDeletePtr_->begin();
393  while (it != filesToDeletePtr_->end()) {
394  if (it->second->lumi_ == ls && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
395  it = filesToDeletePtr_->erase(it);
396  } else
397  it++;
398  }
399  }
400 
401  std::string EvFDaqDirector::getInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
403  }
404 
405  std::string EvFDaqDirector::getRawFilePath(const unsigned int ls, const unsigned int index) const {
407  }
408 
409  std::string EvFDaqDirector::getOpenRawFilePath(const unsigned int ls, const unsigned int index) const {
410  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
411  }
412 
413  std::string EvFDaqDirector::getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
414  return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
415  }
416 
417  std::string EvFDaqDirector::getDatFilePath(const unsigned int ls, std::string const& stream) const {
419  }
420 
423  }
424 
427  }
428 
431  }
432 
435  }
436 
439  }
440 
442  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
443  }
444 
447  }
448 
450  std::string const& stream) const {
452  }
453 
455  std::string const& stream) const {
457  }
458 
460  std::string const& stream) const {
462  }
463 
466  }
467 
470  }
471 
474  }
475 
477  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
478  }
479 
481  return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
482  }
483 
485  return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
486  }
487 
489 
491 
492  std::string EvFDaqDirector::getFFFParamsFilePathOnBU() const { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
493 
495  int retval = remove(filename.c_str());
496  if (retval != 0)
497  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
498  << ". error = " << strerror(errno);
499  }
500 
501  void EvFDaqDirector::removeFile(unsigned int ls, unsigned int index) { removeFile(getRawFilePath(ls, index)); }
502 
504  std::string& nextFile,
505  uint32_t& fsize,
506  uint16_t& rawHeaderSize,
507  uint64_t& lockWaitTime,
508  bool& setExceptionState) {
509  EvFDaqDirector::FileStatus fileStatus = noFile;
510  rawHeaderSize = 0;
511 
512  int retval = -1;
513  int lock_attempts = 0;
514  long total_lock_attempts = 0;
515 
516  struct stat buf;
517  int stopFileLS = -1;
518  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
519  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
520  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
521  if (stopFileCheck == 0)
522  stopFileLS = readLastLSEntry(stopFilePath_);
523  else
524  stopFileLS = 1; //stop without drain if only pid is stopped
525  if (!stop_ls_override_) {
526  //if lumisection is higher than in stop file, should quit at next from current
527  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
528  stopFileLS = stop_ls_override_ = ls;
529  } else
530  stopFileLS = stop_ls_override_;
531  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
532  << stopFileLS;
533  //return runEnded;
534  } else //if file was removed before reaching stop condition, reset this
535  stop_ls_override_ = 0;
536 
537  timeval ts_lockbegin;
538  gettimeofday(&ts_lockbegin, nullptr);
539 
540  while (retval == -1) {
541  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
542  if (retval == -1)
543  usleep(fuLockPollInterval_);
544  else
545  continue;
546 
547  lock_attempts += fuLockPollInterval_;
548  total_lock_attempts += fuLockPollInterval_;
549  if (lock_attempts > 5000000 || errno == 116) {
550  if (errno == 116)
551  edm::LogWarning("EvFDaqDirector")
552  << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
553  else
554  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
555  "fu.lock file are present -: errno "
556  << errno << ":" << strerror(errno) << std::endl;
557 
558  if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
559  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
560  ls++;
561  return noFile;
562  }
563 
564  if (stat(bu_run_dir_.c_str(), &buf) != 0)
565  return runEnded;
566  if (stat(fulockfile_.c_str(), &buf) != 0)
567  return runEnded;
568 
569  lock_attempts = 0;
570  }
571  if (total_lock_attempts > 5 * 60000000) {
572  edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
573  return runAbort;
574  }
575  }
576 
577  timeval ts_lockend;
578  gettimeofday(&ts_lockend, nullptr);
579  long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
580  if (deltat > 0.)
581  lockWaitTime = deltat;
582 
583  if (retval != 0)
584  return fileStatus;
585 
586 #ifdef DEBUG
587  timeval ts_lockend;
588  gettimeofday(&ts_lockend, 0);
589 #endif
590 
591  //open another lock file FD after the lock using main fd has been acquired
592  int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
593  if (fu_readwritelock_fd2 == -1)
594  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
595  << " create. error:" << strerror(errno);
596 
597  FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
598 
599  // if the stream is readable
600  if (fu_rw_lock_stream2 != nullptr) {
601  unsigned int readLs, readIndex;
602  int check = 0;
603  // rewind the stream
604  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
605  // if rewinded ok
606  if (check == 0) {
607  // read its' values
608  fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
609  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
610 
611  unsigned int currentLs = readLs;
612  bool bumpedOk = false;
613  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
614  //no lock file write in this case
615  if (ls && ls + 1 < currentLs)
616  ls++;
617  else {
618  // try to bump (look for new index or EoLS file)
619  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
620  //avoid 2 lumisections jump
621  if (ls && readLs > currentLs && currentLs > ls) {
622  ls++;
623  readLs = currentLs = ls;
624  readIndex = 0;
625  bumpedOk = false;
626  //no write to lock file
627  } else {
628  if (ls == 0 && readLs > currentLs) {
629  //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
630  //in this case there is no new file in the same LS
631  //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
632  readLs = currentLs;
633  readIndex = 0;
634  bumpedOk = false;
635  //no write to lock file
636  }
637  //update return LS value
638  ls = readLs;
639  }
640  }
641  if (bumpedOk) {
642  // there is a new index file to grab, lock file needs to be updated
643  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
644  if (check == 0) {
645  ftruncate(fu_readwritelock_fd2, 0);
646  // write next index in the file, which is the file the next process should take
647  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
648  fflush(fu_rw_lock_stream2);
649  fsync(fu_readwritelock_fd2);
650  fileStatus = newFile;
651  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
652  } else {
653  edm::LogError("EvFDaqDirector")
654  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
655  setExceptionState = true;
656  return noFile;
657  }
658  } else if (currentLs < readLs) {
659  //there is no new file in next LS (yet), but lock file can be updated to the next LS
660  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
661  if (check == 0) {
662  ftruncate(fu_readwritelock_fd2, 0);
663  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
664  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
665  fflush(fu_rw_lock_stream2);
666  fsync(fu_readwritelock_fd2);
667  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
668  } else {
669  edm::LogError("EvFDaqDirector")
670  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
671  setExceptionState = true;
672  return noFile;
673  }
674  }
675  } else {
676  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
677  << strerror(errno);
678  }
679  } else {
680  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
681  }
682  fclose(fu_rw_lock_stream2); // = fdopen(fu_readwritelock_fd2, "r+");
683 
684 #ifdef DEBUG
685  timeval ts_preunlock;
686  gettimeofday(&ts_preunlock, 0);
687  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
688  double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
689 #endif
690 
691  //if new json is present, lock file which FedRawDataInputSource will later unlock
692  if (fileStatus == newFile)
693  lockFULocal();
694 
695  //release lock at this point
696  int retvalu = -1;
697  retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
698  if (retvalu == -1)
699  edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
700 
701 #ifdef DEBUG
702  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
703 #endif
704 
705  if (fileStatus == noFile) {
706  struct stat buf;
707  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
708  if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
709  fileStatus = runEnded;
710  if (stopFileLS >= 0 && (int)ls > stopFileLS) {
711  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
712  fileStatus = runEnded;
713  }
714  }
715  return fileStatus;
716  }
717 
719  std::ifstream ij(BUEoLSFile);
720  Json::Value deserializeRoot;
722 
723  if (!reader.parse(ij, deserializeRoot)) {
724  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
725  return -1;
726  }
727 
729  DataPoint dp;
730  dp.deserialize(deserializeRoot);
731 
732  //read definition
733  if (readEolsDefinition_) {
734  //std::string def = boost::algorithm::trim(dp.getDefinition());
735  std::string def = dp.getDefinition();
736  if (def.empty())
737  readEolsDefinition_ = false;
738  while (!def.empty()) {
740  if (def.find('/') == 0)
741  fullpath = def;
742  else
743  fullpath = bu_run_dir_ + '/' + def;
744  struct stat buf;
745  if (stat(fullpath.c_str(), &buf) == 0) {
746  DataPointDefinition eolsDpd;
747  std::string defLabel = "legend";
748  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
749  if (eolsDpd.getNames().empty()) {
750  //try with "data" label if "legend" format is not used
751  eolsDpd = DataPointDefinition();
752  defLabel = "data";
753  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
754  }
755  for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
756  if (eolsDpd.getNames().at(i) == "NFiles")
758  readEolsDefinition_ = false;
759  break;
760  }
761  //check if we can still find definition
762  if (def.size() <= 1 || def.find('/') == std::string::npos) {
763  readEolsDefinition_ = false;
764  break;
765  }
766  def = def.substr(def.find('/') + 1);
767  }
768  }
769 
770  if (dp.getData().size() > eolsNFilesIndex_)
771  data = dp.getData()[eolsNFilesIndex_];
772  else {
773  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
774  return -1;
775  }
776  return std::stoi(data);
777  }
778 
779  bool EvFDaqDirector::bumpFile(unsigned int& ls,
780  unsigned int& index,
781  std::string& nextFile,
782  uint32_t& fsize,
783  uint16_t& rawHeaderSize,
784  int maxLS,
785  bool& setExceptionState) {
786  if (previousFileSize_ != 0) {
787  if (!fms_) {
789  }
790  if (fms_)
792  previousFileSize_ = 0;
793  }
794  nextFile = "";
795 
796  //reached limit
797  if (maxLS >= 0 && ls > (unsigned int)maxLS)
798  return false;
799 
800  struct stat buf;
801  std::stringstream ss;
802  unsigned int nextIndex = index;
803  nextIndex++;
804 
805  // 1. Check suggested file
806  std::string nextFileJson = getInputJsonFilePath(ls, index);
807  if (stat(nextFileJson.c_str(), &buf) == 0) {
808  fsize = previousFileSize_ = buf.st_size;
809  nextFile = nextFileJson;
810  return true;
811  }
812  // 2. No file -> lumi ended? (and how many?)
813  else {
814  // 3. No file -> check for standalone raw file
815  std::string nextFileRaw = getRawFilePath(ls, index);
816  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
817  fsize = previousFileSize_ = buf.st_size;
818  nextFile = nextFileRaw;
819  return true;
820  }
821 
822  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
823 
824  if (stat(BUEoLSFile.c_str(), &buf) == 0) {
825  // recheck that no raw file appeared in the meantime
826  if (stat(nextFileJson.c_str(), &buf) == 0) {
827  fsize = previousFileSize_ = buf.st_size;
828  nextFile = nextFileJson;
829  return true;
830  }
831  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
832  fsize = previousFileSize_ = buf.st_size;
833  nextFile = nextFileRaw;
834  return true;
835  }
836 
837  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
838  if (indexFilesInLS < 0)
839  //parsing failed
840  return false;
841  else {
842  //check index
843  if ((int)index < indexFilesInLS) {
844  //we have 2 files, and check for 1 failed... retry (2 will never be here)
845  edm::LogError("EvFDaqDirector")
846  << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
847  << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
848  setExceptionState = true;
849  return false;
850  }
851  }
852  // this lumi ended, check for files
853  ++ls;
854  index = 0;
855 
856  //reached limit
857  if (maxLS >= 0 && ls > (unsigned int)maxLS)
858  return false;
859 
860  nextFileJson = getInputJsonFilePath(ls, 0);
861  nextFileRaw = getRawFilePath(ls, 0);
862  if (stat(nextFileJson.c_str(), &buf) == 0) {
863  // a new file was found at new lumisection, index 0
864  fsize = previousFileSize_ = buf.st_size;
865  nextFile = nextFileJson;
866  return true;
867  }
868  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
869  fsize = previousFileSize_ = buf.st_size;
870  nextFile = nextFileRaw;
871  return true;
872  }
873  return false;
874  }
875  }
876  // no new file found
877  return false;
878  }
879 
881  if (fu_rw_lock_stream == nullptr)
882  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
883  else {
884  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
885  unsigned int readLs = 1, readIndex = 0;
886  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
887  }
888  }
889 
891  if (create) {
893  open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
894  chmod(fulockfile_.c_str(), 0766);
895  } else {
896  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
897  }
898  if (fu_readwritelock_fd_ == -1)
899  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
900  << " create:" << create << " error:" << strerror(errno);
901  else
902  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
903 
904  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
905  if (fu_rw_lock_stream == nullptr)
906  edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
907  }
908 
909  void EvFDaqDirector::lockInitLock() { pthread_mutex_lock(&init_lock_); }
910 
911  void EvFDaqDirector::unlockInitLock() { pthread_mutex_unlock(&init_lock_); }
912 
914  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
915  flock(fulocal_rwlock_fd_, LOCK_SH);
916  }
917 
919  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
920  flock(fulocal_rwlock_fd_, LOCK_UN);
921  }
922 
924  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
925  flock(fulocal_rwlock_fd2_, LOCK_EX);
926  }
927 
929  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
930  flock(fulocal_rwlock_fd2_, LOCK_UN);
931  }
932 
933  void EvFDaqDirector::createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const {
934  //used for backpressure mechanisms and monitoring
935  const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
936  struct stat buf;
937  if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
938  int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
939  close(bol_fd);
940  }
941  }
942 
943  void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection,
944  const uint32_t currentLumiSection,
945  bool doCreateBoLS,
946  bool doCreateEoLS) {
947  if (currentLumiSection > 0) {
948  const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
949  struct stat buf;
950  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
951  if (!found) {
952  if (doCreateEoLS) {
953  int eol_fd =
954  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
955  close(eol_fd);
956  }
957  if (doCreateBoLS)
958  createBoLSFile(lumiSection, false);
959  }
960  } else if (doCreateBoLS) {
961  createBoLSFile(lumiSection, true); //needed for initial lumisection
962  }
963  }
964 
966  int& rawFd,
967  uint16_t& rawHeaderSize,
968  uint32_t& lsFromHeader,
969  int32_t& eventsFromHeader,
970  int64_t& fileSizeFromHeader,
971  bool requireHeader,
972  bool retry,
973  bool closeFile) {
974  int infile;
975 
976  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
977  if (retry) {
978  edm::LogWarning("EvFDaqDirector")
979  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
980  return parseFRDFileHeader(rawSourcePath,
981  rawFd,
982  rawHeaderSize,
983  lsFromHeader,
984  eventsFromHeader,
985  fileSizeFromHeader,
986  requireHeader,
987  false,
988  closeFile);
989  } else {
990  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
991  edm::LogError("EvFDaqDirector")
992  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
993  if (errno == ENOENT)
994  return 1; // error && file not found
995  else
996  return -1;
997  }
998  }
999  }
1000 
1001  constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1); //try to read v1 FRD header size
1002  FRDFileHeader_v1 fileHead;
1003 
1004  ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
1005  if (closeFile) {
1006  close(infile);
1007  infile = -1;
1008  }
1009 
1010  if (sz_read < 0) {
1011  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - unable to read " << rawSourcePath << " : "
1012  << strerror(errno);
1013  if (infile != -1)
1014  close(infile);
1015  return -1;
1016  }
1017  if ((size_t)sz_read < buf_sz) {
1018  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - file smaller than header: " << rawSourcePath;
1019  if (infile != -1)
1020  close(infile);
1021  return -1;
1022  }
1023 
1024  uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1025 
1026  if (frd_version == 0) {
1027  //no header (specific sequence not detected)
1028  if (requireHeader) {
1029  edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1030  if (infile != -1)
1031  close(infile);
1032  return -1;
1033  } else {
1034  //no header, but valid file
1035  lseek(infile, 0, SEEK_SET);
1036  rawHeaderSize = 0;
1037  lsFromHeader = 0;
1038  eventsFromHeader = -1;
1039  fileSizeFromHeader = -1;
1040  }
1041  } else {
1042  //version 1 header
1043  uint32_t headerSizeRaw = fileHead.headerSize_;
1044  if (headerSizeRaw < buf_sz) {
1045  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1046  << " v:" << frd_version;
1047  if (infile != -1)
1048  close(infile);
1049  return -1;
1050  }
1051  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1052  lsFromHeader = fileHead.lumiSection_;
1053  eventsFromHeader = (int32_t)fileHead.eventCount_;
1054  fileSizeFromHeader = (int64_t)fileHead.fileSize_;
1055  rawHeaderSize = fileHead.headerSize_;
1056  }
1057  rawFd = infile;
1058  return 0; //OK
1059  }
1060 
1061  bool EvFDaqDirector::rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize) {
1062  int infile;
1063  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1064  edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1065  << strerror(errno);
1066  return false;
1067  }
1068  constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1); //try to read v1 FRD header size
1069  FRDFileHeader_v1 fileHead;
1070 
1071  ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
1072 
1073  if (sz_read < 0) {
1074  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << rawSourcePath << " : "
1075  << strerror(errno);
1076  if (infile != -1)
1077  close(infile);
1078  return false;
1079  }
1080  if ((size_t)sz_read < buf_sz) {
1081  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << rawSourcePath;
1082  if (infile != -1)
1083  close(infile);
1084  return false;
1085  }
1086 
1087  uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
1088 
1089  close(infile);
1090 
1091  if (frd_version > 0) {
1092  rawHeaderSize = fileHead.headerSize_;
1093  return true;
1094  }
1095 
1096  rawHeaderSize = 0;
1097  return false;
1098  }
1099 
1101  int& rawFd,
1102  uint16_t& rawHeaderSize,
1103  int64_t& fileSizeFromHeader,
1104  bool& fileFound,
1105  uint32_t serverLS,
1106  bool closeFile) {
1107  fileFound = true;
1108 
1109  //take only first three tokens delimited by "_" in the renamed raw file name
1110  std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1111  size_t pos = 0, n_tokens = 0;
1112  while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1113  }
1114  std::string reducedJsonStem = jsonStem.substr(0, pos);
1115 
1116  std::ostringstream fileNameWithPID;
1117  //should be ported to use fffnaming
1118  fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1119 
1120  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1121 
1122  LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1123 
1124  //parse RAW file header if it exists
1125  uint32_t lsFromRaw;
1126  int32_t nbEventsWrittenRaw;
1127  int64_t fileSizeFromRaw;
1128  auto ret = parseFRDFileHeader(
1129  rawSourcePath, rawFd, rawHeaderSize, lsFromRaw, nbEventsWrittenRaw, fileSizeFromRaw, true, true, closeFile);
1130  if (ret != 0) {
1131  if (ret == 1)
1132  fileFound = false;
1133  return -1;
1134  }
1135 
1136  int outfile;
1137  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1138  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1139  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1140  if (errno == EEXIST) {
1141  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1142  << " : ";
1143  return -1;
1144  }
1145  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1146  << strerror(errno);
1147  struct stat out_stat;
1148  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1149  edm::LogWarning("EvFDaqDirector")
1150  << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1151  << jsonDestPath;
1152  if (unlink(jsonDestPath.c_str()) == -1) {
1153  edm::LogWarning("EvFDaqDirector")
1154  << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1155  }
1156  }
1157  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1158  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1159  << jsonDestPath << " : " << strerror(errno);
1160  return -1;
1161  }
1162  }
1163  //write JSON file (TODO: use jsoncpp)
1164  std::stringstream ss;
1165  ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1166  std::string sstr = ss.str();
1167 
1168  if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1169  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1170  << " : " << strerror(errno);
1171  return -1;
1172  }
1173  close(outfile);
1174  if (serverLS && serverLS != lsFromRaw)
1175  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1176  << " and raw file header LS " << lsFromRaw;
1177 
1178  fileSizeFromHeader = fileSizeFromRaw;
1179  return nbEventsWrittenRaw;
1180  }
1181 
1183  std::string const& rawSourcePath,
1184  int64_t& fileSizeFromJson,
1185  bool& fileFound) {
1186  fileFound = true;
1187 
1188  //should be ported to use fffnaming
1189  std::ostringstream fileNameWithPID;
1190  fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1191  << std::setw(5) << pid_ << ".jsn";
1192 
1193  // assemble json destination path
1194  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1195 
1196  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1197 
1198  int infile = -1, outfile = -1;
1199 
1200  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1201  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1202  << strerror(errno);
1203  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1204  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1205  << jsonSourcePath << " : " << strerror(errno);
1206  if (errno == ENOENT)
1207  fileFound = false;
1208  return -1;
1209  }
1210  }
1211 
1212  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1213  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1214  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1215  if (errno == EEXIST) {
1216  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1217  << " : ";
1218  ::close(infile);
1219  return -1;
1220  }
1221  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1222  << strerror(errno);
1223  struct stat out_stat;
1224  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1225  edm::LogWarning("EvFDaqDirector")
1226  << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1227  if (unlink(jsonDestPath.c_str()) == -1) {
1228  edm::LogWarning("EvFDaqDirector")
1229  << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1230  }
1231  }
1232  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1233  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1234  << jsonDestPath << " : " << strerror(errno);
1235  ::close(infile);
1236  return -1;
1237  }
1238  }
1239  //copy contents
1240  const std::size_t buf_sz = 512;
1241  std::size_t tot_written = 0;
1242  std::unique_ptr<char> buf(new char[buf_sz]);
1243 
1244  ssize_t sz, sz_read = 1, sz_write;
1245  while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1246  sz_write = 0;
1247  do {
1248  assert(sz_read - sz_write > 0);
1249  if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1250  sz_read = sz; // cause read loop termination
1251  break;
1252  }
1253  assert(sz > 0);
1254  sz_write += sz;
1255  tot_written += sz;
1256  } while (sz_write < sz_read);
1257  }
1258  close(infile);
1259  close(outfile);
1260 
1261  if (tot_written > 0) {
1262  //leave file if it was empty for diagnosis
1263  if (unlink(jsonSourcePath.c_str()) == -1) {
1264  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1265  << strerror(errno);
1266  return -1;
1267  }
1268  } else {
1269  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1270  << jsonSourcePath;
1271  return -1;
1272  }
1273 
1274  Json::Value deserializeRoot;
1276 
1277  std::string data;
1278  std::stringstream ss;
1279  bool result;
1280  try {
1281  if (tot_written <= buf_sz) {
1282  result = reader.parse(buf.get(), deserializeRoot);
1283  } else {
1284  //json will normally not be bigger than buf_sz bytes
1285  try {
1286  std::ifstream ij(jsonDestPath);
1287  ss << ij.rdbuf();
1288  } catch (std::filesystem::filesystem_error const& ex) {
1289  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1290  return -1;
1291  }
1292  result = reader.parse(ss.str(), deserializeRoot);
1293  }
1294  if (!result) {
1295  if (tot_written <= buf_sz)
1296  ss << buf.get();
1297  edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1298  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1299  << ss.str() << ".";
1300  return -1;
1301  }
1302 
1303  //read BU JSON
1304  DataPoint dp;
1305  dp.deserialize(deserializeRoot);
1306  bool success = false;
1307  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1308  if (dpd_->getNames().at(i) == "NEvents")
1309  if (i < dp.getData().size()) {
1310  data = dp.getData()[i];
1311  success = true;
1312  break;
1313  }
1314  }
1315  if (!success) {
1316  if (!dp.getData().empty())
1317  data = dp.getData()[0];
1318  else {
1319  edm::LogError("EvFDaqDirector::grabNextJsonFile")
1320  << "grabNextJsonFile - "
1321  << " error reading number of events from BU JSON; No input value. data -: " << data;
1322  return -1;
1323  }
1324  }
1325 
1326  //try to read raw file size
1327  fileSizeFromJson = -1;
1328  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1329  if (dpd_->getNames().at(i) == "NBytes") {
1330  if (i < dp.getData().size()) {
1331  std::string dataSize = dp.getData()[i];
1332  try {
1333  fileSizeFromJson = std::stol(dataSize);
1334  } catch (const std::exception&) {
1335  //non-fatal currently, processing can continue without this value
1336  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1337  << "Input value is -: " << dataSize;
1338  }
1339  break;
1340  }
1341  }
1342  }
1343  return std::stoi(data);
1344  } catch (const std::out_of_range& e) {
1345  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1346  << "Input value is -: " << data;
1347  } catch (const std::invalid_argument& e) {
1348  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1349  << "Input value is -: " << data;
1350  } catch (std::runtime_error const& e) {
1351  //Can be thrown by Json parser
1352  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1353  }
1354 
1355  catch (std::exception const& e) {
1356  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1357  } catch (...) {
1358  //unknown exception
1359  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1360  }
1361 
1362  return -1;
1363  }
1364 
1366  std::string data;
1367  try {
1368  // assemble json destination path
1369  std::filesystem::path jsonDestPath(baseRunDir());
1370 
1371  //should be ported to use fffnaming
1372  std::ostringstream fileNameWithPID;
1373  fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1374  << ".jsn";
1375  jsonDestPath /= fileNameWithPID.str();
1376 
1377  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1378  try {
1379  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1380  } catch (std::filesystem::filesystem_error const& ex) {
1381  // Input dir gone?
1382  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1383  // << " Maybe the file is not yet visible by FU. Trying again in one second";
1384  sleep(1);
1385  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1386  }
1387  unlockFULocal();
1388 
1389  try {
1390  //sometimes this fails but file gets deleted
1391  std::filesystem::remove(jsonSourcePath);
1392  } catch (std::filesystem::filesystem_error const& ex) {
1393  // Input dir gone?
1394  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1395  } catch (std::exception const& ex) {
1396  // Input dir gone?
1397  edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1398  }
1399 
1400  std::ifstream ij(jsonDestPath);
1401  Json::Value deserializeRoot;
1403 
1404  std::stringstream ss;
1405  ss << ij.rdbuf();
1406  if (!reader.parse(ss.str(), deserializeRoot)) {
1407  edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1408  << "\nERROR:\n"
1409  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1410  << ss.str() << ".";
1411  throw std::runtime_error("Cannot deserialize input JSON file");
1412  }
1413 
1414  //read BU JSON
1415  std::string data;
1416  DataPoint dp;
1417  dp.deserialize(deserializeRoot);
1418  bool success = false;
1419  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1420  if (dpd_->getNames().at(i) == "NEvents")
1421  if (i < dp.getData().size()) {
1422  data = dp.getData()[i];
1423  success = true;
1424  }
1425  }
1426  if (!success) {
1427  if (!dp.getData().empty())
1428  data = dp.getData()[0];
1429  else
1430  throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1431  << " error reading number of events from BU JSON -: No input value " << data;
1432  }
1433  return std::stoi(data);
1434  } catch (std::filesystem::filesystem_error const& ex) {
1435  // Input dir gone?
1436  unlockFULocal();
1437  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1438  } catch (std::runtime_error const& e) {
1439  // Another process grabbed the file and NFS did not register this
1440  unlockFULocal();
1441  edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1442  } catch (const std::out_of_range&) {
1443  edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1444  << "Input value is -: " << data;
1445  } catch (const std::invalid_argument&) {
1446  edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1447  << "Input value is -: " << data;
1448  } catch (std::exception const& e) {
1449  // BU run directory disappeared?
1450  unlockFULocal();
1451  edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1452  }
1453 
1454  return -1;
1455  }
1456 
1458  bool& serverError,
1459  uint32_t& serverLS,
1460  uint32_t& closedServerLS,
1461  std::string& nextFileJson,
1462  std::string& nextFileRaw,
1463  bool& rawHeader,
1464  int maxLS) {
1465  EvFDaqDirector::FileStatus fileStatus = noFile;
1466  serverError = false;
1467 
1468  boost::system::error_code ec;
1469  try {
1470  while (true) {
1471  //socket connect
1472  if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1474 
1475  if (ec) {
1476  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1477  serverError = true;
1478  break;
1479  }
1480  }
1481 
1482  boost::asio::streambuf request;
1483  std::ostream request_stream(&request);
1484  std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1485  if (maxLS >= 0) {
1486  std::stringstream spath;
1487  spath << path << "&stopls=" << maxLS;
1488  path = spath.str();
1489  edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1490  }
1491  request_stream << "GET " << path << " HTTP/1.1\r\n";
1492  request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1493  request_stream << "Accept: */*\r\n";
1494  request_stream << "Connection: keep-alive\r\n\r\n";
1495 
1496  boost::asio::write(*socket_, request, ec);
1497  if (ec) {
1498  if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1499  edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1500  //we got disconnected, try to reconnect to the server before writing the request
1502  if (ec) {
1503  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1504  serverError = true;
1505  break;
1506  }
1507  continue;
1508  }
1509  edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1510  serverError = true;
1511  break;
1512  }
1513 
1514  boost::asio::streambuf response;
1515  boost::asio::read_until(*socket_, response, "\r\n", ec);
1516  if (ec) {
1517  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1518  serverError = true;
1519  break;
1520  }
1521 
1522  std::istream response_stream(&response);
1523 
1524  std::string http_version;
1525  response_stream >> http_version;
1526 
1527  response_stream >> serverHttpStatus;
1528 
1529  std::string status_message;
1530  std::getline(response_stream, status_message);
1531  if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1532  edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1533  serverError = true;
1534  break;
1535  }
1536  if (serverHttpStatus != 200) {
1537  edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1538  serverError = true;
1539  break;
1540  }
1541 
1542  // Process the response headers.
1544  while (std::getline(response_stream, header) && header != "\r") {
1545  }
1546 
1547  std::string fileInfo;
1548  std::map<std::string, std::string> serverMap;
1549  while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1550  auto pos = fileInfo.find('=');
1551  if (pos == std::string::npos)
1552  continue;
1553  auto stitle = fileInfo.substr(0, pos);
1554  auto svalue = fileInfo.substr(pos + 1);
1555  serverMap[stitle] = svalue;
1556  }
1557 
1558  //check that response run number if correct
1559  auto server_version = serverMap.find("version");
1560  assert(server_version != serverMap.end());
1561 
1562  auto server_run = serverMap.find("runnumber");
1563  assert(server_run != serverMap.end());
1564  assert(run_nstring_ == server_run->second);
1565 
1566  auto server_state = serverMap.find("state");
1567  assert(server_state != serverMap.end());
1568 
1569  auto server_eols = serverMap.find("lasteols");
1570  assert(server_eols != serverMap.end());
1571 
1572  auto server_ls = serverMap.find("lumisection");
1573 
1574  int version_maj = 1;
1575  int version_min = 0;
1576  int version_rev = 0;
1577  {
1578  auto* s_ptr = server_version->second.c_str();
1579  if (!server_version->second.empty() && server_version->second[0] == '"')
1580  s_ptr++;
1581  auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1582  if (res < 3) {
1583  res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1584  if (res < 2) {
1585  res = sscanf(s_ptr, "%d", &version_maj);
1586  if (res < 1) {
1587  //expecting at least 1 number (major version)
1588  edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1589  }
1590  }
1591  }
1592  }
1593 
1594  closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1595  if (server_ls != serverMap.end())
1596  serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1597  else
1598  serverLS = closedServerLS + 1;
1599 
1600  std::string s_state = server_state->second;
1601  if (s_state == "STARTING") //initial, always empty starting with LS 1
1602  {
1603  auto server_file = serverMap.find("file");
1604  assert(server_file == serverMap.end()); //no file with starting state
1605  fileStatus = noFile;
1606  edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1607  } else if (s_state == "READY") {
1608  auto server_file = serverMap.find("file");
1609  if (server_file == serverMap.end()) {
1610  //can be returned by server if files from new LS already appeared but LS is not yet closed
1611  if (serverLS <= closedServerLS)
1612  serverLS = closedServerLS + 1;
1613  fileStatus = noFile;
1614  edm::LogInfo("EvFDaqDirector")
1615  << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1616  } else {
1617  std::string filestem;
1618  std::string fileprefix;
1619  auto server_fileprefix = serverMap.find("fileprefix");
1620 
1621  if (server_fileprefix != serverMap.end()) {
1622  auto pssize = server_fileprefix->second.size();
1623  if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1624  fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1625  else
1626  fileprefix = server_fileprefix->second;
1627  }
1628 
1629  //remove string literals
1630  auto ssize = server_file->second.size();
1631  if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1632  filestem = server_file->second.substr(1, ssize - 2);
1633  else
1634  filestem = server_file->second;
1635  assert(!filestem.empty());
1636  if (version_maj > 1) {
1637  nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw"; //filestem should be raw
1638  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1639  nextFileJson = "";
1640  rawHeader = true;
1641  } else {
1642  nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw"; //raw files are not moved
1643  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1644  nextFileJson = filestem + ".jsn";
1645  rawHeader = false;
1646  }
1647  fileStatus = newFile;
1648  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1649  << serverLS << " file:" << filestem;
1650  }
1651  } else if (s_state == "EOLS") {
1652  serverLS = closedServerLS + 1;
1653  edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1654  fileStatus = noFile;
1655  } else if (s_state == "EOR") {
1656  //server_eor = serverMap.find("iseor");
1657  edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1658  fileStatus = runEnded;
1659  } else if (s_state == "NORUN") {
1660  auto err_msg = serverMap.find("errormessage");
1661  if (err_msg != serverMap.end())
1662  edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1663  else
1664  edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1665  edm::LogWarning("EvFDaqDirector") << "executing run end";
1666  fileStatus = runEnded;
1667  } else if (s_state == "ERROR") {
1668  auto err_msg = serverMap.find("errormessage");
1669  if (err_msg != serverMap.end())
1670  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1671  else
1672  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1673  fileStatus = noFile;
1674  serverError = true;
1675  } else {
1676  edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1677  fileStatus = noFile;
1678  serverError = true;
1679  }
1680 
1681  // Read until EOF, writing data to output as we go.
1682  if (!fileBrokerKeepAlive_) {
1683  while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1684  }
1685  if (ec != boost::asio::error::eof) {
1686  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1687  serverError = true;
1688  }
1689  }
1690 
1691  break;
1692  }
1693 
1694  } catch (std::exception const& e) {
1695  edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1696  serverError = true;
1697  }
1698 
1699  if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1700  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1701  if (ec) {
1702  edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1703  }
1704  socket_->close(ec);
1705  if (ec) {
1706  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1707  }
1708  }
1709 
1710  if (serverError) {
1711  if (socket_->is_open())
1712  socket_->close(ec);
1713  if (ec) {
1714  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1715  }
1716  fileStatus = noFile;
1717  sleep(1); //back-off if error detected
1718  }
1719 
1720  return fileStatus;
1721  }
1722 
1724  unsigned int& ls,
1725  std::string& nextFileRaw,
1726  int& rawFd,
1727  uint16_t& rawHeaderSize,
1728  int32_t& serverEventsInNewFile,
1729  int64_t& fileSizeFromMetadata,
1730  uint64_t& thisLockWaitTimeUs) {
1731  EvFDaqDirector::FileStatus fileStatus = noFile;
1732 
1733  //int retval = -1;
1734  //int lock_attempts = 0;
1735  //long total_lock_attempts = 0;
1736 
1737  struct stat buf;
1738  int stopFileLS = -1;
1739  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1740  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1741  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1742  if (stopFileCheck == 0)
1743  stopFileLS = readLastLSEntry(stopFilePath_);
1744  else
1745  stopFileLS = 1; //stop without drain if only pid is stopped
1746  if (!stop_ls_override_) {
1747  //if lumisection is higher than in stop file, should quit at next from current
1748  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1749  stopFileLS = stop_ls_override_ = ls;
1750  } else
1751  stopFileLS = stop_ls_override_;
1752  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1753  << stopFileLS;
1754  //return runEnded;
1755  } else //if file was removed before reaching stop condition, reset this
1756  stop_ls_override_ = 0;
1757 
1758  /* look for EoLS
1759  if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1760  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ;
1761  ls++;
1762  return noFile;
1763  }
1764  */
1765 
1766  timeval ts_lockbegin;
1767  gettimeofday(&ts_lockbegin, nullptr);
1768 
1769  std::string nextFileJson;
1770  uint32_t serverLS, closedServerLS;
1771  unsigned int serverHttpStatus;
1772  bool serverError;
1773 
1774  //local lock to force index json and EoLS files to appear in order
1776  lockFULocal();
1777 
1778  int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1779  bool rawHeader = false;
1780  fileStatus = contactFileBroker(
1781  serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1782 
1783  if (serverError) {
1784  //do not update anything
1786  unlockFULocal();
1787  return noFile;
1788  }
1789 
1790  //handle creation of BoLS files if lumisection has changed
1791  if (currentLumiSection == 0) {
1792  if (fileStatus == runEnded)
1793  createLumiSectionFiles(closedServerLS, 0, true, false);
1794  else
1795  createLumiSectionFiles(serverLS, 0, true, false);
1796  } else {
1797  if (closedServerLS >= currentLumiSection) {
1798  //only BoLS files
1799  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1800  createLumiSectionFiles(i + 1, i, true, false);
1801  }
1802  }
1803 
1804  bool fileFound = true;
1805 
1806  if (fileStatus == newFile) {
1807  if (rawHeader > 0)
1808  serverEventsInNewFile =
1809  grabNextJsonFromRaw(nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false);
1810  else
1811  serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1812  }
1813  //closing file in case of any error
1814  if (serverEventsInNewFile < 0 && rawFd != -1) {
1815  close(rawFd);
1816  rawFd = -1;
1817  }
1818 
1819  //can unlock because all files have been created locally
1821  unlockFULocal();
1822 
1823  if (!fileFound) {
1824  //catch condition where directory got deleted
1825  fileStatus = noFile;
1826  struct stat buf;
1827  if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1828  edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1829  fileStatus = runEnded;
1830  }
1831  }
1832 
1833  //handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
1834  //so that EoLS files can not appear locally before index files
1835  if (currentLumiSection == 0) {
1836  lockFULocal2();
1837  if (fileStatus == runEnded) {
1838  createLumiSectionFiles(closedServerLS, 0, false, true);
1839  createLumiSectionFiles(serverLS, closedServerLS, false, true); // +1
1840  } else {
1841  createLumiSectionFiles(serverLS, 0, false, true);
1842  }
1843  unlockFULocal2();
1844  } else {
1845  if (closedServerLS >= currentLumiSection) {
1846  //lock exclusive to create EoLS files
1847  lockFULocal2();
1848  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1849  createLumiSectionFiles(i + 1, i, false, true);
1850  unlockFULocal2();
1851  }
1852  }
1853 
1854  if (fileStatus == runEnded)
1855  ls = std::max(currentLumiSection, serverLS);
1856  else if (fileStatus == newFile) {
1857  assert(serverLS >= ls);
1858  ls = serverLS;
1859  } else if (fileStatus == noFile) {
1860  if (serverLS >= ls)
1861  ls = serverLS;
1862  else {
1863  edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1864  << " which is smaller than currently open LS " << ls << ". Ignoring response";
1865  sleep(1);
1866  }
1867  }
1868 
1869  return fileStatus;
1870  }
1871 
// Ensures the run's "open" directory exists: if `openPath` is not already a
// directory, it is created together with any missing parent directories.
// NOTE(review): this listing omits the function signature (line 1872) and line 1875,
// which presumably declares `openPath` (a std::filesystem::path, judging from the
// is_directory / create_directories / .string() calls) — confirm against full source.
1873  // create open dir if not already there
1874 
1876  if (!std::filesystem::is_directory(openPath)) {
// The non-error_code overload of create_directories throws
// std::filesystem::filesystem_error on failure (e.g. if openPath exists as a file).
1877  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1878  std::filesystem::create_directories(openPath);
1879  }
1880  }
1881 
// Reads the integer "lastLS" field from the JSON document in `file`.
// Returns -1 (after logging an error) when the stream cannot be parsed as JSON,
// which presumably also covers a file that could not be opened.
// Used by the file-broker path above to read the lumisection from the stop file.
// NOTE(review): the signature (line 1882) and line 1885, which presumably declares
// the JSON `reader` used below, are missing from this listing.
1883  std::ifstream ij(file);
1884  Json::Value deserializeRoot;
1886 
1887  if (!reader.parse(ij, deserializeRoot)) {
1888  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1889  return -1;
1890  }
1891 
// NOTE(review): if "lastLS" is absent, the default "" is a *string* value; confirm how
// asInt() treats a string in this jsoncpp variant (it may throw/assert instead of 0).
1892  int ret = deserializeRoot.get("lastLS", "").asInt();
1893  return ret;
1894  }
1895 
// Probes "<run_dir_>/<run_string_>_lsNNNN_EoLS.jsn" for LS = 1, 2, 3, ... (NNNN is
// zero-padded to at least 4 digits) and returns the first LS number whose EoLS file
// does not exist. Returns 1 when the LS 1 EoLS file is absent. The scan stops at the
// first gap, so EoLS files after a missing one are not considered.
// NOTE(review): the signature (line 1896) and line 1898, which presumably declares
// `std::string fullpath`, are missing from this listing.
1897  std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1899  struct stat buf;
1900  unsigned int lscount = 1;
1901  do {
1902  std::stringstream ss;
1903  ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1904  fullpath = ss.str();
// lscount is advanced before the existence test, so after the loop it is one past
// the LS that was found missing; hence the "- 1" on return.
1905  lscount++;
1906  } while (stat(fullpath.c_str(), &buf) == 0);
1907  return lscount - 1;
1908  }
1909 
1910  //if transferSystem PSet is present in the menu, we require it to be complete and consistent for all specified streams
1912  if (transferSystemJson_)
1913  return;
1914 
1915  transferSystemJson_.reset(new Json::Value);
1916  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1917  if (topPset.existsAs<edm::ParameterSet>("transferSystem", true)) {
1918  const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));
1919 
1920  Json::Value destinationsVal(Json::arrayValue);
1921  std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
1922  for (auto& dest : destinations)
1923  destinationsVal.append(dest);
1924  (*transferSystemJson_)["destinations"] = destinationsVal;
1925 
1926  Json::Value modesVal(Json::arrayValue);
1927  std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>("transferModes");
1928  for (auto& mode : modes)
1929  modesVal.append(mode);
1930  (*transferSystemJson_)["transferModes"] = modesVal;
1931 
1932  for (auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
1933  if (psKeyItr->first != "destinations" && psKeyItr->first != "transferModes") {
1934  const edm::ParameterSet& streamDef = tsPset.getParameterSet(psKeyItr->first);
1935  Json::Value streamVal;
1936  for (auto& mode : modes) {
1937  //validation
1938  if (!streamDef.existsAs<std::vector<std::string>>(mode, true))
1939  throw cms::Exception("EvFDaqDirector")
1940  << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode
1941  << ")";
1942  std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);
1943 
1944  Json::Value sDestsValue(Json::arrayValue);
1945 
1946  if (streamDestinations.empty())
1947  throw cms::Exception("EvFDaqDirector")
1948  << " Missing transter system destination(s) for -: " << psKeyItr->first << ", mode:" << mode;
1949 
1950  for (auto& sdest : streamDestinations) {
1951  bool sDestValid = false;
1952  sDestsValue.append(sdest);
1953  for (auto& dest : destinations) {
1954  if (dest == sdest)
1955  sDestValid = true;
1956  }
1957  if (!sDestValid)
1958  throw cms::Exception("EvFDaqDirector")
1959  << " Invalid transter system destination specified for -: " << psKeyItr->first << ", mode:" << mode
1960  << ", dest:" << sdest;
1961  }
1962  streamVal[mode] = sDestsValue;
1963  }
1964  (*transferSystemJson_)[psKeyItr->first] = streamVal;
1965  }
1966  }
1967  } else {
1968  if (requireTSPSet_)
1969  throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
1970  }
1971  }
1972 
1974  std::string streamRequestName;
1975  if (transferSystemJson_->isMember(stream.c_str()))
1976  streamRequestName = stream;
1977  else {
1978  std::stringstream msg;
1979  msg << "Transfer system mode definitions missing for -: " << stream;
1980  if (requireTSPSet_)
1981  throw cms::Exception("EvFDaqDirector") << msg.str();
1982  else {
1983  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
1984  return std::string("Failsafe");
1985  }
1986  }
1987  //return empty if strict check parameter is not on
1988  if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1989  edm::LogWarning("EvFDaqDirector")
1990  << "Selected mode string is not provided as DaqDirector parameter."
1991  << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
1992  return std::string("Failsafe");
1993  }
1994  if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1995  throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
1996  }
1997  //check if stream has properly listed transfer stream
1998  if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str())) {
1999  std::stringstream msg;
2000  msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
2001  if (requireTSPSet_)
2002  throw cms::Exception("EvFDaqDirector") << msg.str();
2003  else
2004  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
2005  return std::string("Failsafe");
2006  }
2007  Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_, "");
2008 
2009  //flatten string json::Array into CSV std::string
2010  std::string ret;
2011  for (Json::Value::iterator it = destsVec.begin(); it != destsVec.end(); it++) {
2012  if (!ret.empty())
2013  ret += ",";
2014  ret += (*it).asString();
2015  }
2016  return ret;
2017  }
2018 
// Populates mergeTypeMap_ (stream name -> merge type string) from the top-level PSet
// whose name is mergeTypePset_ (configured via "mergingPset"). Every parameter name in
// that PSet is treated as a stream name and read as a std::string merge type.
// Does nothing if mergeTypePset_ is empty, if the map is already populated, or if the
// named PSet is not in the process configuration (the last case is silent).
// NOTE(review): the signature (line 2019) is missing from this listing; `pc` is
// presumably an edm::ProcessContext. A non-string parameter in the PSet presumably
// makes getParameter<std::string> throw — verify.
2020  if (mergeTypePset_.empty())
2021  return;
2022  if (!mergeTypeMap_.empty())
2023  return;
2024  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
2025  if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_, true)) {
2026  const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
2027  for (const std::string& pname : tsPset.getParameterNames()) {
2028  std::string streamType = tsPset.getParameter<std::string>(pname);
// Insert-or-find under a write accessor, then overwrite the value; release() drops the
// element lock before the next iteration.
2029  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2030  mergeTypeMap_.insert(ac, pname);
2031  ac->second = streamType;
2032  ac.release();
2033  }
2034  }
2035  }
2036 
// Returns the merge type configured for `stream` in mergeTypeMap_. If none is found,
// logs at Info level, stores MergeTypeNames_[defaultType] for that stream in the map
// (so later lookups succeed), and returns it. MergeTypeNames_ is defined near the top of
// the file as {"", "DAT", "PB", "JSNDATA"} and is indexed by MergeType.
// NOTE(review): the signature (line 2037) is missing from this listing. If another thread
// inserts the same stream between the failed find and the insert, this call overwrites
// that entry with the default name — presumably benign since both paths write a default.
2038  tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
2039  if (mergeTypeMap_.find(search_ac, stream))
2040  return search_ac->second;
2041 
2042  edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
2043  std::string defaultName = MergeTypeNames_[defaultType];
2044  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
2045  mergeTypeMap_.insert(ac, stream);
2046  ac->second = defaultName;
2047  ac.release();
2048  return defaultName;
2049  }
2050 
2052  std::string proc_flag = run_dir_ + "/processing";
2053  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2054  close(proc_flag_fd);
2055  }
2056 
2057  struct flock EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid) {
2058 #ifdef __APPLE__
2059  return {start, len, pid, type, whence};
2060 #else
2061  return {type, whence, start, len, pid};
2062 #endif
2063  }
2064 
// Returns true when a filesystem entry exists at input_throttled_file_ (stat() succeeds).
// Any stat() failure, not only "file missing", yields false.
// NOTE(review): the signature (line 2065) is missing from this listing.
2066  struct stat buf;
2067  return (stat(input_throttled_file_.c_str(), &buf) == 0);
2068  }
2069 
2070 } // namespace evf
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:43
unsigned int nThreads_
Definition: fillJson.h:27
int def(FILE *, FILE *, int)
struct flock fu_rw_flk
const_iterator end() const
std::string run_string_
LuminosityBlockNumber_t luminosityBlock() const
T getParameter(std::string const &) const
Definition: ParameterSet.h:303
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
std::string bolsFileName(const unsigned int run, const unsigned int ls)
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
void watchPreallocate(Preallocate::slot_type const &iSlot)
boost::asio::io_service io_service_
const_iterator begin() const
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::string fulockfile_
def create(alignables, pedeDump, additionalData, outputFile, config)
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
std::shared_ptr< Json::Value > transferSystemJson_
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
jsoncollector::DataPointDefinition * dpd_
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::string getFFFParamsFilePathOnBU() const
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
Int asInt() const
ret
prodAgent to be discontinued
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
pthread_mutex_t init_lock_
ParameterSet const & getParameterSet(std::string const &) const
Value get(UInt index, const Value &defaultValue) const
std::string getEoLSFilePathOnFU(const unsigned int ls) const
Value & append(const Value &value)
Append value to array at the end.
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
reader
Definition: DQM.py:105
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:171
Represents a JSON value.
Definition: value.h:99
Log< level::Error, false > LogError
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string getOpenInitFilePath(std::string const &stream) const
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
assert(be >=bs)
bool isExceptionOnData(unsigned int ls)
std::string getStreamDestinations(std::string const &stream) const
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: Electron.h:6
unsigned long previousFileSize_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
std::string getEoRFilePathOnFU() const
struct flock fu_rw_fulk
std::string hltSourceDirectory_
unsigned int startFromLS_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
std::mutex * fileDeleteLockPtr_
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
std::string mergeTypePset_
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
def chmod(path, mode)
Definition: eostools.py:294
std::string getRunOpenDirPath() const
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string getStreamMergeType(std::string const &stream, MergeType defaultType)
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
std::string stopFilePath_
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string bu_base_dir_
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
void removeFile(unsigned int ls, unsigned int index)
unsigned int stop_ls_override_
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
std::string selectedTransferMode_
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string input_throttled_file_
std::string getBoLSFilePathOnFU(const unsigned int ls) const
std::string eorFileName(const unsigned int run)
int readLastLSEntry(std::string const &file)
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string stopFilePathPid_
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:60
unsigned int eolsNFilesIndex_
std::string & baseRunDir()
std::string run_nstring_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
std::string getInitFilePath(std::string const &stream) const
void createProcessingNotificationMaybe() const
Log< level::Info, false > LogInfo
void openFULockfileStream(bool create)
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
int getNFilesFromEoLS(std::string BUEoLSFile)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
def ls(path, rec=False)
Definition: eostools.py:349
void preBeginRun(edm::GlobalContext const &globalContext)
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned long long uint64_t
Definition: Time.h:13
void checkMergeTypePSet(edm::ProcessContext const &pc)
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
std::vector< std::string > const & getNames() const
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:223
tuple msg
Definition: mps_check.py:285
evf::FastMonitoringService * fms_
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
std::string bu_run_dir_
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
ParameterSet const & getParameterSet(ParameterSetID const &id)
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
std::string getEoRFilePath() const
def mkdir(path)
Definition: eostools.py:251
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
std::string findHighestRunDir()
Definition: DirManager.cc:23
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
void postEndRun(edm::GlobalContext const &globalContext)
Unserialize a JSON document into a Value.
Definition: reader.h:16
static const std::vector< std::string > MergeTypeNames_
unsigned int getLumisectionToStart() const
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
void checkTransferSystemPSet(edm::ProcessContext const &pc)
std::string fileBrokerPort_
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
void preallocate(edm::service::SystemBounds const &bounds)
Log< level::Warning, false > LogWarning
Iterator for object and array value.
Definition: value.h:908
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
std::string fileBrokerHost_
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned int fuLockPollInterval_
std::string eolsFileName(const unsigned int run, const unsigned int ls)
unsigned int nStreams_
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
ParameterSetID const & parameterSetID() const
std::string bu_run_open_dir_
#define LogDebug(id)
array value (ordered list)
Definition: value.h:30