CMS 3D CMS Logo

EvFDaqDirector.cc
Go to the documentation of this file.
11 
19 
20 #include <iostream>
21 #include <fstream>
22 #include <sstream>
23 #include <sys/time.h>
24 #include <unistd.h>
25 #include <cstdio>
26 #include <boost/algorithm/string.hpp>
27 #include <fmt/printf.h>
28 
29 //using boost::asio::ip::tcp;
30 
31 //#define DEBUG
32 
33 using namespace jsoncollector;
34 using namespace edm::streamer;
35 
36 namespace evf {
37 
38  //for enum MergeType
39  const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"};
40 
42  : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
43  bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
44  bu_base_dirs_all_(pset.getUntrackedParameter<std::vector<std::string>>("buBaseDirsAll")),
45  bu_base_dirs_nSources_(pset.getUntrackedParameter<std::vector<int>>("buBaseDirsNumStreams")),
46  run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
47  useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
48  fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", false)),
49  fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
50  fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
51  fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
52  fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
53  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
54  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
55  directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
56  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
57  hostname_(""),
58  bu_readlock_fd_(-1),
59  bu_writelock_fd_(-1),
60  fu_readwritelock_fd_(-1),
61  fulocal_rwlock_fd_(-1),
62  fulocal_rwlock_fd2_(-1),
63  bu_w_lock_stream(nullptr),
64  bu_r_lock_stream(nullptr),
65  fu_rw_lock_stream(nullptr),
66  dirManager_(base_dir_),
67  previousFileSize_(0),
68  bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
69  bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
70  bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
71  bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
72  fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
73  fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
78 
79  //save hostname for later
80  char hostname[33];
81  gethostname(hostname, 32);
82  hostname_ = hostname;
83 
84  char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
85  if (fuLockPollIntervalPtr) {
86  try {
87  fuLockPollInterval_ = std::stoul(std::string(fuLockPollIntervalPtr));
88  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
89  << " us";
90  } catch (const std::exception&) {
91  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
92  }
93  }
94 
95  //override file service parameter if specified by environment
96  char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
97  if (fileBrokerParamPtr) {
98  try {
99  useFileBroker_ = (std::stoul(std::string(fileBrokerParamPtr))) > 0;
100  edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
101  } catch (const std::exception&) {
102  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
103  }
104  }
105  if (useFileBroker_) {
107  //find BU data address from hltd configuration
109  struct stat buf;
110  if (stat("/etc/appliance/bus.config", &buf) == 0) {
111  std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
112  std::getline(busconfig, fileBrokerHost_);
113  }
114  if (fileBrokerHost_.empty())
115  throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
116  } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
117  throw cms::Exception("EvFDaqDirector")
118  << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
119 
120  resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
121  query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
122  endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
123  socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
124  }
125 
126  char* startFromLSPtr = std::getenv("FFF_START_LUMISECTION");
127  if (startFromLSPtr) {
128  try {
129  startFromLS_ = std::stoul(std::string(startFromLSPtr));
130  edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
131  } catch (const std::exception&) {
132  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
133  }
134  }
135 
136  //override file broker local-lock parameter (fileBrokerUseLocalLock) if specified by environment
137  char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
138  if (fileBrokerUseLockParamPtr) {
139  try {
140  fileBrokerUseLocalLock_ = (std::stoul(std::string(fileBrokerUseLockParamPtr))) > 0;
141  edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
143  } catch (const std::exception&) {
144  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
145  }
146  }
147 
148  // set number of streams in each BU's ramdisk
149  if (bu_base_dirs_nSources_.empty()) {
150  // default is 1 stream per ramdisk
151  for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
152  bu_base_dirs_nSources_.push_back(1);
153  }
154  } else if (bu_base_dirs_nSources_.size() != bu_base_dirs_all_.size()) {
155  throw cms::Exception("DaqDirector")
156  << " Error while setting number of sources: size mismatch with BU base directory vector";
157  } else {
158  for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++) {
160  edm::LogInfo("EvFDaqDirector") << "Setting " << bu_base_dirs_nSources_[i] << " sources"
161  << " for ramdisk " << bu_base_dirs_all_[i];
162  }
163  }
164 
165  std::stringstream ss;
166  ss << "run" << std::setfill('0') << std::setw(6) << run_;
167  run_string_ = ss.str();
168  ss = std::stringstream();
169  ss << run_;
170  run_nstring_ = ss.str();
171  run_dir_ = base_dir_ + "/" + run_string_;
172  input_throttled_file_ = run_dir_ + "/input_throttle";
173  discard_ls_filestem_ = run_dir_ + "/discard_ls";
174  ss = std::stringstream();
175  ss << getpid();
176  pid_ = ss.str();
177  }
178 
180  // check if base dir exists or create it accordingly
181  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
182  if (retval != 0 && errno != EEXIST) {
183  throw cms::Exception("DaqDirector")
184  << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
185  }
186 
187  //create run dir in base dir
188  umask(0);
189  retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
190  if (retval != 0 && errno != EEXIST) {
191  throw cms::Exception("DaqDirector")
192  << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
193  }
194 
195  //create fu-local.lock in run open dir
196  if (!directorBU_) {
198  std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
200  open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
201  if (fulocal_rwlock_fd_ == -1)
202  throw cms::Exception("DaqDirector")
203  << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
204  chmod(fulocal_lock_.c_str(), 0777);
205  fsync(fulocal_rwlock_fd_);
206  //open second fd for another input source thread
208  open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
209  if (fulocal_rwlock_fd2_ == -1)
210  throw cms::Exception("DaqDirector")
211  << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
212  }
213 
214  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
215  //for BU, it is created at this point
216  if (directorBU_) {
218  std::string bulockfile = bu_run_dir_ + "/bu.lock";
219  fulockfile_ = bu_run_dir_ + "/fu.lock";
220 
221  //make or find bu run dir
222  retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
223  if (retval != 0 && errno != EEXIST) {
224  throw cms::Exception("DaqDirector")
225  << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno);
226  }
227  bu_run_open_dir_ = bu_run_dir_ + "/open";
228  retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
229  if (retval != 0 && errno != EEXIST) {
230  throw cms::Exception("DaqDirector")
231  << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno);
232  }
233 
234  // the BU director does not need to know about the fu lock
235  bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
236  if (bu_writelock_fd_ == -1)
237  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
238  else
239  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
240  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
241  if (bu_w_lock_stream == nullptr)
242  edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
243 
244  // BU INITIALIZES LOCK FILE
245  // FU LOCK FILE OPEN
246  openFULockfileStream(true);
248  fflush(fu_rw_lock_stream);
249  close(fu_readwritelock_fd_);
250 
251  if (!hltSourceDirectory_.empty()) {
252  struct stat buf;
253  if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
254  std::string hltdir = bu_run_dir_ + "/hlt";
255  std::string tmphltdir = bu_run_open_dir_ + "/hlt";
256  retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
257  if (retval != 0 && errno != EEXIST)
258  throw cms::Exception("DaqDirector")
259  << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno);
260 
261  std::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
262  std::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
263 
264  std::string optfiles[3] = {"hltinfo", "blacklist", "whitelist"};
265  for (auto& optfile : optfiles) {
266  try {
267  std::filesystem::copy_file(hltSourceDirectory_ + "/" + optfile, tmphltdir + "/" + optfile);
268  } catch (...) {
269  }
270  }
271 
272  std::filesystem::rename(tmphltdir, hltdir);
273  } else
274  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
275  }
276  //else{}//no configuration specified
277  } else {
278  // for FU, check if bu base dir exists
279 
280  auto checkExists = [=](std::string const& bu_base_dir) -> void {
281  int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
282  if (retval != 0 && errno != EEXIST) {
283  throw cms::Exception("DaqDirector")
284  << " Error checking for bu base dir -: " << bu_base_dir << " mkdir error:" << strerror(errno);
285  }
286  };
287 
288  auto waitForDir = [=](std::string const& bu_base_dir) -> void {
289  int cnt = 0;
290  while (!edm::shutdown_flag.load(std::memory_order_relaxed)) {
291  //stat should trigger autofs mount (mkdir could fail with access denied first time)
292  struct stat statbuf;
293  stat(bu_base_dir.c_str(), &statbuf);
294  int retval = mkdir(bu_base_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
295  if (retval != 0 && errno != EEXIST) {
296  usleep(500000);
297  cnt++;
298  if (cnt % 20 == 0)
299  edm::LogWarning("DaqDirector") << "waiting for " << bu_base_dir;
300  if (cnt > 120)
301  throw cms::Exception("DaqDirector") << " Error checking for bu base dir after 1 minute -: " << bu_base_dir
302  << " mkdir error:" << strerror(errno);
303  continue;
304  }
305  break;
306  }
307  };
308 
309  if (!bu_base_dirs_all_.empty()) {
310  std::string check_dir = bu_base_dir_.empty() ? bu_base_dirs_all_[0] : bu_base_dir_;
311  checkExists(check_dir);
312  bu_run_dir_ = check_dir + "/" + run_string_;
313  for (unsigned int i = 0; i < bu_base_dirs_all_.size(); i++)
314  waitForDir(bu_base_dirs_all_[i]);
315  } else {
316  checkExists(bu_base_dir_);
318  }
319 
320  fulockfile_ = bu_run_dir_ + "/fu.lock";
321  if (!useFileBroker_)
322  openFULockfileStream(false);
323  }
324 
325  pthread_mutex_init(&init_lock_, nullptr);
326 
327  stopFilePath_ = run_dir_ + "/CMSSW_STOP";
328  std::stringstream sstp;
329  sstp << stopFilePath_ << "_pid" << pid_;
330  stopFilePathPid_ = sstp.str();
331 
332  if (!directorBU_) {
333  std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
334  struct stat statbuf;
335  if (!stat(defPath.c_str(), &statbuf))
336  edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
337  else {
338  //look in source directory if not present in ramdisk
339  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
340  defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
341  if (stat(defPath.c_str(), &statbuf)) {
342  defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
343  if (stat(defPath.c_str(), &statbuf)) {
344  defPath = defPathSuffix;
345  }
346  }
347  }
348  dpd_ = new DataPointDefinition();
349  std::string defLabel = "data";
350  DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
351  }
352  }
353 
355  //close server connection
356  if (socket_.get() && socket_->is_open()) {
357  boost::system::error_code ec;
358  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
359  socket_->close(ec);
360  }
361 
362  if (fulocal_rwlock_fd_ != -1) {
363  unlockFULocal();
364  close(fulocal_rwlock_fd_);
365  }
366 
367  if (fulocal_rwlock_fd2_ != -1) {
368  unlockFULocal2();
369  close(fulocal_rwlock_fd2_);
370  }
371  }
372 
374  initRun();
375 
376  nThreads_ = bounds.maxNumberOfStreams();
377  nStreams_ = bounds.maxNumberOfThreads();
378  nConcurrentLumis_ = bounds.maxNumberOfConcurrentLuminosityBlocks();
379  }
380 
383  desc.setComment(
384  "Service used for file locking arbitration and for propagating information between other EvF components");
385  desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
386  desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
387  desc.addUntracked<std::vector<std::string>>("buBaseDirsAll", std::vector<std::string>())
388  ->setComment("BU base ramdisk directories for multi-file DAQSource models");
389  desc.addUntracked<std::vector<int>>("buBaseDirsNumStreams", std::vector<int>())
390  ->setComment("Number of streams for each BU base ramdisk directories for multi-file DAQSource models");
391  desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
392  desc.addUntracked<bool>("useFileBroker", false)
393  ->setComment("Use BU file service to grab input data instead of NFS file locking");
394  desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
395  ->setComment("Allow service to discover BU address from hltd configuration");
396  desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
397  desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
398  desc.addUntracked<bool>("fileBrokerKeepAlive", true)
399  ->setComment("Use keep alive to avoid using large number of sockets");
400  desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
401  ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
402  desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
403  ->setComment("Lock polling interval in microseconds for the input directory file lock");
404  desc.addUntracked<bool>("outputAdler32Recheck", false)
405  ->setComment("Check Adler32 of per-process output files while micro-merging");
406  desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
407  desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
408  desc.addUntracked<std::string>("mergingPset", "")
409  ->setComment("Name of merging PSet to look for merging type definitions for streams");
410  descriptions.add("EvFDaqDirector", desc);
411  }
412 
// Hook taking the framework GlobalContext (the argument is not used in the visible code).
// Emits a LogWarning that names run_dir_ and the value of dirManager_.findHighestRunDir()
// to report that the configured run is not the highest run directory.
// NOTE(review): listing line 417, which holds the condition guarding the warning, is missing
// from this extract — confirm the exact check against the original file.
413  void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) {
414  //assert(run_ == id.run());
415 
416  // check if the requested run is the latest one - issue a warning if it isn't
418  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
419  << ". This is not the highest run " << dirManager_.findHighestRunDir();
420  }
421  }
422 
// Hook taking the framework GlobalContext (the argument is not used in the visible code).
// Closes the BU read-lock and write-lock file descriptors and, in BU director mode,
// builds the path of the bu.lock file inside bu_run_dir_.
// NOTE(review): listing line 428, which presumably consumes `filename`, is missing from this
// extract — confirm against the original file.
423  void EvFDaqDirector::postEndRun(edm::GlobalContext const& globalContext) {
     // Both descriptors are closed unconditionally; the constructor initialises them to -1,
     // so close() can be reached with -1 when a descriptor was never opened.
424  close(bu_readlock_fd_);
425  close(bu_writelock_fd_);
426  if (directorBU_) {
427  std::string filename = bu_run_dir_ + "/bu.lock";
429  }
430  }
431 
433  //delete all files belonging to just closed lumi
434  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
436  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
437  return;
438  }
439 
440  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
441  auto it = filesToDeletePtr_->begin();
442  while (it != filesToDeletePtr_->end()) {
443  if (it->second->lumi_ == ls && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
444  it = filesToDeletePtr_->erase(it);
445  } else
446  it++;
447  }
448  }
449 
450  std::string EvFDaqDirector::getInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
452  }
453 
454  std::string EvFDaqDirector::getRawFilePath(const unsigned int ls, const unsigned int index) const {
456  }
457 
458  std::string EvFDaqDirector::getOpenRawFilePath(const unsigned int ls, const unsigned int index) const {
459  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
460  }
461 
462  std::string EvFDaqDirector::getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
463  return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
464  }
465 
466  std::string EvFDaqDirector::getDatFilePath(const unsigned int ls, std::string const& stream) const {
468  }
469 
472  }
473 
476  }
477 
480  }
481 
484  }
485 
488  }
489 
491  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
492  }
493 
496  }
497 
500  }
501 
503  std::string const& stream) const {
505  }
506 
508  std::string const& stream) const {
510  }
511 
513  std::string const& stream) const {
515  }
516 
519  }
520 
523  }
524 
527  }
528 
530  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
531  }
532 
534  return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
535  }
536 
538  return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
539  }
540 
542 
544 
546 
547  std::string EvFDaqDirector::getFFFParamsFilePathOnBU() const { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
548 
550  int retval = remove(filename.c_str());
551  if (retval != 0)
552  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
553  << ". error = " << strerror(errno);
554  }
555 
557  std::string& nextFile,
558  uint32_t& fsize,
559  uint16_t& rawHeaderSize,
560  uint64_t& lockWaitTime,
561  bool& setExceptionState) {
562  EvFDaqDirector::FileStatus fileStatus = noFile;
563  rawHeaderSize = 0;
564 
565  int retval = -1;
566  int lock_attempts = 0;
567  long total_lock_attempts = 0;
568 
569  struct stat buf;
570  int stopFileLS = -1;
571  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
572  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
573  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
574  if (stopFileCheck == 0)
575  stopFileLS = readLastLSEntry(stopFilePath_);
576  else
577  stopFileLS = 1; //stop without drain if only pid is stopped
578  if (!stop_ls_override_) {
579  //if lumisection is higher than in stop file, should quit at next from current
580  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
581  stopFileLS = stop_ls_override_ = ls;
582  } else
583  stopFileLS = stop_ls_override_;
584  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
585  << stopFileLS;
586  //return runEnded;
587  } else //if file was removed before reaching stop condition, reset this
588  stop_ls_override_ = 0;
589 
590  timeval ts_lockbegin;
591  gettimeofday(&ts_lockbegin, nullptr);
592 
593  while (retval == -1) {
594  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
595  if (retval == -1)
596  usleep(fuLockPollInterval_);
597  else
598  continue;
599 
600  lock_attempts += fuLockPollInterval_;
601  total_lock_attempts += fuLockPollInterval_;
602  if (lock_attempts > 5000000 || errno == 116) {
603  if (errno == 116)
604  edm::LogWarning("EvFDaqDirector")
605  << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
606  else
607  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
608  "fu.lock file are present -: errno "
609  << errno << ":" << strerror(errno) << std::endl;
610 
611  if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
612  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
613  ls++;
614  return noFile;
615  }
616 
617  if (stat(bu_run_dir_.c_str(), &buf) != 0)
618  return runEnded;
619  if (stat(fulockfile_.c_str(), &buf) != 0)
620  return runEnded;
621 
622  lock_attempts = 0;
623  }
624  if (total_lock_attempts > 5 * 60000000) {
625  edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
626  return runAbort;
627  }
628  }
629 
630  timeval ts_lockend;
631  gettimeofday(&ts_lockend, nullptr);
632  long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
633  if (deltat > 0.)
634  lockWaitTime = deltat;
635 
636  if (retval != 0)
637  return fileStatus;
638 
639 #ifdef DEBUG
640  timeval ts_lockend;
641  gettimeofday(&ts_lockend, 0);
642 #endif
643 
644  //open another lock file FD after the lock using main fd has been acquired
645  int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
646  if (fu_readwritelock_fd2 == -1)
647  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
648  << " create. error:" << strerror(errno);
649 
650  FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
651 
652  // if the stream is readable
653  if (fu_rw_lock_stream2 != nullptr) {
654  unsigned int readLs, readIndex;
655  int check = 0;
656  // rewind the stream
657  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
658  // if rewound ok
659  if (check == 0) {
660  // read its values
661  fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
662  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
663 
664  unsigned int currentLs = readLs;
665  bool bumpedOk = false;
666  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
667  //no lock file write in this case
668  if (ls && ls + 1 < currentLs)
669  ls++;
670  else {
671  // try to bump (look for new index or EoLS file)
672  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, rawHeaderSize, stopFileLS, setExceptionState);
673  //avoid 2 lumisections jump
674  if (ls && readLs > currentLs && currentLs > ls) {
675  ls++;
676  readLs = currentLs = ls;
677  readIndex = 0;
678  bumpedOk = false;
679  //no write to lock file
680  } else {
681  if (ls == 0 && readLs > currentLs) {
682  //make sure to initialize always with LS found in the lock file, with possibility of grabbing index file immediately
683  //in this case there is no new file in the same LS
684  //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
685  readLs = currentLs;
686  readIndex = 0;
687  bumpedOk = false;
688  //no write to lock file
689  }
690  //update return LS value
691  ls = readLs;
692  }
693  }
694  if (bumpedOk) {
695  // there is a new index file to grab, lock file needs to be updated
696  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
697  if (check == 0) {
698  ftruncate(fu_readwritelock_fd2, 0);
699  // write next index in the file, which is the file the next process should take
700  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
701  fflush(fu_rw_lock_stream2);
702  fsync(fu_readwritelock_fd2);
703  fileStatus = newFile;
704  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
705  } else {
706  edm::LogError("EvFDaqDirector")
707  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
708  setExceptionState = true;
709  return noFile;
710  }
711  } else if (currentLs < readLs) {
712  //there is no new file in next LS (yet), but lock file can be updated to the next LS
713  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
714  if (check == 0) {
715  ftruncate(fu_readwritelock_fd2, 0);
716  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
717  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
718  fflush(fu_rw_lock_stream2);
719  fsync(fu_readwritelock_fd2);
720  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
721  } else {
722  edm::LogError("EvFDaqDirector")
723  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
724  setExceptionState = true;
725  return noFile;
726  }
727  }
728  } else {
729  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
730  << strerror(errno);
731  }
732  } else {
733  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
734  }
735  fclose(fu_rw_lock_stream2); // = fdopen(fu_readwritelock_fd2, "r+");
736 
737 #ifdef DEBUG
738  timeval ts_preunlock;
739  gettimeofday(&ts_preunlock, 0);
740  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
741  double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
742 #endif
743 
744  //if new json is present, lock file which FedRawDataInputSource will later unlock
745  if (fileStatus == newFile)
746  lockFULocal();
747 
748  //release lock at this point
749  int retvalu = -1;
750  retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
751  if (retvalu == -1)
752  edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
753 
754 #ifdef DEBUG
755  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
756 #endif
757 
758  if (fileStatus == noFile) {
759  struct stat buf;
760  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
761  if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
762  fileStatus = runEnded;
763  if (stopFileLS >= 0 && (int)ls > stopFileLS) {
764  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
765  fileStatus = runEnded;
766  }
767  }
768  return fileStatus;
769  }
770 
772  std::ifstream ij(BUEoLSFile);
773  Json::Value deserializeRoot;
775 
776  if (!reader.parse(ij, deserializeRoot)) {
777  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
778  return -1;
779  }
780 
782  DataPoint dp;
783  dp.deserialize(deserializeRoot);
784 
785  //read definition
786  if (readEolsDefinition_) {
787  //std::string def = boost::algorithm::trim(dp.getDefinition());
788  std::string def = dp.getDefinition();
789  if (def.empty())
790  readEolsDefinition_ = false;
791  while (!def.empty()) {
793  if (def.find('/') == 0)
794  fullpath = def;
795  else
796  fullpath = bu_run_dir_ + '/' + def;
797  struct stat buf;
798  if (stat(fullpath.c_str(), &buf) == 0) {
799  DataPointDefinition eolsDpd;
800  std::string defLabel = "legend";
801  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
802  if (eolsDpd.getNames().empty()) {
803  //try with "data" label if "legend" format is not used
804  eolsDpd = DataPointDefinition();
805  defLabel = "data";
806  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
807  }
808  for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
809  if (eolsDpd.getNames().at(i) == "NFiles")
811  readEolsDefinition_ = false;
812  break;
813  }
814  //check if we can still find definition
815  if (def.size() <= 1 || def.find('/') == std::string::npos) {
816  readEolsDefinition_ = false;
817  break;
818  }
819  def = def.substr(def.find('/') + 1);
820  }
821  }
822 
823  if (dp.getData().size() > eolsNFilesIndex_)
824  data = dp.getData()[eolsNFilesIndex_];
825  else {
826  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
827  return -1;
828  }
829  return std::stoi(data);
830  }
831 
// Probes for the next input file at lumisection `ls` / file `index` and reports it
// through the reference parameters.
//   ls, index          in/out: position to probe. When the BU EoLS file for `ls` exists and
//                      `index` is not below the file count read from it, `ls` is incremented
//                      and `index` is reset to 0 before probing again.
//   nextFile           out: cleared on entry; set to the JSON index path or raw file path found.
//   fsize              out: st_size of the file found (also stored in previousFileSize_).
//   rawHeaderSize      passed through to rawFileHasHeader() when a standalone raw file is probed.
//   maxLS              when >= 0, nothing is returned for lumisections above this value.
//   setExceptionState  out: set to true when the EoLS file announces more files than `index`
//                      but neither the JSON nor the raw file could be stat'ed.
// Returns true when a file was found, false otherwise (including EoLS parse failure).
// NOTE(review): listing lines 841 and 844 are missing from this extract, so the handling of
// previousFileSize_ / fms_ at the top of the function is incomplete as shown — confirm
// against the original file.
832  bool EvFDaqDirector::bumpFile(unsigned int& ls,
833  unsigned int& index,
834  std::string& nextFile,
835  uint32_t& fsize,
836  uint16_t& rawHeaderSize,
837  int maxLS,
838  bool& setExceptionState) {
839  if (previousFileSize_ != 0) {
840  if (!fms_) {
     // NOTE(review): listing line 841 (body of this branch) is not present in this extract.
842  }
843  if (fms_)
     // NOTE(review): listing line 844 (the statement controlled by the `if` above) is not
     // present in this extract; the reset below is most likely unconditional in the real file.
845  previousFileSize_ = 0;
846  }
847  nextFile = "";
848 
849  //reached limit
850  if (maxLS >= 0 && ls > (unsigned int)maxLS)
851  return false;
852 
853  struct stat buf;
     // NOTE(review): `ss` is not used anywhere in the visible body.
854  std::stringstream ss;
855 
856  // 1. Check suggested file
857  std::string nextFileJson = getInputJsonFilePath(ls, index);
858  if (stat(nextFileJson.c_str(), &buf) == 0) {
859  fsize = previousFileSize_ = buf.st_size;
860  nextFile = nextFileJson;
861  return true;
862  }
863  // 2. No file -> lumi ended? (and how many?)
864  else {
865  // 3. No file -> check for standalone raw file
866  std::string nextFileRaw = getRawFilePath(ls, index);
867  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
868  fsize = previousFileSize_ = buf.st_size;
869  nextFile = nextFileRaw;
870  return true;
871  }
872 
873  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
874 
875  if (stat(BUEoLSFile.c_str(), &buf) == 0) {
     // The EoLS marker exists: stat both candidates once more, since a file may have
     // appeared between the first probe and the EoLS check.
876  // recheck that no raw file appeared in the meantime
877  if (stat(nextFileJson.c_str(), &buf) == 0) {
878  fsize = previousFileSize_ = buf.st_size;
879  nextFile = nextFileJson;
880  return true;
881  }
882  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
883  fsize = previousFileSize_ = buf.st_size;
884  nextFile = nextFileRaw;
885  return true;
886  }
887 
888  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
889  if (indexFilesInLS < 0)
890  //parsing failed
891  return false;
892  else {
893  //check index
894  if ((int)index < indexFilesInLS) {
895  //we have 2 files, and check for 1 failed... retry (2 will never be here)
     // NOTE(review): `nextFile` is still the empty string at this point (it is cleared on
     // entry and only assigned on the paths that return true), so the message prints no
     // file name; `nextFileJson` looks like the intended value — confirm.
896  edm::LogError("EvFDaqDirector")
897  << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
898  << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
899  setExceptionState = true;
900  return false;
901  }
902  }
903  // this lumi ended, check for files
904  ++ls;
905  index = 0;
906 
907  //reached limit
908  if (maxLS >= 0 && ls > (unsigned int)maxLS)
909  return false;
910 
911  nextFileJson = getInputJsonFilePath(ls, 0);
912  nextFileRaw = getRawFilePath(ls, 0);
913  if (stat(nextFileJson.c_str(), &buf) == 0) {
914  // a new file was found at new lumisection, index 0
915  fsize = previousFileSize_ = buf.st_size;
916  nextFile = nextFileJson;
917  return true;
918  }
919  if (stat(nextFileRaw.c_str(), &buf) == 0 && rawFileHasHeader(nextFileRaw, rawHeaderSize)) {
920  fsize = previousFileSize_ = buf.st_size;
921  nextFile = nextFileRaw;
922  return true;
923  }
     // ls/index were advanced to the next lumisection even though no file exists there yet.
924  return false;
925  }
926  }
927  // no new file found
928  return false;
929  }
930 
932  if (fu_rw_lock_stream == nullptr)
933  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
934  else {
935  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
936  unsigned int readLs = 1, readIndex = 0;
937  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
938  }
939  }
940 
942  if (create) {
944  open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
945  chmod(fulockfile_.c_str(), 0766);
946  } else {
947  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
948  }
949  if (fu_readwritelock_fd_ == -1)
950  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
951  << " create:" << create << " error:" << strerror(errno);
952  else
953  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
954 
955  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
956  if (fu_rw_lock_stream == nullptr)
957  edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
958  }
959 
960  void EvFDaqDirector::lockInitLock() { pthread_mutex_lock(&init_lock_); }
961 
962  void EvFDaqDirector::unlockInitLock() { pthread_mutex_unlock(&init_lock_); }
963 
965  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
966  flock(fulocal_rwlock_fd_, LOCK_SH);
967  }
968 
970  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
971  flock(fulocal_rwlock_fd_, LOCK_UN);
972  }
973 
975  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
976  flock(fulocal_rwlock_fd2_, LOCK_EX);
977  }
978 
980  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
981  flock(fulocal_rwlock_fd2_, LOCK_UN);
982  }
983 
984  void EvFDaqDirector::createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const {
985  //used for backpressure mechanisms and monitoring
986  const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
987  struct stat buf;
988  if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
989  int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
990  close(bol_fd);
991  }
992  }
993 
994  void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection,
995  const uint32_t currentLumiSection,
996  bool doCreateBoLS,
997  bool doCreateEoLS) {
998  if (currentLumiSection > 0) {
999  const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
1000  struct stat buf;
1001  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
1002  if (!found) {
1003  if (doCreateEoLS) {
1004  int eol_fd =
1005  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1006  close(eol_fd);
1007  }
1008  if (doCreateBoLS)
1009  createBoLSFile(lumiSection, false);
1010  }
1011  } else if (doCreateBoLS) {
1012  createBoLSFile(lumiSection, true); //needed for initial lumisection
1013  }
1014  }
1015 
1017  int& rawFd,
1018  uint16_t& rawHeaderSize,
1019  uint16_t& rawDataType,
1020  uint32_t& lsFromHeader,
1021  int32_t& eventsFromHeader,
1022  int64_t& fileSizeFromHeader,
1023  bool requireHeader,
1024  bool retry,
1025  bool closeFile) {
1026  int infile;
1027 
1028  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1029  if (retry) {
1030  edm::LogWarning("EvFDaqDirector")
1031  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1032  return parseFRDFileHeader(rawSourcePath,
1033  rawFd,
1034  rawHeaderSize,
1035  rawDataType,
1036  lsFromHeader,
1037  eventsFromHeader,
1038  fileSizeFromHeader,
1039  requireHeader,
1040  false,
1041  closeFile);
1042  } else {
1043  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1044  edm::LogError("EvFDaqDirector")
1045  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
1046  if (errno == ENOENT)
1047  return 1; // error && file not found
1048  else
1049  return -1;
1050  }
1051  }
1052  }
1053 
1054  //v2 is the largest possible read
1055  char hdr[sizeof(FRDFileHeader_v2)];
1056  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1057  return -1;
1058 
1060  uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1061 
1062  if (frd_version == 0) {
1063  //no header (specific sequence not detected)
1064  if (requireHeader) {
1065  edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
1066  close(infile);
1067  return -1;
1068  } else {
1069  //no header, but valid file
1070  lseek(infile, 0, SEEK_SET);
1071  rawHeaderSize = 0;
1072  lsFromHeader = 0;
1073  eventsFromHeader = -1;
1074  fileSizeFromHeader = -1;
1075  }
1076  } else if (frd_version == 1) {
1077  //version 1 header
1078  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1079  return -1;
1081  uint32_t headerSizeRaw = fhContent->headerSize_;
1082  if (headerSizeRaw != sizeof(FRDFileHeader_v1)) {
1083  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1084  << " v:" << frd_version;
1085  close(infile);
1086  return -1;
1087  }
1088  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1089  rawDataType = 0;
1090  lsFromHeader = fhContent->lumiSection_;
1091  eventsFromHeader = (int32_t)fhContent->eventCount_;
1092  fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1093  rawHeaderSize = fhContent->headerSize_;
1094 
1095  } else if (frd_version == 2) {
1096  //version 2 heade
1097  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1098  return -1;
1100  uint32_t headerSizeRaw = fhContent->headerSize_;
1101  if (headerSizeRaw != sizeof(FRDFileHeader_v2)) {
1102  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1103  << " v:" << frd_version;
1104  close(infile);
1105  return -1;
1106  }
1107  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1108  rawDataType = fhContent->dataType_;
1109  lsFromHeader = fhContent->lumiSection_;
1110  eventsFromHeader = (int32_t)fhContent->eventCount_;
1111  fileSizeFromHeader = (int64_t)fhContent->fileSize_;
1112  rawHeaderSize = fhContent->headerSize_;
1113  }
1114 
1115  if (closeFile) {
1116  close(infile);
1117  infile = -1;
1118  }
1119 
1120  rawFd = infile;
1121  return 0; //OK
1122  }
1123 
1124  bool EvFDaqDirector::checkFileRead(char* buf, int infile, std::size_t buf_sz, std::string const& path) {
1125  ssize_t sz_read = ::read(infile, buf, buf_sz);
1126  if (sz_read < 0) {
1127  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unable to read " << path << " : " << strerror(errno);
1128  if (infile != -1)
1129  close(infile);
1130  return false;
1131  }
1132  if ((size_t)sz_read < buf_sz) {
1133  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - file smaller than header: " << path;
1134  if (infile != -1)
1135  close(infile);
1136  return false;
1137  }
1138  return true;
1139  }
1140 
// Check whether the raw file at rawSourcePath begins with a recognized FRD file header
// (version 1 or 2) and, if so, report the header size stored in that header.
//
// rawSourcePath - path of the raw file; opened read-only.
// rawHeaderSize - output: set to the header's headerSize_ field for version 1 or 2,
//                 set to 0 when the version is neither 1 nor 2. It is left untouched
//                 when the file cannot be opened or a read fails.
//
// Returns true only for a version 1 or version 2 header; false otherwise (open failure,
// read failure / file too short, or any other version value).
//
// NOTE(review): `fileId` and `fhContent` are used below, but the statements declaring
// them are not present in this listing (the embedded line numbers skip 1152, 1158 and
// 1165). Presumably they are pointers into `hdr` - confirm against the original source.
1141  bool EvFDaqDirector::rawFileHasHeader(std::string const& rawSourcePath, uint16_t& rawHeaderSize) {
1142  int infile;
1143  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
1144  edm::LogWarning("EvFDaqDirector") << "rawFileHasHeader - failed to open input file -: " << rawSourcePath << " : "
1145  << strerror(errno);
1146  return false;
1147  }
1148  //try to read FRD header size (v2 is the biggest, use read buffer of that size)
1149  char hdr[sizeof(FRDFileHeader_v2)];
// checkFileRead() closes infile itself when it fails, so the early returns below
// do not call close() again.
1150  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderIdentifier), rawSourcePath))
1151  return false;
// the identifier fields are translated into a header version number
1153  uint16_t frd_version = getFRDFileHeaderVersion(fileId->id_, fileId->version_);
1154 

1155  if (frd_version == 1) {
// continue reading from the current file position: version 1 header content
1156  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v1), rawSourcePath))
1157  return false;
1159  rawHeaderSize = fhContent->headerSize_;
1160  close(infile);
1161  return true;
1162  } else if (frd_version == 2) {
// continue reading from the current file position: version 2 header content
1163  if (!checkFileRead(hdr, infile, sizeof(FRDFileHeaderContent_v2), rawSourcePath))
1164  return false;
1166  rawHeaderSize = fhContent->headerSize_;
1167  close(infile);
1168  return true;
1169  } else
1170  edm::LogError("EvFDaqDirector") << "rawFileHasHeader - unknown version: " << frd_version;
1171 

// reached only for a version other than 1 or 2: release the descriptor and report "no header"
1172  close(infile);
1173  rawHeaderSize = 0;
1174  return false;
1175  }
1176 
1178  int& rawFd,
1179  uint16_t& rawHeaderSize,
1180  int64_t& fileSizeFromHeader,
1181  bool& fileFound,
1182  uint32_t serverLS,
1183  bool closeFile,
1184  bool requireHeader) {
1185  fileFound = true;
1186 
1187  //take only first three tokens delimited by "_" in the renamed raw file name
1188  std::string jsonStem = std::filesystem::path(rawSourcePath).stem().string();
1189  size_t pos = 0, n_tokens = 0;
1190  while (n_tokens++ < 3 && (pos = jsonStem.find('_', pos + 1)) != std::string::npos) {
1191  }
1192  std::string reducedJsonStem = jsonStem.substr(0, pos);
1193 
1194  std::ostringstream fileNameWithPID;
1195  //should be ported to use fffnaming
1196  fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1197 
1198  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1199 
1200  LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1201 
1202  //parse RAW file header if it exists
1203  uint32_t lsFromRaw;
1204  int32_t nbEventsWrittenRaw;
1205  int64_t fileSizeFromRaw;
1206  uint16_t rawDataType;
1207  auto ret = parseFRDFileHeader(rawSourcePath,
1208  rawFd,
1209  rawHeaderSize,
1210  rawDataType,
1211  lsFromRaw,
1212  nbEventsWrittenRaw,
1213  fileSizeFromRaw,
1214  requireHeader,
1215  true,
1216  closeFile);
1217  if (ret != 0) {
1218  if (ret == 1)
1219  fileFound = false;
1220  return -1;
1221  }
1222 
1223  int outfile;
1224  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1225  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1226  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1227  if (errno == EEXIST) {
1228  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1229  << " : ";
1230  return -1;
1231  }
1232  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1233  << strerror(errno);
1234  struct stat out_stat;
1235  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1236  edm::LogWarning("EvFDaqDirector")
1237  << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1238  << jsonDestPath;
1239  if (unlink(jsonDestPath.c_str()) == -1) {
1240  edm::LogWarning("EvFDaqDirector")
1241  << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1242  }
1243  }
1244  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1245  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1246  << jsonDestPath << " : " << strerror(errno);
1247  return -1;
1248  }
1249  }
1250  //write JSON file (TODO: use jsoncpp)
1251  std::stringstream ss;
1252  ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1253  std::string sstr = ss.str();
1254 
1255  if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1256  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1257  << " : " << strerror(errno);
1258  return -1;
1259  }
1260  close(outfile);
1261  if (serverLS && serverLS != lsFromRaw)
1262  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1263  << " and raw file header LS " << lsFromRaw;
1264 
1265  fileSizeFromHeader = fileSizeFromRaw;
1266  return nbEventsWrittenRaw;
1267  }
1268 
1270  std::string const& rawSourcePath,
1271  int64_t& fileSizeFromJson,
1272  bool& fileFound) {
1273  fileFound = true;
1274 
1275  //should be ported to use fffnaming
1276  std::ostringstream fileNameWithPID;
1277  fileNameWithPID << std::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1278  << std::setw(5) << pid_ << ".jsn";
1279 
1280  // assemble json destination path
1281  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1282 
1283  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1284 
1285  int infile = -1, outfile = -1;
1286 
1287  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1288  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1289  << strerror(errno);
1290  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1291  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1292  << jsonSourcePath << " : " << strerror(errno);
1293  if (errno == ENOENT)
1294  fileFound = false;
1295  return -1;
1296  }
1297  }
1298 
1299  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1300  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1301  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1302  if (errno == EEXIST) {
1303  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1304  << " : ";
1305  ::close(infile);
1306  return -1;
1307  }
1308  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1309  << strerror(errno);
1310  struct stat out_stat;
1311  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1312  edm::LogWarning("EvFDaqDirector")
1313  << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1314  if (unlink(jsonDestPath.c_str()) == -1) {
1315  edm::LogWarning("EvFDaqDirector")
1316  << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1317  }
1318  }
1319  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1320  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1321  << jsonDestPath << " : " << strerror(errno);
1322  ::close(infile);
1323  return -1;
1324  }
1325  }
1326  //copy contents
1327  const std::size_t buf_sz = 512;
1328  std::size_t tot_written = 0;
1329  std::unique_ptr<char[]> buf(new char[buf_sz]);
1330 
1331  ssize_t sz, sz_read = 1, sz_write;
1332  while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1333  sz_write = 0;
1334  do {
1335  assert(sz_read - sz_write > 0);
1336  if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1337  sz_read = sz; // cause read loop termination
1338  break;
1339  }
1340  assert(sz > 0);
1341  sz_write += sz;
1342  tot_written += sz;
1343  } while (sz_write < sz_read);
1344  }
1345  close(infile);
1346  close(outfile);
1347 
1348  if (tot_written > 0) {
1349  //leave file if it was empty for diagnosis
1350  if (unlink(jsonSourcePath.c_str()) == -1) {
1351  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1352  << strerror(errno);
1353  return -1;
1354  }
1355  } else {
1356  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1357  << jsonSourcePath;
1358  return -1;
1359  }
1360 
1361  Json::Value deserializeRoot;
1363 
1364  std::string data;
1365  std::stringstream ss;
1366  bool result;
1367  try {
1368  if (tot_written <= buf_sz) {
1369  result = reader.parse(buf.get(), deserializeRoot);
1370  } else {
1371  //json will normally not be bigger than buf_sz bytes
1372  try {
1373  std::ifstream ij(jsonDestPath);
1374  ss << ij.rdbuf();
1375  } catch (std::filesystem::filesystem_error const& ex) {
1376  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - FILESYSTEM ERROR CAUGHT -: " << ex.what();
1377  return -1;
1378  }
1379  result = reader.parse(ss.str(), deserializeRoot);
1380  }
1381  if (!result) {
1382  if (tot_written <= buf_sz)
1383  ss << buf.get();
1384  edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1385  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1386  << ss.str() << ".";
1387  return -1;
1388  }
1389 
1390  //read BU JSON
1391  DataPoint dp;
1392  dp.deserialize(deserializeRoot);
1393  bool success = false;
1394  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1395  if (dpd_->getNames().at(i) == "NEvents")
1396  if (i < dp.getData().size()) {
1397  data = dp.getData()[i];
1398  success = true;
1399  break;
1400  }
1401  }
1402  if (!success) {
1403  if (!dp.getData().empty())
1404  data = dp.getData()[0];
1405  else {
1406  edm::LogError("EvFDaqDirector::grabNextJsonFile")
1407  << "grabNextJsonFile - "
1408  << " error reading number of events from BU JSON; No input value. data -: " << data;
1409  return -1;
1410  }
1411  }
1412 
1413  //try to read raw file size
1414  fileSizeFromJson = -1;
1415  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1416  if (dpd_->getNames().at(i) == "NBytes") {
1417  if (i < dp.getData().size()) {
1418  std::string dataSize = dp.getData()[i];
1419  try {
1420  fileSizeFromJson = std::stol(dataSize);
1421  } catch (const std::exception&) {
1422  //non-fatal currently, processing can continue without this value
1423  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1424  << "Input value is -: " << dataSize;
1425  }
1426  break;
1427  }
1428  }
1429  }
1430  return std::stoi(data);
1431  } catch (const std::out_of_range& e) {
1432  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1433  << "Input value is -: " << data;
1434  } catch (const std::invalid_argument& e) {
1435  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - argument error parsing events from BU JSON. "
1436  << "Input value is -: " << data;
1437  } catch (std::runtime_error const& e) {
1438  //Can be thrown by Json parser
1439  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1440  }
1441 
1442  catch (std::exception const& e) {
1443  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1444  } catch (...) {
1445  //unknown exception
1446  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1447  }
1448 
1449  return -1;
1450  }
1451 
1453  std::string data;
1454  try {
1455  // assemble json destination path
1456  std::filesystem::path jsonDestPath(baseRunDir());
1457 
1458  //should be ported to use fffnaming
1459  std::ostringstream fileNameWithPID;
1460  fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1461  << ".jsn";
1462  jsonDestPath /= fileNameWithPID.str();
1463 
1464  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1465  try {
1466  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1467  } catch (std::filesystem::filesystem_error const& ex) {
1468  // Input dir gone?
1469  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1470  // << " Maybe the file is not yet visible by FU. Trying again in one second";
1471  sleep(1);
1472  std::filesystem::copy(jsonSourcePath, jsonDestPath);
1473  }
1474  unlockFULocal();
1475 
1476  try {
1477  //sometimes this fails but file gets deleted
1478  std::filesystem::remove(jsonSourcePath);
1479  } catch (std::filesystem::filesystem_error const& ex) {
1480  // Input dir gone?
1481  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1482  } catch (std::exception const& ex) {
1483  // Input dir gone?
1484  edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1485  }
1486 
1487  std::ifstream ij(jsonDestPath);
1488  Json::Value deserializeRoot;
1490 
1491  std::stringstream ss;
1492  ss << ij.rdbuf();
1493  if (!reader.parse(ss.str(), deserializeRoot)) {
1494  edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1495  << "\nERROR:\n"
1496  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1497  << ss.str() << ".";
1498  throw std::runtime_error("Cannot deserialize input JSON file");
1499  }
1500 
1501  //read BU JSON
1502  std::string data;
1503  DataPoint dp;
1504  dp.deserialize(deserializeRoot);
1505  bool success = false;
1506  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1507  if (dpd_->getNames().at(i) == "NEvents")
1508  if (i < dp.getData().size()) {
1509  data = dp.getData()[i];
1510  success = true;
1511  }
1512  }
1513  if (!success) {
1514  if (!dp.getData().empty())
1515  data = dp.getData()[0];
1516  else
1517  throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1518  << " error reading number of events from BU JSON -: No input value " << data;
1519  }
1520  return std::stoi(data);
1521  } catch (std::filesystem::filesystem_error const& ex) {
1522  // Input dir gone?
1523  unlockFULocal();
1524  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1525  } catch (std::runtime_error const& e) {
1526  // Another process grabbed the file and NFS did not register this
1527  unlockFULocal();
1528  edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1529  } catch (const std::out_of_range&) {
1530  edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1531  << "Input value is -: " << data;
1532  } catch (const std::invalid_argument&) {
1533  edm::LogError("EvFDaqDirector") << "grabNextFile argument error parsing events from BU JSON. "
1534  << "Input value is -: " << data;
1535  } catch (std::exception const& e) {
1536  // BU run directory disappeared?
1537  unlockFULocal();
1538  edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1539  }
1540 
1541  return -1;
1542  }
1543 
1545  bool& serverError,
1546  uint32_t& serverLS,
1547  uint32_t& closedServerLS,
1548  std::string& nextFileJson,
1549  std::string& nextFileRaw,
1550  bool& rawHeader,
1551  int maxLS) {
1552  EvFDaqDirector::FileStatus fileStatus = noFile;
1553  serverError = false;
1554  std::string dest = fmt::sprintf(" on connection to %s:%s", fileBrokerHost_, fileBrokerPort_);
1555 
1556  boost::system::error_code ec;
1557  try {
1558  while (true) {
1559  //socket connect
1560  if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1562 
1563  if (ec) {
1564  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
1565  serverError = true;
1566  break;
1567  }
1568  }
1569 
1570  boost::asio::streambuf request;
1571  std::ostream request_stream(&request);
1572  std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1573  if (maxLS >= 0) {
1574  std::stringstream spath;
1575  spath << path << "&stopls=" << maxLS;
1576  path = spath.str();
1577  edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1578  }
1579  request_stream << "GET " << path << " HTTP/1.1\r\n";
1580  request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1581  request_stream << "Accept: */*\r\n";
1582  request_stream << "Connection: keep-alive\r\n\r\n";
1583 
1584  boost::asio::write(*socket_, request, ec);
1585  if (ec) {
1586  if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1587  edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset" << dest;
1588  //we got disconnected, try to reconnect to the server before writing the request
1590  if (ec) {
1591  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec << dest;
1592  serverError = true;
1593  break;
1594  }
1595  continue;
1596  }
1597  edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec << dest;
1598  serverError = true;
1599  break;
1600  }
1601 
1602  boost::asio::streambuf response;
1603  boost::asio::read_until(*socket_, response, "\r\n", ec);
1604  if (ec) {
1605  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
1606  serverError = true;
1607  break;
1608  }
1609 
1610  std::istream response_stream(&response);
1611 
1612  std::string http_version;
1613  response_stream >> http_version;
1614 
1615  response_stream >> serverHttpStatus;
1616 
1617  std::string status_message;
1618  std::getline(response_stream, status_message);
1619  if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1620  edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1621  serverError = true;
1622  break;
1623  }
1624  if (serverHttpStatus != 200) {
1625  edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1626  serverError = true;
1627  break;
1628  }
1629 
1630  // Process the response headers.
1632  while (std::getline(response_stream, header) && header != "\r") {
1633  }
1634 
1635  std::string fileInfo;
1636  std::map<std::string, std::string> serverMap;
1637  while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1638  auto pos = fileInfo.find('=');
1639  if (pos == std::string::npos)
1640  continue;
1641  auto stitle = fileInfo.substr(0, pos);
1642  auto svalue = fileInfo.substr(pos + 1);
1643  serverMap[stitle] = svalue;
1644  }
1645 
1646  //check that response run number if correct
1647  auto server_version = serverMap.find("version");
1648  assert(server_version != serverMap.end());
1649 
1650  auto server_run = serverMap.find("runnumber");
1651  assert(server_run != serverMap.end());
1652  assert(run_nstring_ == server_run->second);
1653 
1654  auto server_state = serverMap.find("state");
1655  assert(server_state != serverMap.end());
1656 
1657  auto server_eols = serverMap.find("lasteols");
1658  assert(server_eols != serverMap.end());
1659 
1660  auto server_ls = serverMap.find("lumisection");
1661 
1662  int version_maj = 1;
1663  int version_min = 0;
1664  int version_rev = 0;
1665  {
1666  auto* s_ptr = server_version->second.c_str();
1667  if (!server_version->second.empty() && server_version->second[0] == '"')
1668  s_ptr++;
1669  auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1670  if (res < 3) {
1671  res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1672  if (res < 2) {
1673  res = sscanf(s_ptr, "%d", &version_maj);
1674  if (res < 1) {
1675  //expecting at least 1 number (major version)
1676  edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1677  }
1678  }
1679  }
1680  }
1681 
1682  closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1683  if (server_ls != serverMap.end())
1684  serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1685  else
1686  serverLS = closedServerLS + 1;
1687 
1688  std::string s_state = server_state->second;
1689  if (s_state == "STARTING") //initial, always empty starting with LS 1
1690  {
1691  auto server_file = serverMap.find("file");
1692  assert(server_file == serverMap.end()); //no file with starting state
1693  fileStatus = noFile;
1694  edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1695  } else if (s_state == "READY") {
1696  auto server_file = serverMap.find("file");
1697  if (server_file == serverMap.end()) {
1698  //can be returned by server if files from new LS already appeared but LS is not yet closed
1699  if (serverLS <= closedServerLS)
1700  serverLS = closedServerLS + 1;
1701  fileStatus = noFile;
1702  edm::LogInfo("EvFDaqDirector")
1703  << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1704  } else {
1705  std::string filestem;
1706  std::string fileprefix;
1707  auto server_fileprefix = serverMap.find("fileprefix");
1708 
1709  if (server_fileprefix != serverMap.end()) {
1710  auto pssize = server_fileprefix->second.size();
1711  if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1712  fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1713  else
1714  fileprefix = server_fileprefix->second;
1715  }
1716 
1717  //remove string literals
1718  auto ssize = server_file->second.size();
1719  if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1720  filestem = server_file->second.substr(1, ssize - 2);
1721  else
1722  filestem = server_file->second;
1723  assert(!filestem.empty());
1724  if (version_maj > 1) {
1725  nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw"; //filestem should be raw
1726  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1727  nextFileJson = "";
1728  rawHeader = true;
1729  } else {
1730  nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw"; //raw files are not moved
1731  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1732  nextFileJson = filestem + ".jsn";
1733  rawHeader = false;
1734  }
1735  fileStatus = newFile;
1736  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1737  << serverLS << " file:" << filestem;
1738  }
1739  } else if (s_state == "EOLS") {
1740  serverLS = closedServerLS + 1;
1741  edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1742  fileStatus = noFile;
1743  } else if (s_state == "EOR") {
1744  //server_eor = serverMap.find("iseor");
1745  edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1746  fileStatus = runEnded;
1747  } else if (s_state == "NORUN") {
1748  auto err_msg = serverMap.find("errormessage");
1749  if (err_msg != serverMap.end())
1750  edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1751  else
1752  edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1753  edm::LogWarning("EvFDaqDirector") << "executing run end";
1754  fileStatus = runEnded;
1755  } else if (s_state == "ERROR") {
1756  auto err_msg = serverMap.find("errormessage");
1757  if (err_msg != serverMap.end())
1758  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1759  else
1760  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1761  fileStatus = noFile;
1762  serverError = true;
1763  } else {
1764  edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1765  fileStatus = noFile;
1766  serverError = true;
1767  }
1768 
1769  // Read until EOF, writing data to output as we go.
1770  if (!fileBrokerKeepAlive_) {
1771  while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1772  }
1773  if (ec != boost::asio::error::eof) {
1774  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec << dest;
1775  serverError = true;
1776  }
1777  }
1778 
1779  break;
1780  }
1781 
1782  } catch (std::exception const& e) {
1783  edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1784  serverError = true;
1785  }
1786 
1787  if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1788  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1789  if (ec) {
1790  edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec << dest;
1791  }
1792  socket_->close(ec);
1793  if (ec) {
1794  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
1795  }
1796  }
1797 
1798  if (serverError) {
1799  if (socket_->is_open())
1800  socket_->close(ec);
1801  if (ec) {
1802  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec << dest;
1803  }
1804  fileStatus = noFile;
1805  sleep(1); //back-off if error detected
1806  }
1807 
1808  return fileStatus;
1809  }
1810 
// Polls the file broker for the next input file and keeps the local BoLS/EoLS marker files in
// step with the lumisections reported by the server.
//
// NOTE(review): the line holding the function name and the `currentLumiSection` parameter is
// absent from this listing, as is one line directly before each lockFULocal()/unlockFULocal()
// call -- confirm those against the full file.
//
// Parameters visible here:
//   ls                     in/out: compared against the stop-file LS and the server LS; updated
//                          from the server-reported lumisection at the end of the body
//   nextFileRaw            out: raw file path, passed on to contactFileBroker()
//   rawFd                  in/out: passed to grabNextJsonFromRaw(); closed and reset to -1 here
//                          when the event count is negative
//   rawHeaderSize          out: passed to grabNextJsonFromRaw()
//   serverEventsInNewFile  out: result of grabNextJsonFromRaw()/grabNextJsonFile(); assigned
//                          only when the broker answer is newFile
//   fileSizeFromMetadata   out: passed to whichever grabNextJson* call is used
//   thisLockWaitTimeUs     not written in the lines visible here
//   requireHeader          forwarded to grabNextJsonFromRaw()
// Returns the status from contactFileBroker(); noFile is returned on a server error or when the
// file was not found, and runEnded when the BU run directory can no longer be stat()ed.
1812  unsigned int& ls,
1813  std::string& nextFileRaw,
1814  int& rawFd,
1815  uint16_t& rawHeaderSize,
1816  int32_t& serverEventsInNewFile,
1817  int64_t& fileSizeFromMetadata,
1818  uint64_t& thisLockWaitTimeUs,
1819  bool requireHeader) {
1820  EvFDaqDirector::FileStatus fileStatus = noFile;
1821 
1822  //int retval = -1;
1823  //int lock_attempts = 0;
1824  //long total_lock_attempts = 0;
1825 
// Stop-request detection: a request is present when either stop file can be stat()ed.
// stopFileLS stays -1 when there is no request; otherwise it comes from the stop file's
// last-LS entry, or is 1 when only the pid stop file exists.
1826  struct stat buf;
1827  int stopFileLS = -1;
1828  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1829  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1830  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1831  if (stopFileCheck == 0)
1832  stopFileLS = readLastLSEntry(stopFilePath_);
1833  else
1834  stopFileLS = 1; //stop without drain if only pid is stopped
// stop_ls_override_ latches the first `ls` found at or beyond the requested stop LS; once it
// is non-zero it replaces the value read from the stop file on later calls.
1835  if (!stop_ls_override_) {
1836  //if lumisection is higher than in stop file, should quit at next from current
1837  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1838  stopFileLS = stop_ls_override_ = ls;
1839  } else
1840  stopFileLS = stop_ls_override_;
1841  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1842  << stopFileLS;
1843  //return runEnded;
1844  } else //if file was removed before reaching stop condition, reset this
1845  stop_ls_override_ = 0;
1846 
1847  /* look for EoLS
1848  if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1849  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ;
1850  ls++;
1851  return noFile;
1852  }
1853  */
1854 
// The timestamp is taken here but not read again in the lines visible in this listing.
1855  timeval ts_lockbegin;
1856  gettimeofday(&ts_lockbegin, nullptr);
1857 
// Output slots for contactFileBroker(); they are passed uninitialised.
1858  std::string nextFileJson;
1859  uint32_t serverLS, closedServerLS;
1860  unsigned int serverHttpStatus;
1861  bool serverError;
1862 
1863  //local lock to force index json and EoLS files to appear in order
1865  lockFULocal();
1866 
// maxLS is -1 when stopFileLS is negative; otherwise it is the larger of the stop LS and the
// caller's current lumisection.
1867  int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1868  bool rawHeader = false;
1869  fileStatus = contactFileBroker(
1870  serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1871 
1872  if (serverError) {
1873  //do not update anything
1875  unlockFULocal();
1876  return noFile;
1877  }
1878 
// First pass over the lumisection files: third flag true, fourth false (BoLS only).
// With no lumisection open yet (currentLumiSection == 0) a single call is made, using
// closedServerLS if the run ended and serverLS otherwise; else one call is made per
// lumisection from max(currentLumiSection, 1) up to closedServerLS.
1879  //handle creation of BoLS files if lumisection has changed
1880  if (currentLumiSection == 0) {
1881  if (fileStatus == runEnded)
1882  createLumiSectionFiles(closedServerLS, 0, true, false);
1883  else
1884  createLumiSectionFiles(serverLS, 0, true, false);
1885  } else {
1886  if (closedServerLS >= currentLumiSection) {
1887  //only BoLS files
1888  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1889  createLumiSectionFiles(i + 1, i, true, false);
1890  }
1891  }
1892 
1893  bool fileFound = true;
1894 
// rawHeader is a bool, so `rawHeader > 0` is simply a test for true: the event count then
// comes from the raw file via grabNextJsonFromRaw(), otherwise from the JSON file.
1895  if (fileStatus == newFile) {
1896  if (rawHeader > 0)
1897  serverEventsInNewFile = grabNextJsonFromRaw(
1898  nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS, false, requireHeader);
1899  else
1900  serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1901  }
// This check also runs when neither grab call was made; it then tests whatever value the
// caller passed in through serverEventsInNewFile.
1902  //closing file in case of any error
1903  if (serverEventsInNewFile < 0 && rawFd != -1) {
1904  close(rawFd);
1905  rawFd = -1;
1906  }
1907 
1908  //can unlock because all files have been created locally
1910  unlockFULocal();
1911 
1912  if (!fileFound) {
1913  //catch condition where directory got deleted
1914  fileStatus = noFile;
// This `buf` shadows the one declared at the top of the function.
1915  struct stat buf;
1916  if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1917  edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1918  fileStatus = runEnded;
1919  }
1920  }
1921 
// Second pass: same lumisection ranges as the BoLS pass, now with third flag false and fourth
// true (EoLS only), bracketed by lockFULocal2()/unlockFULocal2(). On runEnded with no open
// lumisection an additional call is made for (serverLS, closedServerLS).
1922  //handle creation of EoLS files if lumisection has changed, this needs to be locked exclusively
1923  //so that EoLS files can not appear locally before index files
1924  if (currentLumiSection == 0) {
1925  lockFULocal2();
1926  if (fileStatus == runEnded) {
1927  createLumiSectionFiles(closedServerLS, 0, false, true);
1928  createLumiSectionFiles(serverLS, closedServerLS, false, true); // +1
1929  } else {
1930  createLumiSectionFiles(serverLS, 0, false, true);
1931  }
1932  unlockFULocal2();
1933  } else {
1934  if (closedServerLS >= currentLumiSection) {
1935  //lock exclusive to create EoLS files
1936  lockFULocal2();
1937  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1938  createLumiSectionFiles(i + 1, i, false, true);
1939  unlockFULocal2();
1940  }
1941  }
1942 
// Report the lumisection back through `ls`:
//  - runEnded: the larger of currentLumiSection and serverLS;
//  - newFile : serverLS, which is asserted not to lie behind `ls`;
//  - noFile  : serverLS unless it is behind `ls`, in which case `ls` is left untouched and the
//              call backs off for one second.
1943  if (fileStatus == runEnded)
1944  ls = std::max(currentLumiSection, serverLS);
1945  else if (fileStatus == newFile) {
1946  assert(serverLS >= ls);
1947  ls = serverLS;
1948  } else if (fileStatus == noFile) {
1949  if (serverLS >= ls)
1950  ls = serverLS;
1951  else {
1952  edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1953  << " which is smaller than currently open LS " << ls << ". Ignoring response";
1954  sleep(1);
1955  }
1956  }
1957 
1958  return fileStatus;
1959  }
1960 
// Ensures the directory named by `openPath` exists: when std::filesystem::is_directory()
// reports false, a debug message is logged and the directory is created together with any
// missing parents.
// NOTE(review): the signature line and the definition of `openPath` are not part of this
// listing -- confirm them against the full file.
1962  // create open dir if not already there
1963 
1965  if (!std::filesystem::is_directory(openPath)) {
1966  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1967  std::filesystem::create_directories(openPath);
1968  }
1969  }
1970 
1972  std::ifstream ij(file);
1973  Json::Value deserializeRoot;
1975 
1976  if (!reader.parse(ij, deserializeRoot)) {
1977  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1978  return -1;
1979  }
1980 
1981  int ret = deserializeRoot.get("lastLS", "").asInt();
1982  return ret;
1983  }
1984 
// Finds the first lumisection number without an EoLS marker in the run directory.
// Candidate paths are <run_dir_>/<run_string_>_lsNNNN_EoLS.jsn with NNNN zero-padded to four
// digits, probed with stat() starting at 1; the number of the first path that cannot be
// stat()ed is returned.
// NOTE(review): the signature line and the declaration of `fullpath` are not part of this
// listing -- confirm them against the full file.
1986  std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1988  struct stat buf;
1989  unsigned int lscount = 1;
1990  do {
1991  std::stringstream ss;
1992  ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1993  fullpath = ss.str();
// Incremented before the existence test, hence the `- 1` in the return below.
1994  lscount++;
1995  } while (stat(fullpath.c_str(), &buf) == 0);
1996  return lscount - 1;
1997  }
1998 
1999  //if transferSystem PSet is present in the menu, we require it to be complete and consistent for all specified streams
// NOTE(review): the comment above does not describe the statements below (no PSet is read
// here); it looks like a leftover -- confirm against the full file.
//
// Creates <run_dir_>/processing if it does not exist yet (open() with O_CREAT, read/write for
// user, group and others plus execute for the user) and closes the descriptor straight away.
// NOTE(review): the signature line is not part of this listing, and the result of open() is
// not checked, so close() receives -1 when the file could not be opened or created.
2001  std::string proc_flag = run_dir_ + "/processing";
2002  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
2003  close(proc_flag_fd);
2004  }
2005 
2006  struct flock EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid) {
2007 #ifdef __APPLE__
2008  return {start, len, pid, type, whence};
2009 #else
2010  return {type, whence, start, len, pid};
2011 #endif
2012  }
2013 
// Returns true when the path held in input_throttled_file_ can be stat()ed, i.e. the
// throttle marker exists and is reachable.
// NOTE(review): the signature line is not part of this listing.
2015  struct stat buf;
2016  return (stat(input_throttled_file_.c_str(), &buf) == 0);
2017  }
2018 
// Returns true when a marker for lumisection `ls` exists: the path is discard_ls_filestem_
// followed by the decimal lumisection number, and it is probed with stat().
// NOTE(review): the signature line is not part of this listing.
2020  struct stat buf;
2021  return (stat((discard_ls_filestem_ + std::to_string(ls)).c_str(), &buf) == 0);
2022  }
2023 
2024 } // namespace evf
unsigned int nThreads_
Definition: fillJson.h:27
int def(FILE *, FILE *, int)
struct flock fu_rw_flk
unsigned int nConcurrentLumis_
std::vector< std::string > bu_base_dirs_all_
std::string run_string_
LuminosityBlockNumber_t luminosityBlock() const
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
std::string bolsFileName(const unsigned int run, const unsigned int ls)
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
void watchPreallocate(Preallocate::slot_type const &iSlot)
boost::asio::io_service io_service_
bool rawFileHasHeader(std::string const &rawSourcePath, uint16_t &rawHeaderSize)
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
std::string fulockfile_
def create(alignables, pedeDump, additionalData, outputFile, config)
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
static bool checkFileRead(char *buf, int infile, std::size_t buf_sz, std::string const &path)
jsoncollector::DataPointDefinition * dpd_
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::string getFFFParamsFilePathOnBU() const
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
ret
prodAgent to be discontinued
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
std::array< uint8_t, 4 > version_
Definition: FRDFileHeader.h:31
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
volatile std::atomic< bool > shutdown_flag
pthread_mutex_t init_lock_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void removeFile(std::string)
bool lumisectionDiscarded(unsigned int ls)
reader
Definition: DQM.py:105
Unserialize a JSON document into a Value.
Definition: reader.h:17
Log< level::Error, false > LogError
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
std::string getOpenInitFilePath(std::string const &stream) const
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
assert(be >=bs)
bool isExceptionOnData(unsigned int ls)
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
Definition: Electron.h:6
unsigned long previousFileSize_
static std::string to_string(const XMLCh *ch)
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
std::string getInitTempFilePath(std::string const &stream) const
std::string getEoRFilePathOnFU() const
struct flock fu_rw_fulk
std::string hltSourceDirectory_
unsigned int startFromLS_
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::string getEoLSFilePathOnBU(const unsigned int ls) const
std::mutex * fileDeleteLockPtr_
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
def chmod(path, mode)
Definition: eostools.py:294
std::string getRunOpenDirPath() const
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
std::string stopFilePath_
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string bu_base_dir_
unsigned int stop_ls_override_
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string input_throttled_file_
std::string getBoLSFilePathOnFU(const unsigned int ls) const
std::string eorFileName(const unsigned int run)
int readLastLSEntry(std::string const &file)
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string stopFilePathPid_
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:66
unsigned int eolsNFilesIndex_
std::string & baseRunDir()
std::string run_nstring_
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
std::string getInitFilePath(std::string const &stream) const
void createProcessingNotificationMaybe() const
Log< level::Info, false > LogInfo
void openFULockfileStream(bool create)
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
int getNFilesFromEoLS(std::string BUEoLSFile)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
def ls(path, rec=False)
Definition: eostools.py:349
void preBeginRun(edm::GlobalContext const &globalContext)
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned long long uint64_t
Definition: Time.h:13
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
std::vector< std::string > const & getNames() const
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, int maxLS, bool &setExceptionState)
def load(fileName)
Definition: svgfig.py:547
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
std::string getEoRFileName() const
void add(std::string const &label, ParameterSetDescription const &psetDescription)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:233
evf::FastMonitoringService * fms_
std::string bu_run_dir_
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
std::string getEoRFilePath() const
def mkdir(path)
Definition: eostools.py:251
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
std::string findHighestRunDir()
Definition: DirManager.cc:23
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:80
Value get(UInt index, const Value &defaultValue) const
void postEndRun(edm::GlobalContext const &globalContext)
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:81
std::string discard_ls_filestem_
unsigned int getLumisectionToStart() const
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
std::string fileBrokerPort_
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS, bool doCreateEoLS)
std::string initTempFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
void preallocate(edm::service::SystemBounds const &bounds)
Log< level::Warning, false > LogWarning
std::vector< int > bu_base_dirs_nSources_
Represents a JSON value.
Definition: value.h:101
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
std::string fileBrokerHost_
JSON (JavaScript Object Notation).
Definition: DataPoint.h:26
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned int fuLockPollInterval_
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)
std::string eolsFileName(const unsigned int run, const unsigned int ls)
unsigned int nStreams_
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string bu_run_open_dir_
#define LogDebug(id)