CMS 3D CMS Logo

EvFDaqDirector.cc
Go to the documentation of this file.
10 
18 
19 #include <iostream>
20 //#include <istream>
21 #include <sstream>
22 #include <sys/time.h>
23 #include <unistd.h>
24 #include <cstdio>
25 #include <boost/lexical_cast.hpp>
26 #include <boost/filesystem/fstream.hpp>
27 #include <boost/algorithm/string.hpp>
28 
29 //using boost::asio::ip::tcp;
30 
31 //#define DEBUG
32 
33 using namespace jsoncollector;
34 
35 namespace evf {
36 
37  //for enum MergeType
38  const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {"", "DAT", "PB", "JSNDATA"};
39 
41  : base_dir_(pset.getUntrackedParameter<std::string>("baseDir")),
42  bu_base_dir_(pset.getUntrackedParameter<std::string>("buBaseDir")),
43  run_(pset.getUntrackedParameter<unsigned int>("runNumber")),
44  useFileBroker_(pset.getUntrackedParameter<bool>("useFileBroker")),
45  fileBrokerHostFromCfg_(pset.getUntrackedParameter<bool>("fileBrokerHostFromCfg", true)),
46  fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost", "InValid")),
47  fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort", "8080")),
48  fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
49  fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock", true)),
50  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval", 2000)),
51  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck", false)),
52  requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet", false)),
53  selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode", "")),
54  mergeTypePset_(pset.getUntrackedParameter<std::string>("mergingPset", "")),
55  directorBU_(pset.getUntrackedParameter<bool>("directorIsBU", false)),
56  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory", "")),
57  hostname_(""),
58  bu_readlock_fd_(-1),
59  bu_writelock_fd_(-1),
60  fu_readwritelock_fd_(-1),
61  fulocal_rwlock_fd_(-1),
62  fulocal_rwlock_fd2_(-1),
63  bu_w_lock_stream(nullptr),
64  bu_r_lock_stream(nullptr),
65  fu_rw_lock_stream(nullptr),
66  dirManager_(base_dir_),
67  previousFileSize_(0),
68  bu_w_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, 0)),
69  bu_r_flk(make_flock(F_RDLCK, SEEK_SET, 0, 0, 0)),
70  bu_w_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
71  bu_r_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, 0)),
72  fu_rw_flk(make_flock(F_WRLCK, SEEK_SET, 0, 0, getpid())),
73  fu_rw_fulk(make_flock(F_UNLCK, SEEK_SET, 0, 0, getpid())) {
79 
80  //save hostname for later
81  char hostname[33];
82  gethostname(hostname, 32);
83  hostname_ = hostname;
84 
85  char* fuLockPollIntervalPtr = std::getenv("FFF_LOCKPOLLINTERVAL");
86  if (fuLockPollIntervalPtr) {
87  try {
88  fuLockPollInterval_ = boost::lexical_cast<unsigned int>(std::string(fuLockPollIntervalPtr));
89  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_
90  << " us";
91  } catch (boost::bad_lexical_cast const&) {
92  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
93  }
94  }
95 
96  //override file service parameter if specified by environment
97  char* fileBrokerParamPtr = std::getenv("FFF_USEFILEBROKER");
98  if (fileBrokerParamPtr) {
99  try {
100  useFileBroker_ = (boost::lexical_cast<unsigned int>(std::string(fileBrokerParamPtr))) > 0;
101  edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
102  } catch (boost::bad_lexical_cast const&) {
103  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
104  }
105  }
106  if (useFileBroker_) {
108  //find BU data address from hltd configuration
110  struct stat buf;
111  if (stat("/etc/appliance/bus.config", &buf) == 0) {
112  std::ifstream busconfig("/etc/appliance/bus.config", std::ifstream::in);
113  std::getline(busconfig, fileBrokerHost_);
114  }
115  if (fileBrokerHost_.empty())
116  throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
117  } else if (fileBrokerHost_.empty() || fileBrokerHost_ == "InValid")
118  throw cms::Exception("EvFDaqDirector")
119  << "fileBrokerHostFromCfg must be set to true if fileBrokerHost parameter is not valid or empty";
120 
121  resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
122  query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_);
123  endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
124  socket_ = std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
125  }
126 
127  char* startFromLSPtr = std::getenv("FFF_STARTFROMLS");
128  if (startFromLSPtr) {
129  try {
130  startFromLS_ = boost::lexical_cast<unsigned int>(std::string(startFromLSPtr));
131  edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
132  } catch (boost::bad_lexical_cast const&) {
133  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
134  }
135  }
136 
137  //override file service parameter if specified by environment
138  char* fileBrokerUseLockParamPtr = std::getenv("FFF_FILEBROKERUSELOCALLOCK");
139  if (fileBrokerUseLockParamPtr) {
140  try {
141  fileBrokerUseLocalLock_ = (boost::lexical_cast<unsigned int>(std::string(fileBrokerUseLockParamPtr))) > 0;
142  edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: "
144  } catch (boost::bad_lexical_cast const&) {
145  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
146  }
147  }
148  }
149 
151  std::stringstream ss;
152  ss << "run" << std::setfill('0') << std::setw(6) << run_;
153  run_string_ = ss.str();
154  ss = std::stringstream();
155  ss << run_;
156  run_nstring_ = ss.str();
157  run_dir_ = base_dir_ + "/" + run_string_;
158  ss = std::stringstream();
159  ss << getpid();
160  pid_ = ss.str();
161 
162  // check if base dir exists or create it accordingly
163  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
164  if (retval != 0 && errno != EEXIST) {
165  throw cms::Exception("DaqDirector")
166  << " Error checking for base dir -: " << base_dir_ << " mkdir error:" << strerror(errno);
167  }
168 
169  //create run dir in base dir
170  umask(0);
171  retval = mkdir(run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
172  if (retval != 0 && errno != EEXIST) {
173  throw cms::Exception("DaqDirector")
174  << " Error creating run dir -: " << run_dir_ << " mkdir error:" << strerror(errno);
175  }
176 
177  //create fu-local.lock in run open dir
178  if (!directorBU_) {
180  std::string fulocal_lock_ = getRunOpenDirPath() + "/fu-local.lock";
182  open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
183  if (fulocal_rwlock_fd_ == -1)
184  throw cms::Exception("DaqDirector")
185  << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
186  chmod(fulocal_lock_.c_str(), 0777);
187  fsync(fulocal_rwlock_fd_);
188  //open second fd for another input source thread
190  open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); //O_RDWR?
191  if (fulocal_rwlock_fd2_ == -1)
192  throw cms::Exception("DaqDirector")
193  << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
194  }
195 
196  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
197  //for BU, it is created at this point
198  if (directorBU_) {
200  std::string bulockfile = bu_run_dir_ + "/bu.lock";
201  fulockfile_ = bu_run_dir_ + "/fu.lock";
202 
203  //make or find bu run dir
204  retval = mkdir(bu_run_dir_.c_str(), S_IRWXU | S_IRWXG | S_IRWXO);
205  if (retval != 0 && errno != EEXIST) {
206  throw cms::Exception("DaqDirector")
207  << " Error creating bu run dir -: " << bu_run_dir_ << " mkdir error:" << strerror(errno) << "\n";
208  }
209  bu_run_open_dir_ = bu_run_dir_ + "/open";
210  retval = mkdir(bu_run_open_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
211  if (retval != 0 && errno != EEXIST) {
212  throw cms::Exception("DaqDirector")
213  << " Error creating bu run open dir -: " << bu_run_open_dir_ << " mkdir error:" << strerror(errno) << "\n";
214  }
215 
216  // the BU director does not need to know about the fu lock
217  bu_writelock_fd_ = open(bulockfile.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
218  if (bu_writelock_fd_ == -1)
219  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: " << strerror(errno);
220  else
221  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: " << bu_writelock_fd_;
222  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
223  if (bu_w_lock_stream == nullptr)
224  edm::LogWarning("EvFDaqDirector") << "Error creating write lock stream -: " << strerror(errno);
225 
226  // BU INITIALIZES LOCK FILE
227  // FU LOCK FILE OPEN
228  openFULockfileStream(true);
230  fflush(fu_rw_lock_stream);
231  close(fu_readwritelock_fd_);
232 
233  if (!hltSourceDirectory_.empty()) {
234  struct stat buf;
235  if (stat(hltSourceDirectory_.c_str(), &buf) == 0) {
236  std::string hltdir = bu_run_dir_ + "/hlt";
237  std::string tmphltdir = bu_run_open_dir_ + "/hlt";
238  retval = mkdir(tmphltdir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
239  if (retval != 0 && errno != EEXIST)
240  throw cms::Exception("DaqDirector")
241  << " Error creating bu run dir -: " << hltdir << " mkdir error:" << strerror(errno) << "\n";
242 
243  boost::filesystem::copy_file(hltSourceDirectory_ + "/HltConfig.py", tmphltdir + "/HltConfig.py");
244 
245  boost::filesystem::copy_file(hltSourceDirectory_ + "/fffParameters.jsn", tmphltdir + "/fffParameters.jsn");
246 
247  boost::filesystem::rename(tmphltdir, hltdir);
248  } else
249  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
250  }
251  //else{}//no configuration specified
252  } else {
253  // for FU, check if bu base dir exists
254 
255  retval = mkdir(bu_base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
256  if (retval != 0 && errno != EEXIST) {
257  throw cms::Exception("DaqDirector")
258  << " Error checking for bu base dir -: " << bu_base_dir_ << " mkdir error:" << strerror(errno) << "\n";
259  }
260 
262  fulockfile_ = bu_run_dir_ + "/fu.lock";
263  openFULockfileStream(false);
264  }
265 
266  pthread_mutex_init(&init_lock_, nullptr);
267 
268  stopFilePath_ = run_dir_ + "/CMSSW_STOP";
269  std::stringstream sstp;
270  sstp << stopFilePath_ << "_pid" << pid_;
271  stopFilePathPid_ = sstp.str();
272 
273  if (!directorBU_) {
274  std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
275  struct stat statbuf;
276  if (!stat(defPath.c_str(), &statbuf))
277  edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
278  else {
279  //look in source directory if not present in ramdisk
280  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
281  defPath = std::string(std::getenv("CMSSW_BASE")) + "/" + defPathSuffix;
282  if (stat(defPath.c_str(), &statbuf)) {
283  defPath = std::string(std::getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
284  if (stat(defPath.c_str(), &statbuf)) {
285  defPath = defPathSuffix;
286  }
287  }
288  }
289  dpd_ = new DataPointDefinition();
290  std::string defLabel = "data";
291  DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_, &defLabel);
292  }
293  }
294 
296  //close server connection
297  if (socket_.get() && socket_->is_open()) {
298  boost::system::error_code ec;
299  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
300  socket_->close(ec);
301  }
302 
303  if (fulocal_rwlock_fd_ != -1) {
304  unlockFULocal();
305  close(fulocal_rwlock_fd_);
306  }
307 
308  if (fulocal_rwlock_fd2_ != -1) {
309  unlockFULocal2();
310  close(fulocal_rwlock_fd2_);
311  }
312  }
313 
315  initRun();
316 
317  nThreads_ = bounds.maxNumberOfStreams();
318  nStreams_ = bounds.maxNumberOfThreads();
319  }
320 
323  desc.setComment(
324  "Service used for file locking arbitration and for propagating information between other EvF components");
325  desc.addUntracked<std::string>("baseDir", ".")->setComment("Local base directory for run output");
326  desc.addUntracked<std::string>("buBaseDir", ".")->setComment("BU base ramdisk directory ");
327  desc.addUntracked<unsigned int>("runNumber", 0)->setComment("Run Number in ramdisk to open");
328  desc.addUntracked<bool>("useFileBroker", false)
329  ->setComment("Use BU file service to grab input data instead of NFS file locking");
330  desc.addUntracked<bool>("fileBrokerHostFromCfg", true)
331  ->setComment("Allow service to discover BU address from hltd configuration");
332  desc.addUntracked<std::string>("fileBrokerHost", "InValid")->setComment("BU file service host.");
333  desc.addUntracked<std::string>("fileBrokerPort", "8080")->setComment("BU file service port");
334  desc.addUntracked<bool>("fileBrokerKeepAlive", true)
335  ->setComment("Use keep alive to avoid using large number of sockets");
336  desc.addUntracked<bool>("fileBrokerUseLocalLock", true)
337  ->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
338  desc.addUntracked<unsigned int>("fuLockPollInterval", 2000)
339  ->setComment("Lock polling interval in microseconds for the input directory file lock");
340  desc.addUntracked<bool>("outputAdler32Recheck", false)
341  ->setComment("Check Adler32 of per-process output files while micro-merging");
342  desc.addUntracked<bool>("requireTransfersPSet", false)
343  ->setComment("Require complete transferSystem PSet in the process configuration");
344  desc.addUntracked<std::string>("selectedTransferMode", "")
345  ->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
346  desc.addUntracked<bool>("directorIsBU", false)->setComment("BU director mode used for testing");
347  desc.addUntracked<std::string>("hltSourceDirectory", "")->setComment("BU director mode source directory");
348  desc.addUntracked<std::string>("mergingPset", "")
349  ->setComment("Name of merging PSet to look for merging type definitions for streams");
350  descriptions.add("EvFDaqDirector", desc);
351  }
352 
355  checkMergeTypePSet(pc);
356  }
357 
358  void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) {
359  //assert(run_ == id.run());
360 
361  // check if the requested run is the latest one - issue a warning if it isn't
363  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: " << run_dir_
364  << ". This is not the highest run " << dirManager_.findHighestRunDir();
365  }
366  }
367 
368  void EvFDaqDirector::postEndRun(edm::GlobalContext const& globalContext) {
369  close(bu_readlock_fd_);
370  close(bu_writelock_fd_);
371  if (directorBU_) {
372  std::string filename = bu_run_dir_ + "/bu.lock";
373  removeFile(filename);
374  }
375  }
376 
378  //delete all files belonging to just closed lumi
379  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
381  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
382  return;
383  }
384 
385  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
386  auto it = filesToDeletePtr_->begin();
387  while (it != filesToDeletePtr_->end()) {
388  if (it->second->lumi_ == ls) {
389  it = filesToDeletePtr_->erase(it);
390  } else
391  it++;
392  }
393  }
394 
395  std::string EvFDaqDirector::getInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
396  return bu_run_dir_ + "/" + fffnaming::inputJsonFileName(run_, ls, index);
397  }
398 
399  std::string EvFDaqDirector::getRawFilePath(const unsigned int ls, const unsigned int index) const {
400  return bu_run_dir_ + "/" + fffnaming::inputRawFileName(run_, ls, index);
401  }
402 
403  std::string EvFDaqDirector::getOpenRawFilePath(const unsigned int ls, const unsigned int index) const {
404  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_, ls, index);
405  }
406 
407  std::string EvFDaqDirector::getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
408  return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_, ls, index);
409  }
410 
411  std::string EvFDaqDirector::getDatFilePath(const unsigned int ls, std::string const& stream) const {
412  return run_dir_ + "/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
413  }
414 
415  std::string EvFDaqDirector::getOpenDatFilePath(const unsigned int ls, std::string const& stream) const {
416  return run_dir_ + "/open/" + fffnaming::streamerDataFileNameWithPid(run_, ls, stream);
417  }
418 
419  std::string EvFDaqDirector::getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
420  return run_dir_ + "/open/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
421  }
422 
423  std::string EvFDaqDirector::getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
424  return run_dir_ + "/" + fffnaming::streamerJsonFileNameWithPid(run_, ls, stream);
425  }
426 
427  std::string EvFDaqDirector::getMergedDatFilePath(const unsigned int ls, std::string const& stream) const {
429  }
430 
431  std::string EvFDaqDirector::getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const {
433  }
434 
436  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_, 0, stream);
437  }
438 
440  return run_dir_ + "/" + fffnaming::initFileNameWithPid(run_, 0, stream);
441  }
442 
444  std::string const& stream) const {
445  return run_dir_ + "/open/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_, ls, stream);
446  }
447 
449  std::string const& stream) const {
451  }
452 
454  std::string const& stream) const {
456  }
457 
458  std::string EvFDaqDirector::getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
459  return run_dir_ + "/open/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
460  }
461 
462  std::string EvFDaqDirector::getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
463  return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithPid(run_, ls, stream);
464  }
465 
468  }
469 
471  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
472  }
473 
475  return run_dir_ + "/" + fffnaming::eolsFileName(run_, ls);
476  }
477 
479  return run_dir_ + "/" + fffnaming::bolsFileName(run_, ls);
480  }
481 
483 
485 
486  std::string EvFDaqDirector::getFFFParamsFilePathOnBU() const { return bu_run_dir_ + "/hlt/fffParameters.jsn"; }
487 
489  int retval = remove(filename.c_str());
490  if (retval != 0)
491  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename
492  << ". error = " << strerror(errno);
493  }
494 
495  void EvFDaqDirector::removeFile(unsigned int ls, unsigned int index) { removeFile(getRawFilePath(ls, index)); }
496 
498  std::string& nextFile,
499  uint32_t& fsize,
500  uint64_t& lockWaitTime) {
501  EvFDaqDirector::FileStatus fileStatus = noFile;
502 
503  int retval = -1;
504  int lock_attempts = 0;
505  long total_lock_attempts = 0;
506 
507  struct stat buf;
508  int stopFileLS = -1;
509  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
510  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
511  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
512  if (stopFileCheck == 0)
513  stopFileLS = readLastLSEntry(stopFilePath_);
514  else
515  stopFileLS = 1; //stop without drain if only pid is stopped
516  if (!stop_ls_override_) {
517  //if lumisection is higher than in stop file, should quit at next from current
518  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
519  stopFileLS = stop_ls_override_ = ls;
520  } else
521  stopFileLS = stop_ls_override_;
522  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
523  << stopFileLS;
524  //return runEnded;
525  } else //if file was removed before reaching stop condition, reset this
526  stop_ls_override_ = 0;
527 
528  timeval ts_lockbegin;
529  gettimeofday(&ts_lockbegin, nullptr);
530 
531  while (retval == -1) {
532  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
533  if (retval == -1)
534  usleep(fuLockPollInterval_);
535  else
536  continue;
537 
538  lock_attempts += fuLockPollInterval_;
539  total_lock_attempts += fuLockPollInterval_;
540  if (lock_attempts > 5000000 || errno == 116) {
541  if (errno == 116)
542  edm::LogWarning("EvFDaqDirector")
543  << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
544  else
545  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and "
546  "fu.lock file are present -: errno "
547  << errno << ":" << strerror(errno) << std::endl;
548 
549  if (stat(getEoLSFilePathOnFU(ls).c_str(), &buf) == 0) {
550  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection " << ls;
551  ls++;
552  return noFile;
553  }
554 
555  if (stat(bu_run_dir_.c_str(), &buf) != 0)
556  return runEnded;
557  if (stat(fulockfile_.c_str(), &buf) != 0)
558  return runEnded;
559 
560  lock_attempts = 0;
561  }
562  if (total_lock_attempts > 5 * 60000000) {
563  edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
564  return runAbort;
565  }
566  }
567 
568  timeval ts_lockend;
569  gettimeofday(&ts_lockend, nullptr);
570  long deltat = (ts_lockend.tv_usec - ts_lockbegin.tv_usec) + (ts_lockend.tv_sec - ts_lockbegin.tv_sec) * 1000000;
571  if (deltat > 0.)
572  lockWaitTime = deltat;
573 
574  if (retval != 0)
575  return fileStatus;
576 
577 #ifdef DEBUG
578  timeval ts_lockend;
579  gettimeofday(&ts_lockend, 0);
580 #endif
581 
582  //open another lock file FD after the lock using main fd has been acquired
583  int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
584  if (fu_readwritelock_fd2 == -1)
585  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
586  << " create. error:" << strerror(errno);
587 
588  FILE* fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
589 
590  // if the stream is readable
591  if (fu_rw_lock_stream2 != nullptr) {
592  unsigned int readLs, readIndex;
593  int check = 0;
594  // rewind the stream
595  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
596  // if rewinded ok
597  if (check == 0) {
598  // read its' values
599  fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
600  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
601 
602  unsigned int currentLs = readLs;
603  bool bumpedOk = false;
604  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
605  //no lock file write in this case
606  if (ls && ls + 1 < currentLs)
607  ls++;
608  else {
609  // try to bump (look for new index or EoLS file)
610  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, stopFileLS);
611  //avoid 2 lumisections jump
612  if (ls && readLs > currentLs && currentLs > ls) {
613  ls++;
614  readLs = currentLs = ls;
615  readIndex = 0;
616  bumpedOk = false;
617  //no write to lock file
618  } else {
619  if (ls == 0 && readLs > currentLs) {
620  //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
621  //in this case there is no new file in the same LS
622  //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
623  readLs = currentLs;
624  readIndex = 0;
625  bumpedOk = false;
626  //no write to lock file
627  }
628  //update return LS value
629  ls = readLs;
630  }
631  }
632  if (bumpedOk) {
633  // there is a new index file to grab, lock file needs to be updated
634  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
635  if (check == 0) {
636  ftruncate(fu_readwritelock_fd2, 0);
637  // write next index in the file, which is the file the next process should take
638  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
639  fflush(fu_rw_lock_stream2);
640  fsync(fu_readwritelock_fd2);
641  fileStatus = newFile;
642  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
643  } else {
644  throw cms::Exception("EvFDaqDirector")
645  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
646  }
647  } else if (currentLs < readLs) {
648  //there is no new file in next LS (yet), but lock file can be updated to the next LS
649  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
650  if (check == 0) {
651  ftruncate(fu_readwritelock_fd2, 0);
652  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
653  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
654  fflush(fu_rw_lock_stream2);
655  fsync(fu_readwritelock_fd2);
656  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
657  } else {
658  throw cms::Exception("EvFDaqDirector")
659  << "seek on fu read/write lock for updating failed with error " << strerror(errno);
660  }
661  }
662  } else {
663  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
664  << strerror(errno);
665  }
666  } else {
667  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
668  }
669  fclose(fu_rw_lock_stream2); // = fdopen(fu_readwritelock_fd2, "r+");
670 
671 #ifdef DEBUG
672  timeval ts_preunlock;
673  gettimeofday(&ts_preunlock, 0);
674  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
675  double locked_period = locked_period_int + double(ts_preunlock.tv_usec - ts_lockend.tv_usec) / 1000000;
676 #endif
677 
678  //if new json is present, lock file which FedRawDataInputSource will later unlock
679  if (fileStatus == newFile)
680  lockFULocal();
681 
682  //release lock at this point
683  int retvalu = -1;
684  retvalu = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
685  if (retvalu == -1)
686  edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
687 
688 #ifdef DEBUG
689  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
690 #endif
691 
692  if (fileStatus == noFile) {
693  struct stat buf;
694  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
695  if (stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf) != 0)
696  fileStatus = runEnded;
697  if (stopFileLS >= 0 && (int)ls > stopFileLS) {
698  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
699  fileStatus = runEnded;
700  }
701  }
702  return fileStatus;
703  }
704 
706  boost::filesystem::ifstream ij(BUEoLSFile);
707  Json::Value deserializeRoot;
709 
710  if (!reader.parse(ij, deserializeRoot)) {
711  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
712  return -1;
713  }
714 
716  DataPoint dp;
717  dp.deserialize(deserializeRoot);
718 
719  //read definition
720  if (readEolsDefinition_) {
721  //std::string def = boost::algorithm::trim(dp.getDefinition());
723  if (def.empty())
724  readEolsDefinition_ = false;
725  while (!def.empty()) {
727  if (def.find('/') == 0)
728  fullpath = def;
729  else
730  fullpath = bu_run_dir_ + '/' + def;
731  struct stat buf;
732  if (stat(fullpath.c_str(), &buf) == 0) {
733  DataPointDefinition eolsDpd;
734  std::string defLabel = "legend";
735  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
736  if (eolsDpd.getNames().empty()) {
737  //try with "data" label if "legend" format is not used
738  eolsDpd = DataPointDefinition();
739  defLabel = "data";
740  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd, &defLabel);
741  }
742  for (unsigned int i = 0; i < eolsDpd.getNames().size(); i++)
743  if (eolsDpd.getNames().at(i) == "NFiles")
745  readEolsDefinition_ = false;
746  break;
747  }
748  //check if we can still find definition
749  if (def.size() <= 1 || def.find('/') == std::string::npos) {
750  readEolsDefinition_ = false;
751  break;
752  }
753  def = def.substr(def.find('/') + 1);
754  }
755  }
756 
757  if (dp.getData().size() > eolsNFilesIndex_)
758  data = dp.getData()[eolsNFilesIndex_];
759  else {
760  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
761  return -1;
762  }
763  return boost::lexical_cast<int>(data);
764  }
765 
767  unsigned int& ls, unsigned int& index, std::string& nextFile, uint32_t& fsize, int maxLS) {
768  if (previousFileSize_ != 0) {
769  if (!fms_) {
771  }
772  if (fms_)
774  previousFileSize_ = 0;
775  }
776 
777  //reached limit
778  if (maxLS >= 0 && ls > (unsigned int)maxLS)
779  return false;
780 
781  struct stat buf;
782  std::stringstream ss;
783  unsigned int nextIndex = index;
784  nextIndex++;
785 
786  // 1. Check suggested file
787  nextFile = getInputJsonFilePath(ls, index);
788  if (stat(nextFile.c_str(), &buf) == 0) {
789  previousFileSize_ = buf.st_size;
790  fsize = buf.st_size;
791  return true;
792  }
793  // 2. No file -> lumi ended? (and how many?)
794  else {
795  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
796  bool eolFound = (stat(BUEoLSFile.c_str(), &buf) == 0);
797  while (eolFound) {
798  // recheck that no raw file appeared in the meantime
799  if (stat(nextFile.c_str(), &buf) == 0) {
800  previousFileSize_ = buf.st_size;
801  fsize = buf.st_size;
802  return true;
803  }
804 
805  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
806  if (indexFilesInLS < 0)
807  //parsing failed
808  return false;
809  else {
810  //check index
811  if ((int)index < indexFilesInLS) {
812  //we have 2 files, and check for 1 failed... retry (2 will never be here)
813  edm::LogError("EvFDaqDirector")
814  << "Potential miss of index file in LS -: " << ls << ". Missing " << nextFile << " because "
815  << indexFilesInLS - 1 << " is the highest index expected. Will not update fu.lock file";
816  return false;
817  }
818  }
819  // this lumi ended, check for files
820  ++ls;
821  index = 0;
822 
823  //reached limit
824  if (maxLS >= 0 && ls > (unsigned int)maxLS)
825  return false;
826 
827  nextFile = getInputJsonFilePath(ls, 0);
828  if (stat(nextFile.c_str(), &buf) == 0) {
829  // a new file was found at new lumisection, index 0
830  previousFileSize_ = buf.st_size;
831  fsize = buf.st_size;
832  return true;
833  } else {
834  //change of policy: we need to cycle through each LS
835  return false;
836  }
837  BUEoLSFile = getEoLSFilePathOnBU(ls);
838  eolFound = (stat(BUEoLSFile.c_str(), &buf) == 0);
839  }
840  }
841  // no new file found
842  return false;
843  }
844 
846  if (fu_rw_lock_stream == nullptr)
847  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream " << strerror(errno);
848  else {
849  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
850  unsigned int readLs = 1, readIndex = 0;
851  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
852  }
853  }
854 
// NOTE(review): the signature line and the line preceding the first open() call are not
// visible in this listing; `create` is presumably a bool parameter and the open() result
// is presumably assigned to fu_readwritelock_fd_ as in the else-branch - confirm.
// Opens fulockfile_ read/write (creating it when `create` is set) and wraps the
// descriptor fu_readwritelock_fd_ in an "r+" stdio stream stored in fu_rw_lock_stream.
856  if (create) {
858  open(fulockfile_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
// explicitly set mode 0766 on the lock file after creation
859  chmod(fulockfile_.c_str(), 0766);
860  } else {
// the mode argument is ignored by open() when O_CREAT is not given
861  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
862  }
863  if (fu_readwritelock_fd_ == -1)
864  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
865  << " create:" << create << " error:" << strerror(errno);
866  else
867  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: " << fu_readwritelock_fd_;
868 
// NOTE(review): fdopen() is attempted even when the descriptor is -1; in that case a
// second error is logged below.
869  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
870  if (fu_rw_lock_stream == nullptr)
871  edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
872  }
873 
874  void EvFDaqDirector::lockInitLock() { pthread_mutex_lock(&init_lock_); }
875 
876  void EvFDaqDirector::unlockInitLock() { pthread_mutex_unlock(&init_lock_); }
877 
// NOTE(review): the signature line of this definition is not visible in this listing.
// Takes a *shared* flock() on fulocal_rwlock_fd_; the return value is not checked.
// (The commented-out line is an earlier fcntl-based variant.)
879  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
880  flock(fulocal_rwlock_fd_, LOCK_SH);
881  }
882 
// NOTE(review): the signature line of this definition is not visible in this listing.
// Releases the flock() held on fulocal_rwlock_fd_; the return value is not checked.
// (The commented-out line is an earlier fcntl-based variant.)
884  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
885  flock(fulocal_rwlock_fd_, LOCK_UN);
886  }
887 
// NOTE(review): the signature line of this definition is not visible in this listing.
// Takes an *exclusive* flock() on fulocal_rwlock_fd2_; the return value is not checked.
// (The commented-out line is an earlier fcntl-based variant.)
889  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
890  flock(fulocal_rwlock_fd2_, LOCK_EX);
891  }
892 
// NOTE(review): the signature line of this definition is not visible in this listing.
// Releases the flock() held on fulocal_rwlock_fd2_; the return value is not checked.
// (The commented-out line is an earlier fcntl-based variant.)
894  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
895  flock(fulocal_rwlock_fd2_, LOCK_UN);
896  }
897 
898  void EvFDaqDirector::createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const {
899  //used for backpressure mechanisms and monitoring
900  const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
901  struct stat buf;
902  if (checkIfExists == false || stat(fuBoLS.c_str(), &buf) != 0) {
903  int bol_fd = open(fuBoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
904  close(bol_fd);
905  }
906  }
907 
908  void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection,
909  const uint32_t currentLumiSection,
910  bool doCreateBoLS) {
911  if (currentLumiSection > 0) {
912  const std::string fuEoLS = getEoLSFilePathOnFU(currentLumiSection);
913  struct stat buf;
914  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
915  if (!found) {
916  int eol_fd = open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
917  close(eol_fd);
918  if (doCreateBoLS)
919  createBoLSFile(lumiSection, false);
920  }
921  } else if (doCreateBoLS) {
922  createBoLSFile(lumiSection, true); //needed for initial lumisection
923  }
924  }
925 
// NOTE(review): the first line of this signature (return type, function name and the
// rawSourcePath parameter) is not visible in this listing.
//
// Opens rawSourcePath read-only, reads sizeof(FRDFileHeader_v1) bytes and interprets them.
// Outputs (written only on the success path unless stated otherwise):
//   rawFd              - descriptor of the opened file, or -1 when closeFile was set
//   rawHeaderSize      - header size from the file, or 0 when no header was detected
//   lsFromHeader       - lumisection from the header, or 0 when no header was detected
//   eventsFromHeader   - event count from the header, or -1 when no header was detected
//   fileSizeFromHeader - file size from the header, or -1 when no header was detected
// Inputs:
//   requireHeader - treat a missing/unrecognised header as an error
//   retry         - on a failed open, log a warning and call this function once more
//   closeFile     - close the descriptor right after the header has been read
// Returns 0 on success, 1 when the file could not be opened and errno is ENOENT,
// -1 on any other error.
927  int& rawFd,
928  uint16_t& rawHeaderSize,
929  uint32_t& lsFromHeader,
930  int32_t& eventsFromHeader,
931  int64_t& fileSizeFromHeader,
932  bool requireHeader,
933  bool retry,
934  bool closeFile) {
935  int infile;
936 
937  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
938  if (retry) {
939  edm::LogWarning("EvFDaqDirector")
940  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
// single retry: same arguments, but with retry disabled
941  return parseFRDFileHeader(rawSourcePath,
942  rawFd,
943  rawHeaderSize,
944  lsFromHeader,
945  eventsFromHeader,
946  fileSizeFromHeader,
947  requireHeader,
948  false,
949  closeFile);
950  } else {
// with retry disabled the open is attempted a second time within this call
951  if ((infile = ::open(rawSourcePath.c_str(), O_RDONLY)) < 0) {
952  edm::LogError("EvFDaqDirector")
953  << "parseFRDFileHeader - failed to open input file -: " << rawSourcePath << " : " << strerror(errno);
// NOTE(review): errno is inspected after the logging statement above; confirm that
// logging cannot overwrite it.
954  if (errno == ENOENT)
955  return 1; // error && file not found
956  else
957  return -1;
958  }
959  }
960  }
961 
962  constexpr std::size_t buf_sz = sizeof(FRDFileHeader_v1); //try to read v1 FRD header size
963  FRDFileHeader_v1 fileHead;
964 
965  ssize_t sz_read = ::read(infile, (char*)&fileHead, buf_sz);
// when closeFile is set, infile becomes -1 so the later "infile != -1" guards skip close()
966  if (closeFile) {
967  close(infile);
968  infile = -1;
969  }
970 
971  if (sz_read < 0) {
972  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - unable to read " << rawSourcePath << " : "
973  << strerror(errno);
974  if (infile != -1)
975  close(infile);
976  return -1;
977  }
// a short read (fewer bytes than the v1 header) is treated as an error
978  if ((size_t)sz_read < buf_sz) {
979  edm::LogError("EvFDaqDirector") << "parseFRDFileHeader - file smaller than header: " << rawSourcePath;
980  if (infile != -1)
981  close(infile);
982  return -1;
983  }
984 
985  uint16_t frd_version = getFRDFileHeaderVersion(fileHead.id_, fileHead.version_);
986 
987  if (frd_version == 0) {
988  //no header (specific sequence not detected)
989  if (requireHeader) {
990  edm::LogError("EvFDaqDirector") << "no header or invalid version string found in:" << rawSourcePath;
991  if (infile != -1)
992  close(infile);
993  return -1;
994  } else {
995  //no header, but valid file
// rewind so the caller reads the file from its first byte
// NOTE(review): when closeFile was set, infile is -1 here and this lseek fails (unchecked)
996  lseek(infile, 0, SEEK_SET);
997  rawHeaderSize = 0;
998  lsFromHeader = 0;
999  eventsFromHeader = -1;
1000  fileSizeFromHeader = -1;
1001  }
1002  } else {
1003  //version 1 header
1004  uint32_t headerSizeRaw = fileHead.headerSize_;
1005  if (headerSizeRaw < buf_sz) {
1006  edm::LogError("EvFDaqDirector") << "inconsistent header size: " << rawSourcePath << " size: " << headerSizeRaw
1007  << " v:" << frd_version;
1008  if (infile != -1)
1009  close(infile);
1010  return -1;
1011  }
1012  //allow header size to exceed read size. Future header versions will not break this, but the size can change.
1013  lsFromHeader = fileHead.lumiSection_;
1014  eventsFromHeader = (int32_t)fileHead.eventCount_;
1015  fileSizeFromHeader = (int64_t)fileHead.fileSize_;
// NOTE(review): rawHeaderSize is uint16_t while the value was held in a uint32_t above;
// confirm the declared width of headerSize_ rules out truncation.
1016  rawHeaderSize = fileHead.headerSize_;
1017  }
// hand the (possibly already closed, i.e. -1) descriptor to the caller
1018  rawFd = infile;
1019  return 0; //OK
1020  }
1021 
// NOTE(review): the first line of this signature (return type, function name and the
// rawSourcePath parameter) is not visible in this listing.
//
// Reads the FRD header of the raw file at rawSourcePath via parseFRDFileHeader (header
// required, one open retry, descriptor kept open) and writes a small JSON file
//   {"data":[<events>,<fileSize>,"<rawSourcePath>"]}
// to baseRunDir()/<reduced stem>_pid<pid_>.jsn.
// Outputs: rawFd and rawHeaderSize (filled by parseFRDFileHeader), fileSizeFromHeader
// (set only on success), fileFound (false only when parseFRDFileHeader returned 1).
// Returns the event count from the header, or -1 on any failure.
1023  int& rawFd,
1024  uint16_t& rawHeaderSize,
1025  int64_t& fileSizeFromHeader,
1026  bool& fileFound,
1027  uint32_t serverLS) {
1028  fileFound = true;
1029 
1030  //take only first three tokens delimited by "_" in the renamed raw file name
1031  std::string jsonStem = boost::filesystem::path(rawSourcePath).stem().string();
1032  size_t pos = 0, n_tokens = 0;
// advance pos to the third "_" (search starts at index 1); if fewer are present pos ends
// up as npos and the substr below keeps the whole stem
1033  while (n_tokens++ < 3 && (pos = jsonStem.find("_", pos + 1)) != std::string::npos) {
1034  }
1035  std::string reducedJsonStem = jsonStem.substr(0, pos);
1036 
1037  std::ostringstream fileNameWithPID;
1038  //should be ported to use fffnaming
1039  fileNameWithPID << reducedJsonStem << "_pid" << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
1040 
1041  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1042 
1043  LogDebug("EvFDaqDirector") << "RAW parse -: " << rawSourcePath << " and JSON create " << jsonDestPath;
1044 
1045  //parse RAW file header if it exists
1046  uint32_t lsFromRaw;
1047  int32_t nbEventsWrittenRaw;
1048  int64_t fileSizeFromRaw;
// trailing arguments: requireHeader=true, retry=true, closeFile=false
1049  auto ret = parseFRDFileHeader(
1050  rawSourcePath, rawFd, rawHeaderSize, lsFromRaw, nbEventsWrittenRaw, fileSizeFromRaw, true, true, false);
1051  if (ret != 0) {
// return code 1 means the raw file could not be opened because it does not exist
1052  if (ret == 1)
1053  fileFound = false;
1054  return -1;
1055  }
1056 
// NOTE(review): from here on rawFd refers to an open descriptor; the error returns below
// do not close it. The visible caller in this file closes rawFd when a negative value is
// returned - confirm for any other callers.
1057  int outfile;
1058  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1059  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1060  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1061  if (errno == EEXIST) {
1062  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - destination file already exists -: " << jsonDestPath
1063  << " : ";
1064  return -1;
1065  }
1066  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file -: " << jsonDestPath << " : "
1067  << strerror(errno);
// the failed open may still have left a file behind: remove it and try once more
1068  struct stat out_stat;
1069  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1070  edm::LogWarning("EvFDaqDirector")
1071  << "grabNextJsonFromRaw - output file possibly got created with error, deleting and retry -: "
1072  << jsonDestPath;
1073  if (unlink(jsonDestPath.c_str()) == -1) {
1074  edm::LogWarning("EvFDaqDirector")
1075  << "grabNextJsonFromRaw - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1076  }
1077  }
1078  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1079  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to open output file (on retry) -: "
1080  << jsonDestPath << " : " << strerror(errno);
1081  return -1;
1082  }
1083  }
1084  //write JSON file (TODO: use jsoncpp)
1085  std::stringstream ss;
1086  ss << "{\"data\":[" << nbEventsWrittenRaw << "," << fileSizeFromRaw << ",\"" << rawSourcePath << "\"]}";
1087  std::string sstr = ss.str();
1088 
// NOTE(review): a short (partial) write is not detected, and on the error return below
// `outfile` is not closed (descriptor leak).
1089  if (::write(outfile, sstr.c_str(), sstr.size()) < 0) {
1090  edm::LogError("EvFDaqDirector") << "grabNextJsonFromRaw - failed to write to output file file -: " << jsonDestPath
1091  << " : " << strerror(errno);
1092  return -1;
1093  }
1094  close(outfile);
// a non-zero serverLS that disagrees with the header only produces a warning
1095  if (serverLS && serverLS != lsFromRaw)
1096  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFromRaw - mismatch in expected (server) LS " << serverLS
1097  << " and raw file header LS " << lsFromRaw;
1098 
1099  fileSizeFromHeader = fileSizeFromRaw;
1100  return nbEventsWrittenRaw;
1101  }
1102 
// NOTE(review): the first line of this signature (return type, function name and the
// jsonSourcePath parameter) and the declaration of `reader` are not visible in this listing.
//
// Copies the JSON file at jsonSourcePath to baseRunDir()/<raw stem>_pid<pid_>.jsn,
// removes the source, parses the copy and extracts the event count.
// Outputs: fileSizeFromJson (value of the "NBytes" field, or -1), fileFound (false only
// when the source could not be opened and errno is ENOENT).
// Returns the number of events, or -1 on any failure.
1104  std::string const& rawSourcePath,
1105  int64_t& fileSizeFromJson,
1106  bool& fileFound) {
1107  fileFound = true;
1108 
1109  //should be ported to use fffnaming
1110  std::ostringstream fileNameWithPID;
1111  fileNameWithPID << boost::filesystem::path(rawSourcePath).stem().string() << "_pid" << std::setfill('0')
1112  << std::setw(5) << pid_ << ".jsn";
1113 
1114  // assemble json destination path
1115  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
1116 
1117  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1118 
1119  int infile = -1, outfile = -1;
1120 
// the source open is attempted twice before giving up
1121  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1122  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : "
1123  << strerror(errno);
1124  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY)) < 0) {
1125  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: "
1126  << jsonSourcePath << " : " << strerror(errno);
// NOTE(review): errno is inspected after the logging statement above
1127  if (errno == ENOENT)
1128  fileFound = false;
1129  return -1;
1130  }
1131  }
1132 
1133  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
1134  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
1135  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1136  if (errno == EEXIST) {
1137  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath
1138  << " : ";
1139  ::close(infile);
1140  return -1;
1141  }
1142  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : "
1143  << strerror(errno);
// the failed open may still have left a file behind: remove it and try once more
1144  struct stat out_stat;
1145  if (stat(jsonDestPath.c_str(), &out_stat) == 0) {
1146  edm::LogWarning("EvFDaqDirector")
1147  << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1148  if (unlink(jsonDestPath.c_str()) == -1) {
1149  edm::LogWarning("EvFDaqDirector")
1150  << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : " << strerror(errno);
1151  }
1152  }
1153  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode)) < 0) {
1154  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: "
1155  << jsonDestPath << " : " << strerror(errno);
1156  ::close(infile);
1157  return -1;
1158  }
1159  }
1160  //copy contents
1161  const std::size_t buf_sz = 512;
1162  std::size_t tot_written = 0;
// NOTE(review): memory from new char[] is owned by std::unique_ptr<char>, whose deleter
// uses delete rather than delete[]; std::unique_ptr<char[]> would match the allocation.
1163  std::unique_ptr<char> buf(new char[buf_sz]);
1164 
1165  ssize_t sz, sz_read = 1, sz_write;
// outer loop: read chunks of up to buf_sz bytes; inner loop: write until the chunk is
// fully written (handles partial writes)
1166  while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0) {
1167  sz_write = 0;
1168  do {
1169  assert(sz_read - sz_write > 0);
1170  if ((sz = ::write(outfile, buf.get() + sz_write, sz_read - sz_write)) < 0) {
1171  sz_read = sz; // cause read loop termination
1172  break;
1173  }
1174  assert(sz > 0);
1175  sz_write += sz;
1176  tot_written += sz;
1177  } while (sz_write < sz_read);
1178  }
1179  close(infile);
1180  close(outfile);
1181 
// NOTE(review): a read/write error after some bytes were copied still leaves
// tot_written > 0, so the source file is removed in that case as well.
1182  if (tot_written > 0) {
1183  //leave file if it was empty for diagnosis
1184  if (unlink(jsonSourcePath.c_str()) == -1) {
1185  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "
1186  << strerror(errno);
1187  return -1;
1188  }
1189  } else {
1190  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: "
1191  << jsonSourcePath;
1192  return -1;
1193  }
1194 
1195  Json::Value deserializeRoot;
1197 
1198  std::string data;
1199  std::stringstream ss;
1200  bool result;
1201  try {
// small files are parsed straight from the copy buffer, larger ones are re-read from disk
// NOTE(review): buf is passed as a plain char pointer without a terminating NUL having
// been written, and it holds only the last chunk read if the file arrived in several
// read() calls - confirm this cannot happen or parse from the destination file instead.
1202  if (tot_written <= buf_sz) {
1203  result = reader.parse(buf.get(), deserializeRoot);
1204  } else {
1205  //json will normally not be bigger than buf_sz bytes
1206  try {
1207  boost::filesystem::ifstream ij(jsonDestPath);
1208  ss << ij.rdbuf();
1209  } catch (boost::filesystem::filesystem_error const& ex) {
1210  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1211  return -1;
1212  }
1213  result = reader.parse(ss.str(), deserializeRoot);
1214  }
1215  if (!result) {
// for the small-file case the content has not been put into ss yet (same NUL caveat)
1216  if (tot_written <= buf_sz)
1217  ss << buf.get();
1218  edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath << "\nERROR:\n"
1219  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1220  << ss.str() << ".";
1221  return -1;
1222  }
1223 
1224  //read BU JSON
1225  DataPoint dp;
1226  dp.deserialize(deserializeRoot);
1227  bool success = false;
// look up the position of "NEvents" in dpd_->getNames() and take the data entry at the
// same position
1228  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1229  if (dpd_->getNames().at(i) == "NEvents")
1230  if (i < dp.getData().size()) {
1231  data = dp.getData()[i];
1232  success = true;
1233  break;
1234  }
1235  }
// fallback: use the first data entry when "NEvents" was not located
1236  if (!success) {
1237  if (!dp.getData().empty())
1238  data = dp.getData()[0];
1239  else {
1240  edm::LogError("EvFDaqDirector::grabNextJsonFile")
1241  << "grabNextJsonFile - "
1242  << " error reading number of events from BU JSON; No input value. data -: " << data;
1243  return -1;
1244  }
1245  }
1246 
1247  //try to read raw file size
1248  fileSizeFromJson = -1;
1249  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1250  if (dpd_->getNames().at(i) == "NBytes") {
1251  if (i < dp.getData().size()) {
1252  std::string dataSize = dp.getData()[i];
1253  try {
1254  fileSizeFromJson = boost::lexical_cast<long>(dataSize);
1255  } catch (boost::bad_lexical_cast const&) {
1256  //non-fatal currently, processing can continue without this value
1257  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1258  << "Input value is -: " << dataSize;
1259  }
1260  break;
1261  }
1262  }
1263  }
// a non-numeric event count throws bad_lexical_cast, handled below
1264  return boost::lexical_cast<int>(data);
1265  } catch (boost::bad_lexical_cast const& e) {
1266  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1267  << "Input value is -: " << data;
1268  } catch (std::runtime_error const& e) {
1269  //Can be thrown by Json parser
1270  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1271  }
1272 
1273  catch (std::exception const& e) {
1274  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1275  } catch (...) {
1276  //unknown exception
1277  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1278  }
1279 
// reached only after one of the handlers above has logged the problem
1280  return -1;
1281  }
1282 
// NOTE(review): the signature line of this definition and the declaration of `reader` are
// not visible in this listing; jsonSourcePath is used with .stem() and boost::filesystem
// calls, so it is presumably a boost::filesystem::path - confirm.
//
// Copies jsonSourcePath to baseRunDir()/<stem>_pid<getpid()>.jsn, calls unlockFULocal(),
// removes the source, parses the copy and returns the event count. Returns -1 after any
// of the handled exceptions.
1284  std::string data;
1285  try {
1286  // assemble json destination path
1287  boost::filesystem::path jsonDestPath(baseRunDir());
1288 
1289  //should be ported to use fffnaming
1290  std::ostringstream fileNameWithPID;
1291  fileNameWithPID << jsonSourcePath.stem().string() << "_pid" << std::setfill('0') << std::setw(5) << getpid()
1292  << ".jsn";
1293  jsonDestPath /= fileNameWithPID.str();
1294 
1295  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
1296  try {
1297  boost::filesystem::copy(jsonSourcePath, jsonDestPath);
1298  } catch (boost::filesystem::filesystem_error const& ex) {
1299  // Input dir gone?
1300  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1301  // << " Maybe the file is not yet visible by FU. Trying again in one second";
// one retry after a second; a second failure propagates to the outer handlers
1302  sleep(1);
1303  boost::filesystem::copy(jsonSourcePath, jsonDestPath);
1304  }
// the lock is released as soon as the copy exists locally
1305  unlockFULocal();
1306 
1307  try {
1308  //sometimes this fails but file gets deleted
1309  boost::filesystem::remove(jsonSourcePath);
1310  } catch (boost::filesystem::filesystem_error const& ex) {
1311  // Input dir gone?
1312  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1313  } catch (std::exception const& ex) {
1314  // Input dir gone?
1315  edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1316  }
1317 
1318  boost::filesystem::ifstream ij(jsonDestPath);
1319  Json::Value deserializeRoot;
1321 
1322  std::stringstream ss;
1323  ss << ij.rdbuf();
1324  if (!reader.parse(ss.str(), deserializeRoot)) {
1325  edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1326  << "\nERROR:\n"
1327  << reader.getFormatedErrorMessages() << "CONTENT:\n"
1328  << ss.str() << ".";
1329  throw std::runtime_error("Cannot deserialize input JSON file");
1330  }
1331 
1332  //read BU JSON
// NOTE(review): this declaration shadows the function-level `data`; the bad_lexical_cast
// handler below therefore logs the outer, still empty, string.
1333  std::string data;
1334  DataPoint dp;
1335  dp.deserialize(deserializeRoot);
1336  bool success = false;
// take the data entry at the position of "NEvents" in dpd_->getNames() (no early break)
1337  for (unsigned int i = 0; i < dpd_->getNames().size(); i++) {
1338  if (dpd_->getNames().at(i) == "NEvents")
1339  if (i < dp.getData().size()) {
1340  data = dp.getData()[i];
1341  success = true;
1342  }
1343  }
// fallback: first data entry; with no data at all an exception is thrown
1344  if (!success) {
1345  if (!dp.getData().empty())
1346  data = dp.getData()[0];
1347  else
1348  throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock")
1349  << " error reading number of events from BU JSON -: No input value " << data;
1350  }
1351  return boost::lexical_cast<int>(data);
// NOTE(review): the handlers below that call unlockFULocal() can also be reached after the
// unlock inside the try block has already run (e.g. the runtime_error thrown on a parse
// failure), which results in a second unlock - confirm this is harmless.
1352  } catch (boost::filesystem::filesystem_error const& ex) {
1353  // Input dir gone?
1354  unlockFULocal();
1355  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1356  } catch (std::runtime_error const& e) {
1357  // Another process grabbed the file and NFS did not register this
1358  unlockFULocal();
1359  edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1360  } catch (boost::bad_lexical_cast const&) {
1361  edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1362  << "Input value is -: " << data;
1363  } catch (std::exception const& e) {
1364  // BU run directory disappeared?
1365  unlockFULocal();
1366  edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1367  }
1368 
1369  return -1;
1370  }
1371 
// NOTE(review): the first line of this signature (return type, function name and the
// serverHttpStatus parameter), the socket connect calls and the declaration of `header`
// are not visible in this listing.
//
// Sends one HTTP GET "/popfile" request over *socket_ and interprets the reply.
// Outputs:
//   serverHttpStatus - numeric status parsed from the status line
//   serverError      - true on any socket/protocol problem or an error/unknown state
//   serverLS         - lumisection reported by the server ("lumisection", at least 1), or
//                      closedServerLS + 1 when that key is absent
//   closedServerLS   - value of "lasteols" (at least 0)
//   nextFileJson / nextFileRaw / rawHeader - set only when a file is announced
// maxLS >= 0 is forwarded to the server as "&stopls=<maxLS>".
// Returns newFile, noFile or runEnded; noFile whenever serverError is set (after a 1 s sleep).
1373  bool& serverError,
1374  uint32_t& serverLS,
1375  uint32_t& closedServerLS,
1376  std::string& nextFileJson,
1377  std::string& nextFileRaw,
1378  bool& rawHeader,
1379  int maxLS) {
1380  EvFDaqDirector::FileStatus fileStatus = noFile;
1381  serverError = false;
1382 
1383  boost::system::error_code ec;
1384  try {
// the loop body runs once, or again via `continue` after a reconnect on connection_reset
1385  while (true) {
1386  //socket connect
1387  if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
// (the connect call that sets `ec` is on a line missing from this listing)
1389 
1390  if (ec) {
1391  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1392  serverError = true;
1393  break;
1394  }
1395  }
1396 
1397  boost::asio::streambuf request;
1398  std::ostream request_stream(&request);
1399  std::string path = "/popfile?runnumber=" + run_nstring_ + "&pid=" + pid_;
1400  if (maxLS >= 0) {
1401  std::stringstream spath;
1402  spath << path << "&stopls=" << maxLS;
1403  path = spath.str();
1404  edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1405  }
1406  request_stream << "GET " << path << " HTTP/1.1\r\n";
1407  request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1408  request_stream << "Accept: */*\r\n";
// NOTE(review): "keep-alive" is sent regardless of fileBrokerKeepAlive_
1409  request_stream << "Connection: keep-alive\r\n\r\n";
1410 
1411  boost::asio::write(*socket_, request, ec);
1412  if (ec) {
1413  if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1414  edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1415  //we got disconnected, try to reconnect to the server before writing the request
// (the reconnect call that sets `ec` is on a line missing from this listing)
1417  if (ec) {
1418  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1419  serverError = true;
1420  break;
1421  }
// rebuild and resend the request on the fresh connection
1422  continue;
1423  }
1424  edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1425  serverError = true;
1426  break;
1427  }
1428 
// NOTE(review): this is the only socket read before headers and body are parsed from
// `response`; the parsing below relies on this call having buffered the complete reply -
// confirm against the server's behaviour.
1429  boost::asio::streambuf response;
1430  boost::asio::read_until(*socket_, response, "\r\n", ec);
1431  if (ec) {
1432  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1433  serverError = true;
1434  break;
1435  }
1436 
1437  std::istream response_stream(&response);
1438 
// status line: "<http_version> <status code> <message>"
1439  std::string http_version;
1440  response_stream >> http_version;
1441 
1442  response_stream >> serverHttpStatus;
1443 
1444  std::string status_message;
1445  std::getline(response_stream, status_message);
1446  if (!response_stream || http_version.substr(0, 5) != "HTTP/") {
1447  edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1448  serverError = true;
1449  break;
1450  }
1451  if (serverHttpStatus != 200) {
1452  edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1453  serverError = true;
1454  break;
1455  }
1456 
1457  // Process the response headers.
// headers are skipped: lines are consumed up to the empty line ("\r")
1459  while (std::getline(response_stream, header) && header != "\r") {
1460  }
1461 
// body: "key=value" lines are collected into serverMap; lines without "=" are ignored
1462  std::string fileInfo;
1463  std::map<std::string, std::string> serverMap;
1464  while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1465  auto pos = fileInfo.find("=");
1466  if (pos == std::string::npos)
1467  continue;
1468  auto stitle = fileInfo.substr(0, pos);
1469  auto svalue = fileInfo.substr(pos + 1);
1470  serverMap[stitle] = svalue;
1471  }
1472 
// NOTE(review): missing keys or a run number mismatch abort the process via assert
// (and are unchecked in builds with NDEBUG) rather than setting serverError.
1473  //check that response run number if correct
1474  auto server_version = serverMap.find("version");
1475  assert(server_version != serverMap.end());
1476 
1477  auto server_run = serverMap.find("runnumber");
1478  assert(server_run != serverMap.end());
1479  assert(run_nstring_ == server_run->second);
1480 
1481  auto server_state = serverMap.find("state");
1482  assert(server_state != serverMap.end());
1483 
1484  auto server_eols = serverMap.find("lasteols");
1485  assert(server_eols != serverMap.end());
1486 
// "lumisection" is optional; its absence is handled further down
1487  auto server_ls = serverMap.find("lumisection");
1488 
// version defaults to 1.0.0; parsing accepts "a.b.c", "a.b" or "a", with an optional
// leading double quote
1489  int version_maj = 1;
1490  int version_min = 0;
1491  int version_rev = 0;
1492  {
1493  auto* s_ptr = server_version->second.c_str();
1494  if (!server_version->second.empty() && server_version->second[0] == '"')
1495  s_ptr++;
1496  auto res = sscanf(s_ptr, "%d.%d.%d", &version_maj, &version_min, &version_rev);
1497  if (res < 3) {
1498  res = sscanf(s_ptr, "%d.%d", &version_maj, &version_min);
1499  if (res < 2) {
1500  res = sscanf(s_ptr, "%d", &version_maj);
1501  if (res < 1) {
1502  //expecting at least 1 number (major version)
1503  edm::LogWarning("EvFDaqDirector") << "Can not parse server version " << server_version->second;
1504  }
1505  }
1506  }
1507  }
1508 
// NOTE(review): the values are cast to uint64_t but stored in uint32_t references
1509  closedServerLS = (uint64_t)std::max(0, atoi(server_eols->second.c_str()));
1510  if (server_ls != serverMap.end())
1511  serverLS = (uint64_t)std::max(1, atoi(server_ls->second.c_str()));
1512  else
1513  serverLS = closedServerLS + 1;
1514 
// dispatch on the reported server state
1515  std::string s_state = server_state->second;
1516  if (s_state == "STARTING") //initial, always empty starting with LS 1
1517  {
1518  auto server_file = serverMap.find("file");
1519  assert(server_file == serverMap.end()); //no file with starting state
1520  fileStatus = noFile;
1521  edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1522  } else if (s_state == "READY") {
1523  auto server_file = serverMap.find("file");
1524  if (server_file == serverMap.end()) {
1525  //can be returned by server if files from new LS already appeared but LS is not yet closed
1526  if (serverLS <= closedServerLS)
1527  serverLS = closedServerLS + 1;
1528  fileStatus = noFile;
1529  edm::LogInfo("EvFDaqDirector")
1530  << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1531  } else {
1532  std::string filestem;
1533  std::string fileprefix;
1534  auto server_fileprefix = serverMap.find("fileprefix");
1535 
// optional "fileprefix": surrounding double quotes are stripped when both are present
1536  if (server_fileprefix != serverMap.end()) {
1537  auto pssize = server_fileprefix->second.size();
1538  if (pssize > 1 && server_fileprefix->second[0] == '"' && server_fileprefix->second[pssize - 1] == '"')
1539  fileprefix = server_fileprefix->second.substr(1, pssize - 2);
1540  else
1541  fileprefix = server_fileprefix->second;
1542  }
1543 
1544  //remove string literals
1545  auto ssize = server_file->second.size();
1546  if (ssize > 1 && server_file->second[0] == '"' && server_file->second[ssize - 1] == '"')
1547  filestem = server_file->second.substr(1, ssize - 2);
1548  else
1549  filestem = server_file->second;
1550  assert(!filestem.empty());
// server major version > 1: only a raw file (prefix applied) is announced and rawHeader
// is set; otherwise a raw file (no prefix) plus a .jsn file (prefix applied) are used
1551  if (version_maj > 1) {
1552  nextFileRaw = bu_run_dir_ + "/" + fileprefix + filestem + ".raw"; //filestem should be raw
1553  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1554  nextFileJson = "";
1555  rawHeader = true;
1556  } else {
1557  nextFileRaw = bu_run_dir_ + "/" + filestem + ".raw"; //raw files are not moved
1558  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1559  nextFileJson = filestem + ".jsn";
1560  rawHeader = false;
1561  }
1562  fileStatus = newFile;
1563  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS "
1564  << serverLS << " file:" << filestem;
1565  }
1566  } else if (s_state == "EOLS") {
1567  serverLS = closedServerLS + 1;
1568  edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1569  fileStatus = noFile;
1570  } else if (s_state == "EOR") {
1571  //server_eor = serverMap.find("iseor");
1572  edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1573  fileStatus = runEnded;
1574  } else if (s_state == "NORUN") {
// NORUN is treated like end of run (serverError is not set)
1575  auto err_msg = serverMap.find("errormessage");
1576  if (err_msg != serverMap.end())
1577  edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1578  else
1579  edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1580  edm::LogWarning("EvFDaqDirector") << "executing run end";
1581  fileStatus = runEnded;
1582  } else if (s_state == "ERROR") {
1583  auto err_msg = serverMap.find("errormessage");
1584  if (err_msg != serverMap.end())
1585  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1586  else
1587  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1588  fileStatus = noFile;
1589  serverError = true;
1590  } else {
1591  edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1592  fileStatus = noFile;
1593  serverError = true;
1594  }
1595 
1596  // Read until EOF, writing data to output as we go.
// without keep-alive the remainder of the stream is drained until the peer closes it
1597  if (!fileBrokerKeepAlive_) {
1598  while (boost::asio::read(*socket_, response, boost::asio::transfer_at_least(1), ec)) {
1599  }
1600  if (ec != boost::asio::error::eof) {
1601  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1602  serverError = true;
1603  }
1604  }
1605  break;
1606  }
1607  } catch (std::exception const& e) {
// NOTE(review): e.what() is not included in the message
1608  edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1609  serverError = true;
1610  }
1611 
// without keep-alive the connection is shut down and closed after every request
1612  if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1613  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1614  if (ec) {
1615  edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1616  }
1617  socket_->close(ec);
1618  if (ec) {
1619  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1620  }
1621  }
1622 
// on any error: drop the connection, report noFile and back off for a second
1623  if (serverError) {
1624  if (socket_->is_open())
1625  socket_->close(ec);
// NOTE(review): `ec` may still hold an error from an earlier operation when the socket
// was not open here
1626  if (ec) {
1627  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1628  }
1629  fileStatus = noFile;
1630  sleep(1); //back-off if error detected
1631  }
1632  return fileStatus;
1633  }
1634 
// NOTE(review): the first line of this signature (return type, function name and the
// currentLumiSection parameter) and the lines directly preceding the lockFULocal2() /
// unlockFULocal2() calls are not visible in this listing (presumably conditions guarding
// the local lock - confirm).
//
// Asks the file broker for the next file via contactFileBroker(), creates local EoLS/BoLS
// files for lumisections the server reports as closed, and fetches the metadata of a newly
// announced file (from the raw header or from the JSON file).
// Outputs: ls (updated lumisection), nextFileRaw, rawFd, rawHeaderSize,
// serverEventsInNewFile, fileSizeFromMetadata. thisLockWaitTimeUs is not written in the
// visible code.
// Returns newFile, noFile or runEnded.
1636  unsigned int& ls,
1637  std::string& nextFileRaw,
1638  int& rawFd,
1639  uint16_t& rawHeaderSize,
1640  int32_t& serverEventsInNewFile,
1641  int64_t& fileSizeFromMetadata,
1642  uint64_t& thisLockWaitTimeUs) {
1643  EvFDaqDirector::FileStatus fileStatus = noFile;
1644 
1645  //int retval = -1;
1646  //int lock_attempts = 0;
1647  //long total_lock_attempts = 0;
1648 
// stop request detection: a stop file at stopFilePath_ and/or stopFilePathPid_
1649  struct stat buf;
1650  int stopFileLS = -1;
1651  int stopFileCheck = stat(stopFilePath_.c_str(), &buf);
1652  int stopFilePidCheck = stat(stopFilePathPid_.c_str(), &buf);
1653  if (stopFileCheck == 0 || stopFilePidCheck == 0) {
1654  if (stopFileCheck == 0)
1655  stopFileLS = readLastLSEntry(stopFilePath_);
1656  else
1657  stopFileLS = 1; //stop without drain if only pid is stopped
// stop_ls_override_ remembers the stop lumisection once ls has reached the requested one
1658  if (!stop_ls_override_) {
1659  //if lumisection is higher than in stop file, should quit at next from current
1660  if (stopFileLS >= 0 && (int)ls >= stopFileLS)
1661  stopFileLS = stop_ls_override_ = ls;
1662  } else
1663  stopFileLS = stop_ls_override_;
1664  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: "
1665  << stopFileLS;
1666  //return runEnded;
1667  } else //if file was removed before reaching stop condition, reset this
1668  stop_ls_override_ = 0;
1669 
1670  /* look for EoLS
1671  if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1672  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ;
1673  ls++;
1674  return noFile;
1675  }
1676  */
1677 
// NOTE(review): ts_lockbegin is taken but not used again in the visible code
1678  timeval ts_lockbegin;
1679  gettimeofday(&ts_lockbegin, nullptr);
1680 
1681  std::string nextFileJson;
1682  uint32_t serverLS, closedServerLS;
1683  unsigned int serverHttpStatus;
1684  bool serverError;
1685 
1686  //local lock to force index json and EoLS files to appear in order
1688  lockFULocal2();
1689 
// with a stop request pending, the server is told not to go beyond
// max(stop LS, current LS); -1 means no limit
1690  int maxLS = stopFileLS < 0 ? -1 : std::max(stopFileLS, (int)currentLumiSection);
1691  bool rawHeader = false;
1692  fileStatus = contactFileBroker(
1693  serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, rawHeader, maxLS);
1694 
1695  if (serverError) {
1696  //do not update anything
1698  unlockFULocal2();
1699  return noFile;
1700  }
1701 
1702  //handle creation of EoLS and BoLS files if lumisection has changed
1703  if (currentLumiSection == 0) {
1704  if (fileStatus == runEnded) {
1705  createLumiSectionFiles(closedServerLS, 0);
1706  createLumiSectionFiles(serverLS, closedServerLS, false); // +1
1707  } else
1708  createLumiSectionFiles(serverLS, 0);
1709  } else {
1710  //loop over and create any EoLS files missing
1711  if (closedServerLS >= currentLumiSection) {
1712  for (uint32_t i = std::max(currentLumiSection, 1U); i <= closedServerLS; i++)
1713  createLumiSectionFiles(i + 1, i);
1714  }
1715  }
1716 
1717  bool fileFound = true;
1718 
// fetch event count / file size either from the raw file header or from the JSON file
1719  if (fileStatus == newFile) {
1720  if (rawHeader > 0)
1721  serverEventsInNewFile =
1722  grabNextJsonFromRaw(nextFileRaw, rawFd, rawHeaderSize, fileSizeFromMetadata, fileFound, serverLS);
1723  else
1724  serverEventsInNewFile = grabNextJsonFile(nextFileJson, nextFileRaw, fileSizeFromMetadata, fileFound);
1725  }
1726  //closing file in case of any error
// NOTE(review): this test also runs when fileStatus != newFile, in which case
// serverEventsInNewFile and rawFd still hold whatever the caller passed in.
1727  if (serverEventsInNewFile < 0 && rawFd != -1) {
1728  close(rawFd);
1729  rawFd = -1;
1730  }
1731  if (!fileFound) {
1732  //catch condition where directory got deleted
1733  fileStatus = noFile;
// (shadows the function-level `buf`)
1734  struct stat buf;
1735  if (stat(bu_run_dir_.c_str(), &buf) != 0) {
1736  edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1737  fileStatus = runEnded;
1738  }
1739  }
1740 
1741  //can unlock because all files have been created locally
1743  unlockFULocal2();
1744 
// update the caller's lumisection; a server LS behind the currently open one is ignored
1745  if (fileStatus == runEnded)
1746  ls = std::max(currentLumiSection, serverLS);
1747  else if (fileStatus == newFile) {
1748  assert(serverLS >= ls);
1749  ls = serverLS;
1750  } else if (fileStatus == noFile) {
1751  if (serverLS >= ls)
1752  ls = serverLS;
1753  else {
1754  edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS
1755  << " which is smaller than currently open LS " << ls << ". Ignoring response";
1756  sleep(1);
1757  }
1758  }
1759 
1760  return fileStatus;
1761  }
1762 
// Makes sure the directory referred to by `openPath` exists: when it is not a directory, a debug
// message is logged and the directory (including missing parents) is created.
// NOTE(review): the function signature and the declaration of `openPath` are not visible in this
// listing (dropped by the extraction) - presumably openPath is the run "open" directory; confirm
// against the real source file.
1764  // create open dir if not already there
1765 
1767  if (!boost::filesystem::is_directory(openPath)) {
1768  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
// create_directories also creates any missing parent directories
1769  boost::filesystem::create_directories(openPath);
1770  }
1771  }
1772 
// Reads the "lastLS" entry from the JSON document stored in `file`.
// Returns -1 (after logging an error) when the document cannot be parsed; otherwise returns the
// "lastLS" member converted with asInt().
// NOTE(review): the signature line and the declaration of `reader` are not visible in this
// listing - presumably `reader` is a Json::Reader; confirm against the real source file.
1774  boost::filesystem::ifstream ij(file);
1775  Json::Value deserializeRoot;
1777 
// the stream is handed to the parser without an explicit is_open() check; an unreadable file is
// expected to surface as a parse failure here - TODO confirm
1778  if (!reader.parse(ij, deserializeRoot)) {
1779  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1780  return -1;
1781  }
1782 
// when "lastLS" is absent the default passed to get() is an empty-string value; what asInt()
// yields for that depends on the bundled JSON library - TODO confirm
1783  int ret = deserializeRoot.get("lastLS", "").asInt();
1784  return ret;
1785  }
1786 
// Probes for consecutive EoLS files named "<run_dir_>/<run_string_>_ls<NNNN>_EoLS.jsn", starting
// at lumisection startFromLS_, and returns the first lumisection number for which stat() fails,
// i.e. for which no such file is found.
// NOTE(review): the signature line and the declaration of `fullpath` are not visible in this
// listing (dropped by the extraction); confirm against the real source file.
1788  std::string fileprefix = run_dir_ + "/" + run_string_ + "_ls";
1790  struct stat buf;
1791  unsigned int lscount = startFromLS_;
1792  do {
1793  std::stringstream ss;
// lumisection number is zero-padded to a minimum width of 4 digits
1794  ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1795  fullpath = ss.str();
1796  lscount++;
// keep going while the file just probed exists
1797  } while (stat(fullpath.c_str(), &buf) == 0);
// lscount was already advanced past the missing file, hence the -1
1798  return lscount - 1;
1799  }
1800 
// Builds transferSystemJson_ from the "transferSystem" PSet of the top-level parameter set and
// validates it while doing so. Does nothing if transferSystemJson_ is already set.
// Resulting JSON layout (as written by the code below):
//   "destinations"  : array of all allowed destination names
//   "transferModes" : array of all transfer mode names
//   "<entry name>"  : object mapping each transfer mode to an array of destinations
// Throws cms::Exception when an entry lacks a mode, lists no destinations for a mode, names a
// destination that is not in "destinations", or when the PSet is absent and requireTSPSet_ is set.
// NOTE(review): the signature line is not visible in this listing; `pc` is used as the source of
// the top-level parameter set ID - confirm the exact parameter type against the real source file.
1801  //if transferSystem PSet is present in the menu, we require it to be complete and consistent for all specified streams
// already built on an earlier call: nothing to do
1803  if (transferSystemJson_)
1804  return;
1805 
1806  transferSystemJson_.reset(new Json::Value);
1807  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1808  if (topPset.existsAs<edm::ParameterSet>("transferSystem", true)) {
1809  const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));
1810 
// copy the global list of destinations into a JSON array
1811  Json::Value destinationsVal(Json::arrayValue);
1812  std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
1813  for (auto& dest : destinations)
1814  destinationsVal.append(dest);
1815  (*transferSystemJson_)["destinations"] = destinationsVal;
1816 
// copy the global list of transfer modes into a JSON array
1817  Json::Value modesVal(Json::arrayValue);
1818  std::vector<std::string> modes = tsPset.getParameter<std::vector<std::string>>("transferModes");
1819  for (auto& mode : modes)
1820  modesVal.append(mode);
1821  (*transferSystemJson_)["transferModes"] = modesVal;
1822 
// every remaining nested PSet is treated as a per-stream definition (presumably keyed by stream
// name - confirm against the menu format)
1823  for (auto psKeyItr = tsPset.psetTable().begin(); psKeyItr != tsPset.psetTable().end(); ++psKeyItr) {
1824  if (psKeyItr->first != "destinations" && psKeyItr->first != "transferModes") {
1825  const edm::ParameterSet& streamDef = tsPset.getParameterSet(psKeyItr->first);
1826  Json::Value streamVal;
1827  for (auto& mode : modes) {
1828  //validation
// each entry must define a string-vector parameter for every transfer mode
1829  if (!streamDef.existsAs<std::vector<std::string>>(mode, true))
1830  throw cms::Exception("EvFDaqDirector")
1831  << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode
1832  << ")";
1833  std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);
1834 
1835  Json::Value sDestsValue(Json::arrayValue);
1836 
// an empty destination list for a mode is rejected
1837  if (streamDestinations.empty())
1838  throw cms::Exception("EvFDaqDirector")
1839  << " Missing transter system destination(s) for -: " << psKeyItr->first << ", mode:" << mode;
1840 
// every listed destination must also appear in the global "destinations" list
1841  for (auto& sdest : streamDestinations) {
1842  bool sDestValid = false;
1843  sDestsValue.append(sdest);
1844  for (auto& dest : destinations) {
1845  if (dest == sdest)
1846  sDestValid = true;
1847  }
1848  if (!sDestValid)
1849  throw cms::Exception("EvFDaqDirector")
1850  << " Invalid transter system destination specified for -: " << psKeyItr->first << ", mode:" << mode
1851  << ", dest:" << sdest;
1852  }
1853  streamVal[mode] = sDestsValue;
1854  }
1855  (*transferSystemJson_)[psKeyItr->first] = streamVal;
1856  }
1857  }
1858  } else {
// no transferSystem PSet: only an error in strict mode; otherwise transferSystemJson_ stays an
// empty JSON value
1859  if (requireTSPSet_)
1860  throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
1861  }
1862  }
1863 
// Returns the destinations configured for `stream` under selectedTransferMode_ as a
// comma-separated string, looked up in transferSystemJson_.
// In permissive mode (requireTSPSet_ false) every lookup problem - stream not defined, no transfer
// mode selected, mode not defined for the stream - logs a warning and returns "Failsafe".
// In strict mode (requireTSPSet_ true) the same problems throw cms::Exception.
// NOTE(review): the signature line is not visible in this listing. transferSystemJson_ is
// dereferenced without a null check - assumes it has been populated before this is called; confirm.
1865  std::string streamRequestName;
1866  if (transferSystemJson_->isMember(stream.c_str()))
1867  streamRequestName = stream;
1868  else {
1869  std::stringstream msg;
1870  msg << "Transfer system mode definitions missing for -: " << stream;
1871  if (requireTSPSet_)
1872  throw cms::Exception("EvFDaqDirector") << msg.str();
1873  else {
1874  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
1875  return std::string("Failsafe");
1876  }
1877  }
1878  //permissive mode with no transfer mode selected (empty or "null"): fall back to "Failsafe"
// NOTE(review): the warning text below still says "Setting mode to empty string" although the
// value returned is "Failsafe"
1879  if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1880  edm::LogWarning("EvFDaqDirector")
1881  << "Selected mode string is not provided as DaqDirector parameter."
1882  << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
1883  return std::string("Failsafe");
1884  }
// strict mode: a missing transfer mode is an error
1885  if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_ == "null")) {
1886  throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
1887  }
1888  //check if the stream definition contains the selected transfer mode
1889  if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str())) {
1890  std::stringstream msg;
1891  msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
1892  if (requireTSPSet_)
1893  throw cms::Exception("EvFDaqDirector") << msg.str();
1894  else
1895  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
// only reached in permissive mode, since the strict branch above throws
1896  return std::string("Failsafe");
1897  }
1898  Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_, "");
1899 
1900  //flatten string json::Array into CSV std::string
1901  std::string ret;
1902  for (Json::Value::iterator it = destsVec.begin(); it != destsVec.end(); it++) {
1903  if (!ret.empty())
1904  ret += ",";
1905  ret += (*it).asString();
1906  }
1907  return ret;
1908  }
1909 
// Fills mergeTypeMap_ from the PSet whose name is given by mergeTypePset_: each parameter name in
// that PSet becomes a key and its string value becomes the mapped merge type.
// Does nothing when mergeTypePset_ is empty, when the map already has entries, or when the
// top-level parameter set has no PSet of that name.
// NOTE(review): the signature line is not visible in this listing; `pc` is used as the source of
// the top-level parameter set ID - confirm the exact parameter type against the real source file.
1911  if (mergeTypePset_.empty())
1912  return;
// map already populated (by an earlier call or by default insertions elsewhere): leave it alone
1913  if (!mergeTypeMap_.empty())
1914  return;
1915  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1916  if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_, true)) {
1917  const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
1918  for (std::string pname : tsPset.getParameterNames()) {
// every parameter is read as a std::string
1919  std::string streamType = tsPset.getParameter<std::string>(pname);
// insert (or locate) the key through an accessor, then set the mapped value
1920  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
1921  mergeTypeMap_.insert(ac, pname);
1922  ac->second = streamType;
1923  ac.release();
1924  }
1925  }
1926  }
1927 
// Returns the merge type name registered for `stream` in mergeTypeMap_.
// When the stream has no entry, logs an info message, takes the name at index `defaultType` of
// the static MergeTypeNames_ table, stores it in the map for `stream` and returns it.
// NOTE(review): the signature line is not visible in this listing; `stream` and `defaultType`
// are used as the lookup key and the fallback index respectively - confirm their declared types.
1929  tbb::concurrent_hash_map<std::string, std::string>::const_accessor search_ac;
1930  if (mergeTypeMap_.find(search_ac, stream))
1931  return search_ac->second;
1932 
1933  edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
// MergeTypeNames_ is indexed directly with defaultType (no range check here)
1934  std::string defaultName = MergeTypeNames_[defaultType];
// remember the default so later lookups for this stream find an entry
1935  tbb::concurrent_hash_map<std::string, std::string>::accessor ac;
1936  mergeTypeMap_.insert(ac, stream);
1937  ac->second = defaultName;
1938  ac.release();
1939  return defaultName;
1940  }
1941 
// Creates the file "<run_dir_>/processing" if it does not exist yet and closes it immediately;
// nothing is written to it.
// NOTE(review): the signature line is not visible in this listing. The result of open() is not
// checked, so on failure close() is called with -1.
1943  std::string proc_flag = run_dir_ + "/processing";
// O_CREAT without O_EXCL: an existing file is simply opened; requested mode is rwx for the owner
// and rw for group and others (subject to the process umask)
1944  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
1945  close(proc_flag_fd);
1946  }
1947 
1948  struct flock EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid) {
1949 #ifdef __APPLE__
1950  return {start, len, pid, type, whence};
1951 #else
1952  return {type, whence, start, len, pid};
1953 #endif
1954  }
1955 
1956 } // namespace evf
#define LogDebug(id)
uint16_t getFRDFileHeaderVersion(const std::array< uint8_t, 4 > &id, const std::array< uint8_t, 4 > &version)
Definition: FRDFileHeader.h:41
unsigned int nThreads_
type
Definition: HCALResponse.h:21
unsigned int maxNumberOfThreads() const
Definition: SystemBounds.h:38
T getParameter(std::string const &) const
Definition: fillJson.h:27
std::string getStreamDestinations(std::string const &stream) const
Value get(UInt index, const Value &defaultValue) const
std::vector< std::string > & getData()
Definition: DataPoint.h:56
struct flock fu_rw_flk
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
std::string run_string_
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
std::string bolsFileName(const unsigned int run, const unsigned int ls)
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
void watchPreallocate(Preallocate::slot_type const &iSlot)
const_iterator begin() const
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:160
boost::asio::io_service io_service_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
std::string fulockfile_
def create(alignables, pedeDump, additionalData, outputFile, config)
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
std::shared_ptr< Json::Value > transferSystemJson_
jsoncollector::DataPointDefinition * dpd_
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
uint16_t eventCount_
Definition: FRDFileHeader.h:36
ret
prodAgent to be discontinued
std::string getInitFilePath(std::string const &stream) const
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
ParameterSet const & getParameterSet(ParameterSetID const &id)
pthread_mutex_t init_lock_
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:55
#define nullptr
uint16_t headerSize_
Definition: FRDFileHeader.h:35
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::array< uint8_t, 4 > id_
Definition: FRDFileHeader.h:33
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
std::string getOpenInitFilePath(std::string const &stream) const
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
Value & append(const Value &value)
Append value to array at the end.
void createProcessingNotificationMaybe() const
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
reader
Definition: DQM.py:105
int grabNextJsonFileAndUnlock(boost::filesystem::path const &jsonSourcePath)
Represents a JSON value.
Definition: value.h:99
std::string getEoLSFilePathOnBU(const unsigned int ls) const
std::string getEoRFilePath() const
Definition: Electron.h:6
unsigned long previousFileSize_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
ParameterSetID const & parameterSetID() const
struct flock fu_rw_fulk
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:35
std::string hltSourceDirectory_
unsigned int startFromLS_
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
void setComment(std::string const &value)
std::mutex * fileDeleteLockPtr_
std::string mergeTypePset_
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
def chmod(path, mode)
Definition: eostools.py:294
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string getStreamMergeType(std::string const &stream, MergeType defaultType)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
std::string stopFilePath_
std::string bu_base_dir_
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::array< uint8_t, 4 > version_
Definition: FRDFileHeader.h:34
void removeFile(unsigned int ls, unsigned int index)
unsigned int stop_ls_override_
std::string selectedTransferMode_
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS=true)
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
std::string eorFileName(const unsigned int run)
int readLastLSEntry(std::string const &file)
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string stopFilePathPid_
unsigned int eolsNFilesIndex_
std::string & baseRunDir()
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
std::string run_nstring_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
void openFULockfileStream(bool create)
int getNFilesFromEoLS(std::string BUEoLSFile)
def ls(path, rec=False)
Definition: eostools.py:349
void preBeginRun(edm::GlobalContext const &globalContext)
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Int asInt() const
unsigned int getLumisectionToStart() const
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned long long uint64_t
Definition: Time.h:13
void checkMergeTypePSet(edm::ProcessContext const &pc)
ParameterSet const & getParameterSet(std::string const &) const
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
void deserialize(Json::Value &root) override
Definition: DataPoint.cc:52
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
LuminosityBlockNumber_t luminosityBlock() const
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::string & getDefinition()
Definition: DataPoint.h:57
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:212
tuple msg
Definition: mps_check.py:285
evf::FastMonitoringService * fms_
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
std::string bu_run_dir_
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
std::list< std::pair< int, std::unique_ptr< InputFile > > > * filesToDeletePtr_
def mkdir(path)
Definition: eostools.py:251
std::string findHighestRunDir()
Definition: DirManager.cc:22
std::string getBoLSFilePathOnFU(const unsigned int ls) const
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
void postEndRun(edm::GlobalContext const &globalContext)
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS)
Unserialize a JSON document into a Value.
Definition: reader.h:16
static const std::vector< std::string > MergeTypeNames_
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
const_iterator end() const
void checkTransferSystemPSet(edm::ProcessContext const &pc)
std::string fileBrokerPort_
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
uint64_t fileSize_
Definition: FRDFileHeader.h:38
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, bool &rawHeader, int maxLS)
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, int maxLS)
void preallocate(edm::service::SystemBounds const &bounds)
uint32_t lumiSection_
Definition: FRDFileHeader.h:37
Iterator for object and array value.
Definition: value.h:908
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
std::string getRunOpenDirPath() const
JetCorrectorParameters::Definitions def
Definition: classes.h:6
std::string fileBrokerHost_
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned int fuLockPollInterval_
std::string getFormatedErrorMessages() const
Returns a user friendly string that list errors in the parsed document.
std::string getFFFParamsFilePathOnBU() const
std::string eolsFileName(const unsigned int run, const unsigned int ls)
#define constexpr
unsigned int nStreams_
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string getEoRFilePathOnFU() const
std::string bu_run_open_dir_
array value (ordered list)
Definition: value.h:30