CMS 3D CMS Logo

EvFDaqDirector.cc
Go to the documentation of this file.
10 
16 
17 #include <iostream>
18 //#include <istream>
19 #include <sstream>
20 #include <sys/time.h>
21 #include <unistd.h>
22 #include <cstdio>
23 #include <boost/lexical_cast.hpp>
24 #include <boost/filesystem/fstream.hpp>
25 #include <boost/algorithm/string.hpp>
26 
27 //using boost::asio::ip::tcp;
28 
29 //#define DEBUG
30 
31 using namespace jsoncollector;
32 
33 
34 namespace evf {
35 
36  //for enum MergeType
37  const std::vector<std::string> EvFDaqDirector::MergeTypeNames_ = {"","DAT","PB","JSNDATA"};
38 
39  EvFDaqDirector::EvFDaqDirector(const edm::ParameterSet &pset,
40  edm::ActivityRegistry& reg) :
41  base_dir_(pset.getUntrackedParameter<std::string> ("baseDir", ".")),
42  bu_base_dir_(pset.getUntrackedParameter<std::string> ("buBaseDir", ".")),
43  directorBu_(pset.getUntrackedParameter<bool> ("directorIsBu", false)),
44  run_(pset.getUntrackedParameter<unsigned int> ("runNumber",0)),
45  useFileBroker_(pset.getUntrackedParameter<bool> ("useFileBroker", false)),
46  fileBrokerHost_(pset.getUntrackedParameter<std::string>("fileBrokerHost","")),
47  fileBrokerPort_(pset.getUntrackedParameter<std::string>("fileBrokerPort","8080")),
48  fileBrokerKeepAlive_(pset.getUntrackedParameter<bool>("fileBrokerKeepAlive", true)),
49  fileBrokerUseLocalLock_(pset.getUntrackedParameter<bool>("fileBrokerUseLocalLock",true)),
50  outputAdler32Recheck_(pset.getUntrackedParameter<bool>("outputAdler32Recheck",false)),
51  requireTSPSet_(pset.getUntrackedParameter<bool>("requireTransfersPSet",false)),
52  selectedTransferMode_(pset.getUntrackedParameter<std::string>("selectedTransferMode","")),
53  hltSourceDirectory_(pset.getUntrackedParameter<std::string>("hltSourceDirectory","")),
54  fuLockPollInterval_(pset.getUntrackedParameter<unsigned int>("fuLockPollInterval",2000)),
55  mergeTypePset_(pset.getUntrackedParameter<std::string>("mergeTypePset","")),
56  hostname_(""),
57  bu_readlock_fd_(-1),
58  bu_writelock_fd_(-1),
59  fu_readwritelock_fd_(-1),
60  fulocal_rwlock_fd_(-1),
61  fulocal_rwlock_fd2_(-1),
62  bu_w_lock_stream(nullptr),
63  bu_r_lock_stream(nullptr),
64  fu_rw_lock_stream(nullptr),
65  dirManager_(base_dir_),
66  previousFileSize_(0),
67  bu_w_flk( make_flock( F_WRLCK, SEEK_SET, 0, 0, 0 )),
68  bu_r_flk( make_flock( F_RDLCK, SEEK_SET, 0, 0, 0 )),
69  bu_w_fulk( make_flock( F_UNLCK, SEEK_SET, 0, 0, 0 )),
70  bu_r_fulk( make_flock( F_UNLCK, SEEK_SET, 0, 0, 0 )),
71  fu_rw_flk( make_flock ( F_WRLCK, SEEK_SET, 0, 0, getpid() )),
72  fu_rw_fulk( make_flock( F_UNLCK, SEEK_SET, 0, 0, getpid() ))
73  {
79 
80  //save hostname for later
81  char hostname[33];
82  gethostname(hostname,32);
83  hostname_ = hostname;
84 
85  char * fuLockPollIntervalPtr = getenv("FFF_LOCKPOLLINTERVAL");
86  if (fuLockPollIntervalPtr) {
87  try {
88  fuLockPollInterval_=boost::lexical_cast<unsigned int>(std::string(fuLockPollIntervalPtr));
89  edm::LogInfo("EvFDaqDirector") << "Setting fu lock poll interval by environment string: " << fuLockPollInterval_ << " us";
90  }
91  catch( boost::bad_lexical_cast const& ) {
92  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fuLockPollIntervalPtr);
93  }
94  }
95 
96  //override file service parameter if specified by environment
97  char * fileBrokerParamPtr = getenv("FFF_USEFILEBROKER");
98  if (fileBrokerParamPtr) {
99  try {
100  useFileBroker_=(boost::lexical_cast<unsigned int>(std::string(fileBrokerParamPtr)))>0;
101  edm::LogInfo("EvFDaqDirector") << "Setting useFileBroker parameter by environment string: " << useFileBroker_;
102  }
103  catch( boost::bad_lexical_cast const& ) {
104  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerParamPtr);
105  }
106  }
107  if (useFileBroker_) {
108  if (fileBrokerHost_.empty()) {
109  //find BU data address from hltd configuration
110  struct stat buf;
111  if (stat("/etc/appliance/bus.config",&buf)==0) {
112  std::ifstream busconfig("/etc/appliance/bus.config",std::ifstream::in);
113  std::getline(busconfig,fileBrokerHost_);
114  }
115  }
116  if (!fileBrokerHost_.empty()) {
117  resolver_ = std::make_unique<boost::asio::ip::tcp::resolver>(io_service_);
118  query_ = std::make_unique<boost::asio::ip::tcp::resolver::query>(fileBrokerHost_, fileBrokerPort_); //default port
119  endpoint_iterator_ = std::make_unique<boost::asio::ip::tcp::resolver::iterator>(resolver_->resolve(*query_));
120  socket_=std::make_unique<boost::asio::ip::tcp::socket>(io_service_);
121  }
122  else {
123  throw cms::Exception("EvFDaqDirector") << "No file service or BU data address information";
124  }
125  }
126 
127  char * startFromLSPtr = getenv("FFF_STARTFROMLS");
128  if (startFromLSPtr) {
129  try {
130  startFromLS_=boost::lexical_cast<unsigned int>(std::string(startFromLSPtr));
131  edm::LogInfo("EvFDaqDirector") << "Setting start from LS by environment string: " << startFromLS_;
132  }
133  catch( boost::bad_lexical_cast const& ) {
134  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(startFromLSPtr);
135  }
136  }
137 
138  //override file service parameter if specified by environment
139  char * fileBrokerUseLockParamPtr = getenv("FFF_FILEBROKERUSELOCALLOCK");
140  if (fileBrokerUseLockParamPtr) {
141  try {
142  fileBrokerUseLocalLock_=(boost::lexical_cast<unsigned int>(std::string(fileBrokerUseLockParamPtr)))>0;
143  edm::LogInfo("EvFDaqDirector") << "Setting fileBrokerUseLocalLock parameter by environment string: " << fileBrokerUseLocalLock_;
144  }
145  catch( boost::bad_lexical_cast const& ) {
146  edm::LogWarning("EvFDaqDirector") << "Bad lexical cast in parsing: " << std::string(fileBrokerUseLockParamPtr);
147  }
148  }
149 
150  }
151 
153  {
154  std::stringstream ss;
155  ss << "run" << std::setfill('0') << std::setw(6) << run_;
156  run_string_ = ss.str();
157  ss = std::stringstream();
158  ss << run_;
159  run_nstring_ = ss.str();
161  ss = std::stringstream();
162  ss << getpid();
163  pid_ = ss.str();
164 
165  // check if base dir exists or create it accordingly
166  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
167  if (retval != 0 && errno != EEXIST) {
168  throw cms::Exception("DaqDirector") << " Error checking for base dir -: "
169  << base_dir_ << " mkdir error:" << strerror(errno);
170  }
171 
172  //create run dir in base dir
173  umask(0);
174  retval = mkdir(run_dir_.c_str(),
175  S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
176  if (retval != 0 && errno != EEXIST) {
177  throw cms::Exception("DaqDirector") << " Error creating run dir -: "
178  << run_dir_ << " mkdir error:" << strerror(errno);
179  }
180 
181  //create fu-local.lock in run open dir
182  if (!directorBu_) {
183 
185  std::string fulocal_lock_ = getRunOpenDirPath() +"/fu-local.lock";
186  fulocal_rwlock_fd_ = open(fulocal_lock_.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);//O_RDWR?
187  if (fulocal_rwlock_fd_==-1)
188  throw cms::Exception("DaqDirector") << " Error creating/opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
189  chmod(fulocal_lock_.c_str(),0777);
190  fsync(fulocal_rwlock_fd_);
191  //open second fd for another input source thread
192  fulocal_rwlock_fd2_ = open(fulocal_lock_.c_str(), O_RDWR, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);//O_RDWR?
193  if (fulocal_rwlock_fd2_==-1)
194  throw cms::Exception("DaqDirector") << " Error opening a local lock file -: " << fulocal_lock_.c_str() << " : " << strerror(errno);
195  }
196 
197  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
198  //for BU, it is created at this point
199  if (directorBu_)
200  {
202  std::string bulockfile = bu_run_dir_ + "/bu.lock";
203  fulockfile_ = bu_run_dir_ + "/fu.lock";
204 
205  //make or find bu run dir
206  retval = mkdir(bu_run_dir_.c_str(),
207  S_IRWXU | S_IRWXG | S_IRWXO);
208  if (retval != 0 && errno != EEXIST) {
209  throw cms::Exception("DaqDirector")
210  << " Error creating bu run dir -: " << bu_run_dir_
211  << " mkdir error:" << strerror(errno) << "\n";
212  }
213  bu_run_open_dir_ = bu_run_dir_ + "/open";
214  retval = mkdir(bu_run_open_dir_.c_str(),
215  S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
216  if (retval != 0 && errno != EEXIST) {
217  throw cms::Exception("DaqDirector") << " Error creating bu run open dir -: "
218  << bu_run_open_dir_ << " mkdir error:" << strerror(errno)
219  << "\n";
220  }
221 
222  // the BU director does not need to know about the fu lock
223  bu_writelock_fd_ = open(bulockfile.c_str(),
224  O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
225  if (bu_writelock_fd_ == -1)
226  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock -: "
227  << strerror(errno);
228  else
229  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock -: "
230  << bu_writelock_fd_;
231  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
232  if (bu_w_lock_stream == nullptr)
233  edm::LogWarning("EvFDaqDirector")<< "Error creating write lock stream -: " << strerror(errno);
234 
235  // BU INITIALIZES LOCK FILE
236  // FU LOCK FILE OPEN
237  openFULockfileStream(true);
239  fflush(fu_rw_lock_stream);
240  close(fu_readwritelock_fd_);
241 
242  if (!hltSourceDirectory_.empty())
243  {
244  struct stat buf;
245  if (stat(hltSourceDirectory_.c_str(),&buf)==0) {
246  std::string hltdir=bu_run_dir_+"/hlt";
247  std::string tmphltdir=bu_run_open_dir_+"/hlt";
248  retval = mkdir(tmphltdir.c_str(),S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
249  if (retval != 0 && errno != EEXIST)
250  throw cms::Exception("DaqDirector")
251  << " Error creating bu run dir -: " << hltdir
252  << " mkdir error:" << strerror(errno) << "\n";
253 
254  boost::filesystem::copy_file(hltSourceDirectory_+"/HltConfig.py",tmphltdir+"/HltConfig.py");
255 
256  boost::filesystem::copy_file(hltSourceDirectory_+"/fffParameters.jsn",tmphltdir+"/fffParameters.jsn");
257 
258  boost::filesystem::rename(tmphltdir,hltdir);
259  }
260  else
261  throw cms::Exception("DaqDirector") << " Error looking for HLT configuration -: " << hltSourceDirectory_;
262  }
263  //else{}//no configuration specified
264  }
265  else
266  {
267  // for FU, check if bu base dir exists
268 
269  retval = mkdir(bu_base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
270  if (retval != 0 && errno != EEXIST) {
271  throw cms::Exception("DaqDirector") << " Error checking for bu base dir -: "
272  << bu_base_dir_ << " mkdir error:" << strerror(errno) << "\n";
273  }
274 
276  fulockfile_ = bu_run_dir_ + "/fu.lock";
277  openFULockfileStream(false);
278  }
279 
280  pthread_mutex_init(&init_lock_,nullptr);
281 
282  stopFilePath_ = run_dir_+"/CMSSW_STOP";
283  std::stringstream sstp;
284  sstp << stopFilePath_ << "_pid" << pid_;
285  stopFilePathPid_ = sstp.str();
286 
287 
288  if (!directorBu_) {
289  std::string defPath = bu_run_dir_ + "/jsd/rawData.jsd";
290  struct stat statbuf;
291  if (!stat(defPath.c_str(), &statbuf))
292  edm::LogInfo("EvFDaqDirector") << "found JSD file in ramdisk -: " << defPath;
293  else {
294  //look in source directory if not present in ramdisk
295  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
296  defPath = std::string(getenv("CMSSW_BASE")) + "/" + defPathSuffix;
297  if (stat(defPath.c_str(), &statbuf)) {
298  defPath = std::string(getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
299  if (stat(defPath.c_str(), &statbuf)) {
300  defPath = defPathSuffix;
301  }
302  }
303  }
304  dpd_ = new DataPointDefinition();
305  std::string defLabel = "data";
306  DataPointDefinition::getDataPointDefinitionFor(defPath, dpd_,&defLabel);
307  }
308 
309  }
310 
312  {
313 
314  //close server connection
315  if (socket_.get() && socket_->is_open()) {
316  boost::system::error_code ec;
317  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
318  socket_->close(ec);
319  }
320 
321  if (fulocal_rwlock_fd_!=-1) {
322  unlockFULocal();
323  close(fulocal_rwlock_fd_);
324  }
325 
326  if (fulocal_rwlock_fd2_!=-1) {
327  unlockFULocal2();
328  close(fulocal_rwlock_fd2_);
329  }
330 
331  }
332 
333 
335 
336  initRun();
337 
338  nThreads_=bounds.maxNumberOfStreams();
339  nStreams_=bounds.maxNumberOfThreads();
340  }
341 
343  {
345  desc.setComment("Service used for file locking arbitration and for propagating information between other EvF components");
346  desc.addUntracked<std::string> ("baseDir", ".")->setComment("Local base directory for run output");
347  desc.addUntracked<std::string> ("buBaseDir", ".")->setComment("BU base ramdisk directory ");
348  desc.addUntracked<unsigned int> ("runNumber",0)->setComment("Run Number in ramdisk to open");
349  desc.addUntracked<bool> ("useFileBroker", false)->setComment("Use BU file service to grab input data instead of NFS file locking");
350  desc.addUntracked<std::string> ("fileBrokerHost", "")->setComment("BU file service host");
351  desc.addUntracked<std::string> ("fileBrokerPort", "8080")->setComment("BU file service port");
352  desc.addUntracked<bool> ("fileBrokerKeepAlive", true)->setComment("Use keep alive to avoid using large number of sockets");
353  desc.addUntracked<bool> ("fileBrokerUseLocalLock", true)->setComment("Use local lock file to synchronize appearance of index and EoLS file markers for hltd");
354  desc.addUntracked<bool>("outputAdler32Recheck",false)->setComment("Check Adler32 of per-process output files while micro-merging");
355  desc.addUntracked<bool>("requireTransfersPSet",false)->setComment("Require complete transferSystem PSet in the process configuration");
356  desc.addUntracked<std::string>("selectedTransferMode","")->setComment("Selected transfer mode (choice in Lvl0 propagated as Python parameter");
357  desc.addUntracked<unsigned int>("fuLockPollInterval",2000)->setComment("Lock polling interval in microseconds for the input directory file lock");
358  desc.addUntracked<std::string>("mergingPset","")->setComment("Name of merging PSet to look for merging type definitions for streams");
359  desc.setAllowAnything();
360  descriptions.add("EvFDaqDirector", desc);
361  }
362 
364  edm::ProcessContext const& pc) {
366  checkMergeTypePSet(pc);
367  }
368 
369  void EvFDaqDirector::preBeginRun(edm::GlobalContext const& globalContext) {
370 
371  //assert(run_ == id.run());
372 
373  // check if the requested run is the latest one - issue a warning if it isn't
375  edm::LogWarning("EvFDaqDirector") << "WARNING - checking run dir -: "
376  << run_dir_ << ". This is not the highest run "
378  }
379  }
380 
381  void EvFDaqDirector::postEndRun(edm::GlobalContext const& globalContext) {
382  close(bu_readlock_fd_);
383  close(bu_writelock_fd_);
384  if (directorBu_) {
385  std::string filename = bu_run_dir_ + "/bu.lock";
386  removeFile(filename);
387  }
388  }
389 
391  {
392  //delete all files belonging to just closed lumi
393  unsigned int ls = globalContext.luminosityBlockID().luminosityBlock();
395  edm::LogWarning("EvFDaqDirector") << " Handles to check for files to delete were not set by the input source...";
396  return;
397  }
398 
399  std::unique_lock<std::mutex> lkw(*fileDeleteLockPtr_);
400  auto it = filesToDeletePtr_->begin();
401  while (it!=filesToDeletePtr_->end()) {
402  if (it->second->lumi_ == ls) {
403  const boost::filesystem::path filePath(it->second->fileName_);
404  LogDebug("EvFDaqDirector") << "Deleting input file -:" << it->second->fileName_;
405  try {
406  //rarely this fails but file gets deleted
407  boost::filesystem::remove(filePath);
408  }
409  catch (boost::filesystem::filesystem_error const& ex)
410  {
411  edm::LogError("EvFDaqDirector") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what() << ". Trying again.";
412  usleep(10000);
413  try {
414  boost::filesystem::remove(filePath);
415  }
416  catch (const boost::filesystem::filesystem_error&) {/*file gets deleted first time but exception is still thrown*/}
417  }
418  catch (std::exception const& ex)
419  {
420  edm::LogError("EvFDaqDirector") << " - deleteFile std::exception CAUGHT -: " << ex.what() << ". Trying again.";
421  usleep(10000);
422  try {
423  boost::filesystem::remove(filePath);
424  } catch (std::exception const&) {/*file gets deleted first time but exception is still thrown*/}
425  }
426 
427  delete it->second;
428  it = filesToDeletePtr_->erase(it);
429  }
430  else it++;
431  }
432  }
433 
434  std::string EvFDaqDirector::getInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
435  return bu_run_dir_ + "/" + fffnaming::inputJsonFileName(run_,ls,index);
436  }
437 
438  std::string EvFDaqDirector::getRawFilePath(const unsigned int ls, const unsigned int index) const {
439  return bu_run_dir_ + "/" + fffnaming::inputRawFileName(run_,ls,index);
440  }
441 
442  std::string EvFDaqDirector::getOpenRawFilePath(const unsigned int ls, const unsigned int index) const {
443  return bu_run_dir_ + "/open/" + fffnaming::inputRawFileName(run_,ls,index);
444  }
445 
446  std::string EvFDaqDirector::getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const {
447  return bu_run_dir_ + "/open/" + fffnaming::inputJsonFileName(run_,ls,index);
448  }
449 
450  std::string EvFDaqDirector::getDatFilePath(const unsigned int ls, std::string const& stream) const {
451  return run_dir_ + "/" + fffnaming::streamerDataFileNameWithPid(run_,ls,stream);
452  }
453 
454  std::string EvFDaqDirector::getOpenDatFilePath(const unsigned int ls, std::string const& stream) const {
455  return run_dir_ + "/open/" + fffnaming::streamerDataFileNameWithPid(run_,ls,stream);
456  }
457 
458  std::string EvFDaqDirector::getOpenOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
459  return run_dir_ + "/open/" + fffnaming::streamerJsonFileNameWithPid(run_,ls,stream);
460  }
461 
462  std::string EvFDaqDirector::getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
463  return run_dir_ + "/" + fffnaming::streamerJsonFileNameWithPid(run_,ls,stream);
464  }
465 
466  std::string EvFDaqDirector::getMergedDatFilePath(const unsigned int ls, std::string const& stream) const {
468  }
469 
470  std::string EvFDaqDirector::getMergedDatChecksumFilePath(const unsigned int ls, std::string const& stream) const {
472  }
473 
475  return run_dir_ + "/open/" + fffnaming::initFileNameWithPid(run_,0,stream);
476  }
477 
479  return run_dir_ + "/" + fffnaming::initFileNameWithPid(run_,0,stream);
480  }
481 
483  return run_dir_ + "/open/" + fffnaming::protocolBufferHistogramFileNameWithPid(run_,ls,stream);
484  }
485 
488  }
489 
492  }
493 
494  std::string EvFDaqDirector::getOpenRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
495  return run_dir_ + "/open/" + fffnaming::rootHistogramFileNameWithPid(run_,ls,stream);
496  }
497 
498  std::string EvFDaqDirector::getRootHistogramFilePath(const unsigned int ls, std::string const& stream) const {
499  return run_dir_ + "/" + fffnaming::rootHistogramFileNameWithPid(run_,ls,stream);
500  }
501 
504  }
505 
507  return bu_run_dir_ + "/" + fffnaming::eolsFileName(run_,ls);
508  }
509 
511  return run_dir_ + "/" + fffnaming::eolsFileName(run_,ls);
512  }
513 
515  return run_dir_ + "/" + fffnaming::bolsFileName(run_,ls);
516  }
517 
519  return bu_run_dir_ + "/" + fffnaming::eorFileName(run_);
520  }
521 
523  return run_dir_ + "/" + fffnaming::eorFileName(run_);
524  }
525 
527  return bu_run_dir_ + "/hlt/fffParameters.jsn";
528  }
529 
531  int retval = remove(filename.c_str());
532  if (retval != 0)
533  edm::LogError("EvFDaqDirector") << "Could not remove used file -: " << filename << ". error = "
534  << strerror(errno);
535  }
536 
537  void EvFDaqDirector::removeFile(unsigned int ls, unsigned int index) {
538  removeFile(getRawFilePath(ls,index));
539  }
540 
541  EvFDaqDirector::FileStatus EvFDaqDirector::updateFuLock(unsigned int& ls, std::string& nextFile, uint32_t& fsize, uint64_t& lockWaitTime) {
542  EvFDaqDirector::FileStatus fileStatus = noFile;
543 
544  int retval = -1;
545  int lock_attempts = 0;
546  long total_lock_attempts = 0;
547 
548  struct stat buf;
549  int stopFileLS = -1;
550  int stopFileCheck = stat(stopFilePath_.c_str(),&buf);
551  int stopFilePidCheck = stat(stopFilePathPid_.c_str(),&buf);
552  if (stopFileCheck==0 || stopFilePidCheck==0) {
553  if (stopFileCheck==0)
554  stopFileLS = readLastLSEntry(stopFilePath_);
555  else
556  stopFileLS = 1;//stop without drain if only pid is stopped
557  if (!stop_ls_override_) {
558  //if lumisection is higher than in stop file, should quit at next from current
559  if (stopFileLS>=0 && (int)ls>=stopFileLS) stopFileLS = stop_ls_override_ = ls;
560  }
561  else stopFileLS = stop_ls_override_;
562  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: " << stopFileLS;
563  //return runEnded;
564  }
565  else //if file was removed before reaching stop condition, reset this
566  stop_ls_override_ = 0;
567 
568  timeval ts_lockbegin;
569  gettimeofday(&ts_lockbegin,nullptr);
570 
571  while (retval==-1) {
572  retval = fcntl(fu_readwritelock_fd_, F_SETLK, &fu_rw_flk);
573  if (retval==-1) usleep(fuLockPollInterval_);
574  else continue;
575 
576  lock_attempts+=fuLockPollInterval_;
577  total_lock_attempts+=fuLockPollInterval_;
578  if (lock_attempts>5000000 || errno==116) {
579  if (errno==116)
580  edm::LogWarning("EvFDaqDirector") << "Stale lock file handle. Checking if run directory and fu.lock file are present" << std::endl;
581  else
582  edm::LogWarning("EvFDaqDirector") << "Unable to obtain a lock for 5 seconds. Checking if run directory and fu.lock file are present -: errno "
583  << errno <<":"<< strerror(errno) << std::endl;
584 
585 
586  if (stat(getEoLSFilePathOnFU(ls).c_str(),&buf)==0) {
587  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< ls ;
588  ls++;
589  return noFile;
590  }
591 
592  if (stat(bu_run_dir_.c_str(), &buf)!=0) return runEnded;
593  if (stat(fulockfile_.c_str(), &buf)!=0) return runEnded;
594 
595  lock_attempts=0;
596  }
597  if (total_lock_attempts>5*60000000) {
598  edm::LogError("EvFDaqDirector") << "Unable to obtain a lock for 5 minutes. Stopping polling activity.";
599  return runAbort;
600  }
601  }
602 
603  timeval ts_lockend;
604  gettimeofday(&ts_lockend,nullptr);
605  long deltat = (ts_lockend.tv_usec-ts_lockbegin.tv_usec) + (ts_lockend.tv_sec-ts_lockbegin.tv_sec)*1000000;
606  if (deltat>0.) lockWaitTime=deltat;
607 
608  if(retval!=0) return fileStatus;
609 
610 #ifdef DEBUG
611  timeval ts_lockend;
612  gettimeofday(&ts_lockend,0);
613 #endif
614 
615  //open another lock file FD after the lock using main fd has been acquired
616  int fu_readwritelock_fd2 = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
617  if (fu_readwritelock_fd2 == -1)
618  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
619  << " create. error:" << strerror(errno);
620 
621  FILE * fu_rw_lock_stream2 = fdopen(fu_readwritelock_fd2, "r+");
622 
623  // if the stream is readable
624  if (fu_rw_lock_stream2 != nullptr) {
625  unsigned int readLs, readIndex;
626  int check = 0;
627  // rewind the stream
628  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
629  // if rewinded ok
630  if (check == 0) {
631  // read its' values
632  fscanf(fu_rw_lock_stream2, "%u %u", &readLs, &readIndex);
633  edm::LogInfo("EvFDaqDirector") << "Read fu.lock file file -: " << readLs << ":" << readIndex;
634 
635  unsigned int currentLs = readLs;
636  bool bumpedOk = false;
637  //if next lumisection in a lock file is not +1 wrt. source, cycle through the next empty one, unless initial lumi not yet set
638  //no lock file write in this case
639  if (ls && ls+1 < currentLs) ls++;
640  else {
641  // try to bump (look for new index or EoLS file)
642  bumpedOk = bumpFile(readLs, readIndex, nextFile, fsize, stopFileLS);
643  //avoid 2 lumisections jump
644  if (ls && readLs>currentLs && currentLs > ls) {
645  ls++;
646  readLs=currentLs=ls;
647  readIndex=0;
648  bumpedOk=false;
649  //no write to lock file
650  }
651  else {
652  if (ls==0 && readLs>currentLs) {
653  //make sure to intialize always with LS found in the lock file, with possibility of grabbing index file immediately
654  //in this case there is no new file in the same LS
655  //this covers case where run has empty first lumisections and CMSSW are late to the lock file. always one process will start with LS 1,... and create empty files for them
656  readLs=currentLs;
657  readIndex=0;
658  bumpedOk=false;
659  //no write to lock file
660  }
661  //update return LS value
662  ls = readLs;
663  }
664  }
665  if (bumpedOk) {
666  // there is a new index file to grab, lock file needs to be updated
667  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
668  if (check == 0) {
669  ftruncate(fu_readwritelock_fd2, 0);
670  // write next index in the file, which is the file the next process should take
671  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex + 1);
672  fflush(fu_rw_lock_stream2);
673  fsync(fu_readwritelock_fd2);
674  fileStatus = newFile;
675  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex + 1;
676  }
677  else {
678  throw cms::Exception("EvFDaqDirector") << "seek on fu read/write lock for updating failed with error " << strerror(errno);
679  }
680  }
681  else if (currentLs < readLs) {
682  //there is no new file in next LS (yet), but lock file can be updated to the next LS
683  check = fseek(fu_rw_lock_stream2, 0, SEEK_SET);
684  if (check == 0) {
685  ftruncate(fu_readwritelock_fd2, 0);
686  // in this case LS was bumped, but no new file. Thus readIndex is 0 (set by bumpFile)
687  fprintf(fu_rw_lock_stream2, "%u %u", readLs, readIndex);
688  fflush(fu_rw_lock_stream2);
689  fsync(fu_readwritelock_fd2);
690  LogDebug("EvFDaqDirector") << "Written to file -: " << readLs << ":" << readIndex;
691  }
692  else {
693  throw cms::Exception("EvFDaqDirector") << "seek on fu read/write lock for updating failed with error " << strerror(errno);
694  }
695  }
696  } else {
697  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error " << strerror(errno);
698  }
699  } else {
700  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
701  }
702  fclose(fu_rw_lock_stream2);// = fdopen(fu_readwritelock_fd2, "r+");
703 
704 #ifdef DEBUG
705  timeval ts_preunlock;
706  gettimeofday(&ts_preunlock,0);
707  int locked_period_int = ts_preunlock.tv_sec - ts_lockend.tv_sec;
708  double locked_period=locked_period_int+double(ts_preunlock.tv_usec - ts_lockend.tv_usec)/1000000;
709 #endif
710 
711  //if new json is present, lock file which FedRawDataInputSource will later unlock
712  if (fileStatus==newFile) lockFULocal();
713 
714  //release lock at this point
715  int retvalu=-1;
716  retvalu=fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
717  if (retvalu==-1) edm::LogError("EvFDaqDirector") << "Error unlocking the fu.lock " << strerror(errno);
718 
719 #ifdef DEBUG
720  edm::LogDebug("EvFDaqDirector") << "Waited during lock -: " << locked_period << " seconds";
721 #endif
722 
723  if ( fileStatus == noFile ) {
724  struct stat buf;
725  //edm::LogInfo("EvFDaqDirector") << " looking for EoR file: " << getEoRFilePath().c_str();
726  if ( stat(getEoRFilePath().c_str(), &buf) == 0 || stat(bu_run_dir_.c_str(), &buf)!=0)
727  fileStatus = runEnded;
728  if (stopFileLS>=0 && (int)ls > stopFileLS) {
729  edm::LogInfo("EvFDaqDirector") << "Reached maximum lumisection set by hltd";
730  fileStatus = runEnded;
731  }
732  }
733  return fileStatus;
734  }
735 
737 
738  boost::filesystem::ifstream ij(BUEoLSFile);
739  Json::Value deserializeRoot;
741 
742  if (!reader.parse(ij, deserializeRoot)) {
743  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << BUEoLSFile;
744  return -1;
745  }
746 
748  DataPoint dp;
749  dp.deserialize(deserializeRoot);
750 
751  //read definition
752  if (readEolsDefinition_) {
753  //std::string def = boost::algorithm::trim(dp.getDefinition());
755  if (def.empty()) readEolsDefinition_=false;
756  while (!def.empty()) {
757  std::string fullpath;
758  if (def.find('/')==0)
759  fullpath = def;
760  else
761  fullpath = bu_run_dir_+'/'+def;
762  struct stat buf;
763  if (stat(fullpath.c_str(), &buf) == 0) {
764  DataPointDefinition eolsDpd;
765  std::string defLabel = "legend";
766  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd,&defLabel);
767  if (eolsDpd.getNames().empty()) {
768  //try with "data" label if "legend" format is not used
769  eolsDpd = DataPointDefinition();
770  defLabel="data";
771  DataPointDefinition::getDataPointDefinitionFor(fullpath, &eolsDpd,&defLabel);
772  }
773  for (unsigned int i=0;i<eolsDpd.getNames().size();i++)
774  if (eolsDpd.getNames().at(i)=="NFiles")
776  readEolsDefinition_=false;
777  break;
778  }
779  //check if we can still find definition
780  if (def.size()<=1 || def.find('/')==std::string::npos) {
781  readEolsDefinition_=false;
782  break;
783  }
784  def = def.substr(def.find('/')+1);
785  }
786  }
787 
788  if (dp.getData().size()>eolsNFilesIndex_)
789  data = dp.getData()[eolsNFilesIndex_];
790  else {
791  edm::LogError("EvFDaqDirector") << " error reading number of files from BU JSON -: " << BUEoLSFile;
792  return -1;
793  }
794  return boost::lexical_cast<int>(data);
795  }
796 
797  bool EvFDaqDirector::bumpFile(unsigned int& ls, unsigned int& index, std::string& nextFile, uint32_t& fsize, int maxLS) {
798 
799  if (previousFileSize_ != 0) {
800  if (!fms_) {
802  }
804  previousFileSize_ = 0;
805  }
806 
807  //reached limit
808  if (maxLS>=0 && ls > (unsigned int)maxLS) return false;
809 
810  struct stat buf;
811  std::stringstream ss;
812  unsigned int nextIndex = index;
813  nextIndex++;
814 
815  // 1. Check suggested file
816  nextFile = getInputJsonFilePath(ls,index);
817  if (stat(nextFile.c_str(), &buf) == 0) {
818 
819  previousFileSize_ = buf.st_size;
820  fsize = buf.st_size;
821  return true;
822  }
823  // 2. No file -> lumi ended? (and how many?)
824  else {
825  std::string BUEoLSFile = getEoLSFilePathOnBU(ls);
826  bool eolFound = (stat(BUEoLSFile.c_str(), &buf) == 0);
827  while (eolFound) {
828 
829  // recheck that no raw file appeared in the meantime
830  if (stat(nextFile.c_str(), &buf) == 0) {
831  previousFileSize_ = buf.st_size;
832  fsize = buf.st_size;
833  return true;
834  }
835 
836  int indexFilesInLS = getNFilesFromEoLS(BUEoLSFile);
837  if (indexFilesInLS < 0)
838  //parsing failed
839  return false;
840  else {
841  //check index
842  if ((int)index<indexFilesInLS) {
843  //we have 2 files, and check for 1 failed... retry (2 will never be here)
844  edm::LogError("EvFDaqDirector") << "Potential miss of index file in LS -: " << ls << ". Missing "
845  << nextFile << " because " << indexFilesInLS-1 << " is the highest index expected. Will not update fu.lock file";
846  return false;
847  }
848  }
849  // this lumi ended, check for files
850  ++ls;
851  index = 0;
852 
853  //reached limit
854  if (maxLS>=0 && ls > (unsigned int)maxLS) return false;
855 
856  nextFile = getInputJsonFilePath(ls,0);
857  if (stat(nextFile.c_str(), &buf) == 0) {
858  // a new file was found at new lumisection, index 0
859  previousFileSize_ = buf.st_size;
860  fsize = buf.st_size;
861  return true;
862  }
863  else {
864  //change of policy: we need to cycle through each LS
865  return false;
866  }
867  BUEoLSFile = getEoLSFilePathOnBU(ls);
868  eolFound = (stat(BUEoLSFile.c_str(), &buf) == 0);
869  }
870  }
871  // no new file found
872  return false;
873  }
874 
876  if (fu_rw_lock_stream == nullptr)
877  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream "
878  << strerror(errno);
879  else {
880  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
881  unsigned int readLs = 1, readIndex = 0;
882  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
883  }
884  }
885 
887  if (create) {
888  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR | O_CREAT,
889  S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
890  chmod(fulockfile_.c_str(),0766);
891  } else {
892  fu_readwritelock_fd_ = open(fulockfile_.c_str(), O_RDWR, S_IRWXU);
893  }
894  if (fu_readwritelock_fd_ == -1)
895  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock -: " << fulockfile_
896  << " create:" << create << " error:" << strerror(errno);
897  else
898  LogDebug("EvFDaqDirector") << "creating filedesc for fureadwritelock -: "
900 
901  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
902  if (fu_rw_lock_stream == nullptr)
903  edm::LogError("EvFDaqDirector") << "problem with opening fuwritelock file stream -: " << strerror(errno);
904 
905  }
906 
908  pthread_mutex_lock(&init_lock_);
909  }
910 
912  pthread_mutex_unlock(&init_lock_);
913  }
914 
916  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_flk);
917  flock(fulocal_rwlock_fd_,LOCK_SH);
918  }
919 
921  //fcntl(fulocal_rwlock_fd_, F_SETLKW, &fulocal_rw_fulk);
922  flock(fulocal_rwlock_fd_,LOCK_UN);
923  }
924 
925 
927  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_flk2);
928  flock(fulocal_rwlock_fd2_,LOCK_EX);
929  }
930 
932  //fcntl(fulocal_rwlock_fd2_, F_SETLKW, &fulocal_rw_fulk2);
933  flock(fulocal_rwlock_fd2_,LOCK_UN);
934  }
935 
936  void EvFDaqDirector::createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
937  {
938  //used for backpressure mechanisms and monitoring
939  const std::string fuBoLS = getBoLSFilePathOnFU(lumiSection);
940  struct stat buf;
941  if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
942  int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
943  close(bol_fd);
944  }
945  }
946 
947  void EvFDaqDirector::createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS) {
948  if ( currentLumiSection > 0) {
949  const std::string fuEoLS =
950  getEoLSFilePathOnFU(currentLumiSection);
951  struct stat buf;
952  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
953  if ( !found ) {
954  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
955  close(eol_fd);
956  if (doCreateBoLS) createBoLSFile(lumiSection,false);
957  }
958  }
959  else if (doCreateBoLS) {
960  createBoLSFile(lumiSection,true);//needed for initial lumisection
961  }
962  }
963 
964  int EvFDaqDirector::grabNextJsonFile(std::string const& jsonSourcePath, std::string const& rawSourcePath, int64_t& fileSizeFromJson, bool& fileFound)
965  {
966  fileFound=true;
967 
968  //should be ported to use fffnaming
969  std::ostringstream fileNameWithPID;
970  fileNameWithPID << boost::filesystem::path(rawSourcePath).stem().string() << "_pid"
971  << std::setfill('0') << std::setw(5) << pid_ << ".jsn";
972 
973  // assemble json destination path
974  std::string jsonDestPath = baseRunDir() + "/" + fileNameWithPID.str();
975 
976  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to " << jsonDestPath;
977 
978  int infile=-1, outfile=-1;
979 
980  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY))< 0) {
981  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to open input file -: " << jsonSourcePath << " : " << strerror(errno);
982  usleep(100000);
983  if ((infile = ::open(jsonSourcePath.c_str(), O_RDONLY))< 0) {
984  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open input file (on retry) -: " << jsonSourcePath << " : " << strerror(errno);
985  if (errno==ENOENT) fileFound=false;
986  return -1;
987  }
988  }
989 
990  int oflag = O_CREAT | O_WRONLY | O_TRUNC | O_EXCL; //file should not exist
991  int omode = S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH;
992  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode))< 0)
993  {
994  if (errno==EEXIST) {
995  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - destination file already exists -: " << jsonDestPath << " : ";
996  ::close(infile);
997  return -1;
998  }
999  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file -: " << jsonDestPath << " : " << strerror(errno);
1000  usleep(100000);
1001  struct stat out_stat;
1002  if (stat(jsonDestPath.c_str(),&out_stat)==0) {
1003  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - output file possibly got created with error, deleting and retry -: " << jsonDestPath;
1004  if (unlink(jsonDestPath.c_str())==-1) {
1005  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonDestPath << " : "<< strerror(errno);
1006  }
1007  }
1008  if ((outfile = ::open(jsonDestPath.c_str(), oflag, omode))< 0) {
1009  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to open output file (on retry) -: " << jsonDestPath << " : " << strerror(errno);
1010  ::close(infile);
1011  return -1;
1012  }
1013  }
1014  //copy contents
1015  const std::size_t buf_sz = 512;
1016  std::size_t tot_written = 0;
1017  std::unique_ptr<char> buf(new char [buf_sz]);
1018 
1019  ssize_t sz, sz_read=1, sz_write;
1020  while (sz_read > 0 && (sz_read = ::read(infile, buf.get(), buf_sz)) > 0)
1021  {
1022  sz_write = 0;
1023  do {
1024  assert(sz_read - sz_write > 0);
1025  if ((sz = ::write(outfile, buf.get() + sz_write,sz_read - sz_write)) < 0)
1026  {
1027  sz_read = sz; // cause read loop termination
1028  break;
1029  }
1030  assert(sz > 0);
1031  sz_write += sz;
1032  tot_written+=sz;
1033  } while (sz_write < sz_read);
1034  }
1035  close(infile);
1036  close(outfile);
1037 
1038  if (tot_written>0) {
1039  //leave file if it was empty for diagnosis
1040  if (unlink(jsonSourcePath.c_str()) == -1) {
1041  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to remove -: " << jsonSourcePath << " : "<< strerror(errno);
1042  return -1;
1043  }
1044  } else {
1045  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - failed to copy json file or file was empty -: " << jsonSourcePath;
1046  return -1;
1047  }
1048 
1049  Json::Value deserializeRoot;
1051 
1052  std::string data;
1053  std::stringstream ss;
1054  bool result;
1055  try {
1056  if (tot_written<=buf_sz) {
1057  result = reader.parse(buf.get(), deserializeRoot);
1058  }
1059  else {
1060  //json will normally not be bigger than buf_sz bytes
1061  try {
1062  boost::filesystem::ifstream ij(jsonDestPath);
1063  ss << ij.rdbuf();
1064  }
1065  catch (boost::filesystem::filesystem_error const& ex) {
1066  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1067  return -1;
1068  }
1069  result = reader.parse(ss.str(), deserializeRoot);
1070  }
1071  if (!result) {
1072  if (tot_written<=buf_sz) ss << buf.get();
1073  edm::LogError("EvFDaqDirector") << "Failed to deserialize JSON file -: " << jsonDestPath
1074  << "\nERROR:\n" << reader.getFormatedErrorMessages()
1075  << "CONTENT:\n" << ss.str()<<".";
1076  return -1;
1077  }
1078 
1079  //read BU JSON
1080  DataPoint dp;
1081  dp.deserialize(deserializeRoot);
1082  bool success = false;
1083  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
1084  if (dpd_->getNames().at(i)=="NEvents")
1085  if (i<dp.getData().size()) {
1086  data = dp.getData()[i];
1087  success=true;
1088  break;
1089  }
1090  }
1091  if (!success) {
1092  if (!dp.getData().empty())
1093  data = dp.getData()[0];
1094  else {
1095  edm::LogError("EvFDaqDirector::grabNextJsonFile") << "grabNextJsonFile - " <<
1096  " error reading number of events from BU JSON; No input value. data -: " << data;
1097  return -1;
1098  }
1099  }
1100 
1101  //try to read raw file size
1102  fileSizeFromJson=-1;
1103  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
1104  if (dpd_->getNames().at(i)=="NBytes") {
1105  if (i<dp.getData().size()) {
1106  data = dp.getData()[i];
1107  try {
1108  fileSizeFromJson = boost::lexical_cast<long>(data);
1109  }
1110  catch( boost::bad_lexical_cast const& ) {
1111  //non-fatal currently, processing can continue without this value
1112  edm::LogWarning("EvFDaqDirector") << "grabNextJsonFile - error parsing number of Bytes from BU JSON. "
1113  << "Input value is -: " << data;
1114  }
1115  break;
1116  }
1117  }
1118  }
1119  return boost::lexical_cast<int>(data);
1120  }
1121  catch( boost::bad_lexical_cast const& e) {
1122  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - error parsing number of events from BU JSON. "
1123  << "Input value is -: " << data;
1124  }
1125  catch (std::runtime_error const& e)
1126  {
1127  //Can be thrown by Json parser
1128  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - std::runtime_error exception -: " << e.what();
1129  }
1130 
1131  catch (std::exception const& e)
1132  {
1133  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED! -: " << e.what();
1134  }
1135  catch (...)
1136  {
1137  //unknown exception
1138  edm::LogError("EvFDaqDirector") << "grabNextJsonFile - SOME OTHER EXCEPTION OCCURED!";
1139  }
1140 
1141  return -1;
1142  }
1143 
1145  {
1146  std::string data;
1147  try {
1148  // assemble json destination path
1149  boost::filesystem::path jsonDestPath(baseRunDir());
1150 
1151  //should be ported to use fffnaming
1152  std::ostringstream fileNameWithPID;
1153  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
1154  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
1155  jsonDestPath /= fileNameWithPID.str();
1156 
1157  LogDebug("EvFDaqDirector") << "JSON rename -: " << jsonSourcePath << " to "
1158  << jsonDestPath;
1159  try {
1160  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
1161  }
1162  catch (boost::filesystem::filesystem_error const& ex)
1163  {
1164  // Input dir gone?
1165  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1166  // << " Maybe the file is not yet visible by FU. Trying again in one second";
1167  sleep(1);
1168  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
1169  }
1170  unlockFULocal();
1171 
1172  try {
1173  //sometimes this fails but file gets deleted
1174  boost::filesystem::remove(jsonSourcePath);
1175  }
1176  catch (boost::filesystem::filesystem_error const& ex)
1177  {
1178  // Input dir gone?
1179  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1180  }
1181  catch (std::exception const& ex)
1182  {
1183  // Input dir gone?
1184  edm::LogError("EvFDaqDirector") << "grabNextFile std::exception CAUGHT -: " << ex.what();
1185  }
1186 
1187  boost::filesystem::ifstream ij(jsonDestPath);
1188  Json::Value deserializeRoot;
1190 
1191  std::stringstream ss;
1192  ss << ij.rdbuf();
1193  if (!reader.parse(ss.str(), deserializeRoot)) {
1194 
1195  edm::LogError("EvFDaqDirector") << "grabNextFile Failed to deserialize JSON file -: " << jsonDestPath
1196  << "\nERROR:\n" << reader.getFormatedErrorMessages()
1197  << "CONTENT:\n" << ss.str()<<".";
1198  throw std::runtime_error("Cannot deserialize input JSON file");
1199  }
1200 
1201  //read BU JSON
1202  std::string data;
1203  DataPoint dp;
1204  dp.deserialize(deserializeRoot);
1205  bool success = false;
1206  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
1207  if (dpd_->getNames().at(i)=="NEvents")
1208  if (i<dp.getData().size()) {
1209  data = dp.getData()[i];
1210  success=true;
1211  }
1212  }
1213  if (!success) {
1214  if (!dp.getData().empty())
1215  data = dp.getData()[0];
1216  else
1217  throw cms::Exception("EvFDaqDirector::grabNextJsonFileUnlock") <<
1218  " error reading number of events from BU JSON -: No input value " << data;
1219  }
1220  return boost::lexical_cast<int>(data);
1221  }
1222  catch (boost::filesystem::filesystem_error const& ex)
1223  {
1224  // Input dir gone?
1225  unlockFULocal();
1226  edm::LogError("EvFDaqDirector") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
1227  }
1228  catch (std::runtime_error const& e)
1229  {
1230  // Another process grabbed the file and NFS did not register this
1231  unlockFULocal();
1232  edm::LogError("EvFDaqDirector") << "grabNextFile runtime Exception -: " << e.what();
1233  }
1234  catch( boost::bad_lexical_cast const& ) {
1235  edm::LogError("EvFDaqDirector") << "grabNextFile error parsing number of events from BU JSON. "
1236  << "Input value is -: " << data;
1237  }
1238  catch (std::exception const& e)
1239  {
1240  // BU run directory disappeared?
1241  unlockFULocal();
1242  edm::LogError("EvFDaqDirector") << "grabNextFile SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
1243  }
1244 
1245  return -1;
1246  }
1247 
1248  EvFDaqDirector::FileStatus EvFDaqDirector::contactFileBroker(unsigned int& serverHttpStatus, bool& serverError,
1249  uint32_t& serverLS, uint32_t& closedServerLS,
1250  std::string& nextFileJson, std::string& nextFileRaw, int maxLS) {
1251 
1252  EvFDaqDirector::FileStatus fileStatus = noFile;
1253  serverError = false;
1254 
1255  boost::system::error_code ec;
1256  try {
1257  while (true) {
1258 
1259  //socket connect
1260  if (!fileBrokerKeepAlive_ || !socket_->is_open()) {
1262 
1263  if (ec) {
1264  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1265  serverError = true;
1266  break;
1267  }
1268  }
1269 
1270  boost::asio::streambuf request;
1271  std::ostream request_stream(&request);
1272  std::string path = "/popfile?runnumber="+run_nstring_+"&pid="+pid_;
1273  if (maxLS>=0) {
1274  std::stringstream spath;
1275  spath << path << "&stopls=" << maxLS;
1276  path = spath.str();
1277  edm::LogWarning("EvFDaqDirector") << "Stop LS requested " << maxLS;
1278  }
1279  request_stream << "GET " << path << " HTTP/1.1\r\n";
1280  request_stream << "Host: " << fileBrokerHost_ << "\r\n";
1281  request_stream << "Accept: */*\r\n";
1282  request_stream << "Connection: keep-alive\r\n\r\n";
1283 
1284  boost::asio::write(*socket_, request,ec);
1285  if (ec) {
1286  if (fileBrokerKeepAlive_ && ec == boost::asio::error::connection_reset) {
1287  edm::LogInfo("EvFDaqDirector") << "reconnecting socket on received connection_reset";
1288  //we got disconnected, try to reconnect to the server before writing the request
1290  if (ec) {
1291  edm::LogWarning("EvFDaqDirector") << "boost::asio::connect error -:" << ec;
1292  serverError = true;
1293  break;
1294  }
1295  continue;
1296  }
1297  edm::LogWarning("EvFDaqDirector") << "boost::asio::write error -:" << ec;
1298  serverError = true;
1299  break;
1300  }
1301 
1302  boost::asio::streambuf response;
1303  boost::asio::read_until(*socket_, response, "\r\n",ec);
1304  if (ec) {
1305  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1306  serverError = true;
1307  break;
1308  }
1309 
1310  std::istream response_stream(&response);
1311 
1312  std::string http_version;
1313  response_stream >> http_version;
1314 
1315  response_stream >> serverHttpStatus;
1316 
1317  std::string status_message;
1318  std::getline(response_stream, status_message);
1319  if (!response_stream || http_version.substr(0, 5) != "HTTP/")
1320  {
1321  edm::LogWarning("EvFDaqDirector") << "Invalid server response";
1322  serverError = true;
1323  break;
1324  }
1325  if (serverHttpStatus != 200)
1326  {
1327  edm::LogWarning("EvFDaqDirector") << "Response returned with status code " << serverHttpStatus;
1328  serverError = true;
1329  break;
1330  }
1331 
1332  // Process the response headers.
1334  while (std::getline(response_stream, header) && header != "\r") {
1335  }
1336 
1337  std::string fileInfo;
1338  std::map<std::string,std::string> serverMap;
1339  while (std::getline(response_stream, fileInfo) && fileInfo != "\r") {
1340  auto pos=fileInfo.find("=");
1341  if (pos==std::string::npos) continue;
1342  auto stitle = fileInfo.substr(0,pos);
1343  auto svalue = fileInfo.substr(pos+1);
1344  serverMap[stitle]=svalue;
1345  }
1346 
1347  //check that response run number if correct
1348  auto server_version = serverMap.find("version");
1349  assert(server_version!=serverMap.end());
1350 
1351  auto server_run = serverMap.find("runnumber");
1352  assert(server_run!=serverMap.end());
1353  assert(run_nstring_==server_run->second);
1354 
1355  auto server_state = serverMap.find("state");
1356  assert(server_state!=serverMap.end());
1357 
1358  auto server_eols = serverMap.find("lasteols");
1359  assert(server_eols!=serverMap.end());
1360 
1361  auto server_ls = serverMap.find("lumisection");
1362 
1363 
1364  closedServerLS = (uint64_t)std::max(0,atoi(server_eols->second.c_str()));
1365  if (server_ls!=serverMap.end())
1366  serverLS = (uint64_t)std::max(1,atoi(server_ls->second.c_str()));
1367  else
1368  serverLS = closedServerLS+1;
1369 
1370  std::string s_state = server_state->second;
1371  if (s_state == "STARTING") //initial, always empty starting with LS 1
1372  {
1373  auto server_file = serverMap.find("file");
1374  assert(server_file==serverMap.end());//no file with starting state
1375  fileStatus=noFile;
1376  edm::LogInfo("EvFDaqDirector") << "Got STARTING notification with last EOLS " << closedServerLS;
1377  }
1378  else if (s_state=="READY")
1379  {
1380  auto server_file = serverMap.find("file");
1381  if (server_file==serverMap.end()) {
1382  //can be returned by server if files from new LS already appeared but LS is not yet closed
1383  if (serverLS<=closedServerLS) serverLS=closedServerLS+1;
1384  fileStatus=noFile;
1385  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " and no new file";
1386  }
1387  else {
1388  std::string filestem;
1389  std::string fileprefix;
1390  auto server_fileprefix = serverMap.find("fileprefix");
1391 
1392  if (server_fileprefix!=serverMap.end()) {
1393  auto pssize = server_fileprefix->second.size();
1394  if (pssize>1 && server_fileprefix->second[0]=='"' && server_fileprefix->second[pssize-1]=='"')
1395  fileprefix = server_fileprefix->second.substr(1,pssize-2);
1396  else
1397  fileprefix = server_fileprefix->second;
1398  }
1399 
1400  //remove string literals
1401  auto ssize = server_file->second.size();
1402  if (ssize>1 && server_file->second[0]=='"' && server_file->second[ssize-1]=='"')
1403  filestem = server_file->second.substr(1,ssize-2);
1404  else
1405  filestem = server_file->second;
1406  assert(!filestem.empty());
1407  nextFileRaw = bu_run_dir_ +"/" + filestem+".raw";//raw files are not moved
1408  filestem = bu_run_dir_ + "/" + fileprefix + filestem;
1409  nextFileJson = filestem + ".jsn";
1410  fileStatus=newFile;
1411  edm::LogInfo("EvFDaqDirector") << "Got READY notification with last EOLS " << closedServerLS << " new LS " << serverLS << " file:" << filestem;
1412  }
1413  }
1414  else if (s_state== "EOLS") {
1415  serverLS=closedServerLS+1;
1416  edm::LogInfo("EvFDaqDirector") << "Got EOLS notification with last EOLS " << closedServerLS;
1417  fileStatus=noFile;
1418  }
1419  else if (s_state=="EOR") {
1420  //server_eor = serverMap.find("iseor");
1421  edm::LogInfo("EvFDaqDirector") << "Got EOR notification with last EOLS " << closedServerLS;
1422  fileStatus=runEnded;
1423  }
1424  else if (s_state=="NORUN") {
1425  auto err_msg = serverMap.find("errormessage");
1426  if (err_msg!=serverMap.end())
1427  edm::LogWarning("EvFDaqDirector") << "Server NORUN -:" << server_state->second << " : " << err_msg->second;
1428  else
1429  edm::LogWarning("EvFDaqDirector") << "Server NORUN ";
1430  edm::LogWarning("EvFDaqDirector") << "executing run end";
1431  fileStatus=runEnded;
1432  }
1433  else if (s_state=="ERROR") {
1434  auto err_msg = serverMap.find("errormessage");
1435  if (err_msg!=serverMap.end())
1436  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second << " : " << err_msg->second;
1437  else
1438  edm::LogWarning("EvFDaqDirector") << "Server error -:" << server_state->second;
1439  fileStatus=noFile;
1440  serverError=true;
1441  }
1442  else {
1443  edm::LogWarning("EvFDaqDirector") << "Unknown Server state -:" << server_state->second;
1444  fileStatus=noFile;
1445  serverError=true;
1446  }
1447 
1448  // Read until EOF, writing data to output as we go.
1449  if (!fileBrokerKeepAlive_) {
1450  while (boost::asio::read(*socket_, response,
1451  boost::asio::transfer_at_least(1), ec)) {
1452  }
1453  if (ec != boost::asio::error::eof) {
1454  edm::LogWarning("EvFDaqDirector") << "boost::asio::read_until error -:" << ec;
1455  serverError = true;
1456  }
1457  }
1458  break;
1459  }
1460  }
1461  catch (std::exception const& e)
1462  {
1463  edm::LogWarning("EvFDaqDirector") << "Exception in socket handling";
1464  serverError= true;
1465  }
1466 
1467  if (!fileBrokerKeepAlive_ && socket_->is_open()) {
1468  socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
1469  if (ec) {
1470  edm::LogWarning("EvFDaqDirector") << "socket shutdown error -:" << ec;
1471  }
1472  socket_->close(ec);
1473  if (ec) {
1474  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1475  }
1476  }
1477 
1478  if (serverError) {
1479  if (socket_->is_open()) socket_->close(ec);
1480  if (ec) {
1481  edm::LogWarning("EvFDaqDirector") << "socket close error -:" << ec;
1482  }
1483  fileStatus = noFile;
1484  sleep(1);//back-off if error detected
1485  }
1486  return fileStatus;
1487  }
1488 
1489 
1490  EvFDaqDirector::FileStatus EvFDaqDirector::getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int& ls, std::string& nextFileRaw,
1491  int& serverEventsInNewFile, int64_t& fileSizeFromJson, uint64_t& thisLockWaitTimeUs) {
1492 
1493  EvFDaqDirector::FileStatus fileStatus = noFile;
1494 
1495  //int retval = -1;
1496  //int lock_attempts = 0;
1497  //long total_lock_attempts = 0;
1498 
1499  struct stat buf;
1500  int stopFileLS = -1;
1501  int stopFileCheck = stat(stopFilePath_.c_str(),&buf);
1502  int stopFilePidCheck = stat(stopFilePathPid_.c_str(),&buf);
1503  if (stopFileCheck==0 || stopFilePidCheck==0) {
1504  if (stopFileCheck==0)
1505  stopFileLS = readLastLSEntry(stopFilePath_);
1506  else
1507  stopFileLS = 1;//stop without drain if only pid is stopped
1508  if (!stop_ls_override_) {
1509  //if lumisection is higher than in stop file, should quit at next from current
1510  if (stopFileLS>=0 && (int)ls>=stopFileLS) stopFileLS = stop_ls_override_ = ls;
1511  }
1512  else stopFileLS = stop_ls_override_;
1513  edm::LogWarning("EvFDaqDirector") << "Detected stop request from hltd. Ending run for this process after LS -: " << stopFileLS;
1514  //return runEnded;
1515  }
1516  else //if file was removed before reaching stop condition, reset this
1517  stop_ls_override_ = 0;
1518 
1519  /* look for EoLS
1520  if (stat(getEoLSFilePathOnFU(currentLumiSection).c_str(),&buf)==0) {
1521  edm::LogWarning("EvFDaqDirector") << "Detected local EoLS for lumisection "<< currentLumiSection ;
1522  ls++;
1523  return noFile;
1524  }
1525  */
1526 
1527 
1528  timeval ts_lockbegin;
1529  gettimeofday(&ts_lockbegin,nullptr);
1530 
1531  std::string nextFileJson;
1532  uint32_t serverLS, closedServerLS;
1533  unsigned int serverHttpStatus;
1534  bool serverError;
1535 
1536  //local lock to force index json and EoLS files to appear in order
1538  lockFULocal2();
1539 
1540  int maxLS = stopFileLS < 0 ? -1: std::max(stopFileLS,(int)currentLumiSection);
1541  fileStatus = contactFileBroker(serverHttpStatus, serverError, serverLS, closedServerLS, nextFileJson, nextFileRaw, maxLS);
1542 
1543  if (serverError) {
1544  //do not update anything
1546  unlockFULocal2();
1547  return noFile;
1548  }
1549 
1550  //handle creation of EoLS and BoLS files if lumisection has changed
1551  if (currentLumiSection==0) {
1552  if (fileStatus == runEnded) {
1553  createLumiSectionFiles(closedServerLS,0);
1554  createLumiSectionFiles(serverLS,closedServerLS,false); // +1
1555  }
1556  else
1557  createLumiSectionFiles(serverLS,0);
1558  } else {
1559  //loop over and create any EoLS files missing
1560  if (closedServerLS>=currentLumiSection) {
1561  for (uint32_t i=std::max(currentLumiSection,1U);i<=closedServerLS;i++)
1563  }
1564  }
1565 
1566  bool fileFound=true;
1567 
1568  if (fileStatus == newFile)
1569  serverEventsInNewFile = grabNextJsonFile(nextFileJson,nextFileRaw,fileSizeFromJson,fileFound);
1570 
1571  if (!fileFound) {
1572  //catch condition where directory got deleted
1573  fileStatus = noFile;
1574  struct stat buf;
1575  if (stat(bu_run_dir_.c_str(),&buf)!=0) {
1576  edm::LogWarning("EvFDaqDirector") << "BU run directory not found:" << bu_run_dir_;
1577  fileStatus=runEnded;
1578  }
1579  }
1580 
1581  //can unlock because all files have been created locally
1583  unlockFULocal2();
1584 
1585 
1586  if (fileStatus==runEnded)
1587  ls = std::max(currentLumiSection,serverLS);
1588  else if (fileStatus==newFile) {
1589  assert(serverLS>=ls);
1590  ls = serverLS;
1591  }
1592  else if (fileStatus==noFile) {
1593  if (serverLS>=ls)
1594  ls = serverLS;
1595  else {
1596  edm::LogWarning("EvFDaqDirector") << "Server reported LS " << serverLS << " which is smaller than currently open LS " << ls << ". Ignoring response";
1597  sleep(1);
1598  }
1599  }
1600 
1601  return fileStatus;
1602  }
1603 
1605  // create open dir if not already there
1606 
1608  if (!boost::filesystem::is_directory(openPath)) {
1609  LogDebug("EvFDaqDirector") << "<open> FU dir not found. Creating... -:" << openPath.string();
1610  boost::filesystem::create_directories(openPath);
1611  }
1612  }
1613 
1614 
1616 
1617  boost::filesystem::ifstream ij(file);
1618  Json::Value deserializeRoot;
1620 
1621  if (!reader.parse(ij, deserializeRoot)) {
1622  edm::LogError("EvFDaqDirector") << "Cannot deserialize input JSON file -:" << file;
1623  return -1;
1624  }
1625 
1626  int ret = deserializeRoot.get("lastLS","").asInt();
1627  return ret;
1628 
1629  }
1630 
1632  std::string fileprefix = run_dir_ +"/"+run_string_+"_ls";
1633  std::string fullpath;
1634  struct stat buf;
1635  unsigned int lscount=startFromLS_;
1636  do {
1637  std::stringstream ss;
1638  ss << fileprefix << std::setfill('0') << std::setw(4) << lscount << "_EoLS.jsn";
1639  fullpath = ss.str();
1640  lscount++;
1641  }
1642  while(stat(fullpath.c_str(),&buf)==0);
1643  return lscount-1;
1644  }
1645 
1646  //if transferSystem PSet is present in the menu, we require it to be complete and consistent for all specified streams
1648  {
1649  if(transferSystemJson_) return;
1650 
1651  transferSystemJson_.reset(new Json::Value);
1652  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1653  if (topPset.existsAs<edm::ParameterSet>("transferSystem",true))
1654  {
1655  const edm::ParameterSet& tsPset(topPset.getParameterSet("transferSystem"));
1656 
1657  Json::Value destinationsVal(Json::arrayValue);
1658  std::vector<std::string> destinations = tsPset.getParameter<std::vector<std::string>>("destinations");
1659  for (auto & dest: destinations) destinationsVal.append(dest);
1660  (*transferSystemJson_)["destinations"]=destinationsVal;
1661 
1662  Json::Value modesVal(Json::arrayValue);
1663  std::vector<std::string> modes = tsPset.getParameter< std::vector<std::string> >("transferModes");
1664  for (auto & mode: modes) modesVal.append(mode);
1665  (*transferSystemJson_)["transferModes"]=modesVal;
1666 
1667  for (auto psKeyItr =tsPset.psetTable().begin();psKeyItr!=tsPset.psetTable().end(); ++ psKeyItr) {
1668  if (psKeyItr->first!="destinations" && psKeyItr->first!="transferModes") {
1669  const edm::ParameterSet & streamDef = tsPset.getParameterSet(psKeyItr->first);
1670  Json::Value streamVal;
1671  for (auto & mode : modes) {
1672  //validation
1673  if (!streamDef.existsAs<std::vector<std::string>>(mode,true))
1674  throw cms::Exception("EvFDaqDirector") << " Missing transfer system specification for -:" << psKeyItr->first << " (transferMode " << mode << ")";
1675  std::vector<std::string> streamDestinations = streamDef.getParameter<std::vector<std::string>>(mode);
1676 
1677  Json::Value sDestsValue(Json::arrayValue);
1678 
1679  if (streamDestinations.empty())
1680  throw cms::Exception("EvFDaqDirector") << " Missing transter system destination(s) for -: "<< psKeyItr->first << ", mode:" << mode;
1681 
1682  for (auto & sdest:streamDestinations) {
1683  bool sDestValid=false;
1684  sDestsValue.append(sdest);
1685  for (auto & dest: destinations) {
1686  if (dest==sdest) sDestValid=true;
1687  }
1688  if (!sDestValid)
1689  throw cms::Exception("EvFDaqDirector") << " Invalid transter system destination specified for -: "<< psKeyItr->first << ", mode:" << mode << ", dest:"<<sdest;
1690  }
1691  streamVal[mode]=sDestsValue;
1692  }
1693  (*transferSystemJson_)[psKeyItr->first] = streamVal;
1694  }
1695  }
1696  }
1697  else {
1698  if (requireTSPSet_)
1699  throw cms::Exception("EvFDaqDirector") << "transferSystem PSet not found";
1700  }
1701  }
1702 
1704  {
1705  std::string streamRequestName;
1706  if (transferSystemJson_->isMember(stream.c_str()))
1707  streamRequestName = stream;
1708  else {
1709  std::stringstream msg;
1710  msg << "Transfer system mode definitions missing for -: " << stream;
1711  if (requireTSPSet_)
1712  throw cms::Exception("EvFDaqDirector") << msg.str();
1713  else {
1714  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
1715  return std::string("Failsafe");
1716  }
1717  }
1718  //return empty if strict check parameter is not on
1719  if (!requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_=="null")) {
1720  edm::LogWarning("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter."
1721  << "Switch on requireTSPSet parameter to enforce this requirement. Setting mode to empty string.";
1722  return std::string("Failsafe");
1723  }
1724  if (requireTSPSet_ && (selectedTransferMode_.empty() || selectedTransferMode_=="null")) {
1725  throw cms::Exception("EvFDaqDirector") << "Selected mode string is not provided as DaqDirector parameter.";
1726  }
1727  //check if stream has properly listed transfer stream
1728  if (!transferSystemJson_->get(streamRequestName, "").isMember(selectedTransferMode_.c_str()))
1729  {
1730  std::stringstream msg;
1731  msg << "Selected transfer mode " << selectedTransferMode_ << " is not specified for stream " << streamRequestName;
1732  if (requireTSPSet_)
1733  throw cms::Exception("EvFDaqDirector") << msg.str();
1734  else
1735  edm::LogWarning("EvFDaqDirector") << msg.str() << " (permissive mode)";
1736  return std::string("Failsafe");
1737  }
1738  Json::Value destsVec = transferSystemJson_->get(streamRequestName, "").get(selectedTransferMode_,"");
1739 
1740  //flatten string json::Array into CSV std::string
1741  std::string ret;
1742  for (Json::Value::iterator it = destsVec.begin(); it!=destsVec.end(); it++)
1743  {
1744  if (!ret.empty()) ret +=",";
1745  ret+=(*it).asString();
1746  }
1747  return ret;
1748  }
1749 
1751  {
1752  if (mergeTypePset_.empty()) return;
1753  if(!mergeTypeMap_.empty()) return;
1754  edm::ParameterSet const& topPset = edm::getParameterSet(pc.parameterSetID());
1755  if (topPset.existsAs<edm::ParameterSet>(mergeTypePset_,true))
1756  {
1757  const edm::ParameterSet& tsPset(topPset.getParameterSet(mergeTypePset_));
1758  for (std::string pname : tsPset.getParameterNames()) {
1759  std::string streamType = tsPset.getParameter<std::string>(pname);
1760  tbb::concurrent_hash_map<std::string,std::string>::accessor ac;
1761  mergeTypeMap_.insert(ac,pname);
1762  ac->second = streamType;
1763  ac.release();
1764  }
1765  }
1766  }
1767 
1769  {
1770  tbb::concurrent_hash_map<std::string,std::string>::const_accessor search_ac;
1771  if (mergeTypeMap_.find(search_ac,stream))
1772  return search_ac->second;
1773 
1774  edm::LogInfo("EvFDaqDirector") << " No merging type specified for stream " << stream << ". Using default value";
1775  std::string defaultName = MergeTypeNames_[defaultType];
1776  tbb::concurrent_hash_map<std::string,std::string>::accessor ac;
1777  mergeTypeMap_.insert(ac,stream);
1778  ac->second = defaultName;
1779  ac.release();
1780  return defaultName;
1781  }
1782 
1784  std::string proc_flag = run_dir_ + "/processing";
1785  int proc_flag_fd = open(proc_flag.c_str(), O_RDWR | O_CREAT, S_IRWXU | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
1786  close(proc_flag_fd);
1787  }
1788 
1789  struct flock EvFDaqDirector::make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
1790  {
1791 #ifdef __APPLE__
1792  return {start, len, pid, type, whence};
1793 #else
1794  return {type, whence, start, len, pid};
1795 #endif
1796  }
1797 
1798 }
1799 
#define LogDebug(id)
unsigned int nThreads_
type
Definition: HCALResponse.h:21
unsigned int maxNumberOfThreads() const
Definition: SystemBounds.h:46
T getParameter(std::string const &) const
std::string getStreamDestinations(std::string const &stream) const
Value get(UInt index, const Value &defaultValue) const
std::vector< std::string > & getData()
Definition: DataPoint.h:58
struct flock fu_rw_flk
static struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
std::string run_string_
std::string protocolBufferHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
std::string bolsFileName(const unsigned int run, const unsigned int ls)
std::string streamerDataChecksumFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
std::string getMergedProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
void watchPreallocate(Preallocate::slot_type const &iSlot)
const_iterator begin() const
bool existsAs(std::string const &parameterName, bool trackiness=true) const
checks if a parameter exists as a given type
Definition: ParameterSet.h:185
boost::asio::io_service io_service_
std::list< std::pair< int, InputFile * > > * filesToDeletePtr_
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
std::string fulockfile_
def copy(args, dbName)
def create(alignables, pedeDump, additionalData, outputFile, config)
std::string getMergedDatChecksumFilePath(const unsigned int ls, std::string const &stream) const
std::shared_ptr< Json::Value > transferSystemJson_
void setAllowAnything()
allow any parameter label/value pairs
jsoncollector::DataPointDefinition * dpd_
void accumulateFileSize(unsigned int lumi, unsigned long fileSize)
std::unique_ptr< boost::asio::ip::tcp::resolver > resolver_
void watchPreGlobalEndLumi(PreGlobalEndLumi::slot_type const &iSlot)
std::string getInitFilePath(std::string const &stream) const
std::string getMergedRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string inputRawFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
ParameterSet const & getParameterSet(ParameterSetID const &id)
pthread_mutex_t init_lock_
LuminosityBlockID const & luminosityBlockID() const
Definition: GlobalContext.h:57
std::string getProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
std::string getOpenInitFilePath(std::string const &stream) const
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
Value & append(const Value &value)
Append value to array at the end.
void createProcessingNotificationMaybe() const
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
EvFDaqDirector::FileStatus contactFileBroker(unsigned int &serverHttpStatus, bool &serverState, uint32_t &serverLS, uint32_t &closedServerLS, std::string &nextFileJson, std::string &nextFileRaw, int maxLS)
int grabNextJsonFileAndUnlock(boost::filesystem::path const &jsonSourcePath)
Represents a JSON value.
Definition: value.h:111
std::string getEoLSFilePathOnBU(const unsigned int ls) const
#define nullptr
std::string getEoRFilePath() const
unsigned long previousFileSize_
std::unique_ptr< boost::asio::ip::tcp::resolver::iterator > endpoint_iterator_
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
ParameterSetID const & parameterSetID() const
struct flock fu_rw_fulk
unsigned int maxNumberOfStreams() const
Definition: SystemBounds.h:43
std::string hltSourceDirectory_
unsigned int startFromLS_
std::string getOpenProtocolBufferHistogramFilePath(const unsigned int ls, std::string const &stream) const
void setComment(std::string const &value)
std::mutex * fileDeleteLockPtr_
tbb::concurrent_hash_map< std::string, std::string > mergeTypeMap_
std::string mergeTypePset_
std::string streamerDataFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
def chmod(path, mode)
Definition: eostools.py:294
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
std::unique_ptr< boost::asio::ip::tcp::socket > socket_
std::string getStreamMergeType(std::string const &stream, MergeType defaultType)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
std::string getOpenOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
std::string getOpenRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getRootHistogramFilePath(const unsigned int ls, std::string const &stream) const
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
std::string inputJsonFileName(const unsigned int run, const unsigned int ls, const unsigned int index)
std::string stopFilePath_
std::string bu_base_dir_
void removeFile(unsigned int ls, unsigned int index)
unsigned int stop_ls_override_
std::string selectedTransferMode_
std::string streamerJsonFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
void createLumiSectionFiles(const uint32_t lumiSection, const uint32_t currentLumiSection, bool doCreateBoLS=true)
std::string getOpenInputJsonFilePath(const unsigned int ls, const unsigned int index) const
std::string eorFileName(const unsigned int run)
int readLastLSEntry(std::string const &file)
std::unique_ptr< boost::asio::ip::tcp::resolver::query > query_
std::string stopFilePathPid_
unsigned int eolsNFilesIndex_
std::string & baseRunDir()
std::string getDatFilePath(const unsigned int ls, std::string const &stream) const
std::string run_nstring_
void preBeginJob(edm::PathsAndConsumesOfModulesBase const &, edm::ProcessContext const &)
void watchPreGlobalBeginRun(PreGlobalBeginRun::slot_type const &iSlot)
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
void openFULockfileStream(bool create)
int getNFilesFromEoLS(std::string BUEoLSFile)
def ls(path, rec=False)
Definition: eostools.py:349
void preBeginRun(edm::GlobalContext const &globalContext)
std::string rootHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
Int asInt() const
unsigned int getLumisectionToStart() const
std::string initFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned long long uint64_t
Definition: Time.h:15
void checkMergeTypePSet(edm::ProcessContext const &pc)
ParameterSet const & getParameterSet(std::string const &) const
void preGlobalEndLumi(edm::GlobalContext const &globalContext)
void deserialize(Json::Value &root) override
Definition: DataPoint.cc:56
void watchPostGlobalEndRun(PostGlobalEndRun::slot_type const &iSlot)
LuminosityBlockNumber_t luminosityBlock() const
void add(std::string const &label, ParameterSetDescription const &psetDescription)
std::string & getDefinition()
Definition: DataPoint.h:59
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:212
tuple msg
Definition: mps_check.py:278
evf::FastMonitoringService * fms_
void watchPreBeginJob(PreBeginJob::slot_type const &iSlot)
convenience function for attaching to signal
std::string bu_run_dir_
std::string rootHistogramFileNameWithInstance(const unsigned int run, const unsigned int ls, std::string const &stream, std::string const &instance)
def mkdir(path)
Definition: eostools.py:251
std::string findHighestRunDir()
Definition: DirManager.cc:20
std::string getBoLSFilePathOnFU(const unsigned int ls) const
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
void postEndRun(edm::GlobalContext const &globalContext)
Unserialize a JSON document into a Value.
Definition: reader.h:16
static const std::vector< std::string > MergeTypeNames_
int grabNextJsonFile(std::string const &jsonSourcePath, std::string const &rawSourcePath, int64_t &fileSizeFromJson, bool &fileFound)
const_iterator end() const
void checkTransferSystemPSet(edm::ProcessContext const &pc)
std::string fileBrokerPort_
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile, uint32_t &fsize, int maxLS)
void preallocate(edm::service::SystemBounds const &bounds)
Iterator for object and array value.
Definition: value.h:1007
def write(self, setup)
std::string getInputJsonFilePath(const unsigned int ls, const unsigned int index) const
def check(config)
Definition: trackerTree.py:14
std::string getRunOpenDirPath() const
JetCorrectorParameters::Definitions def
Definition: classes.h:6
std::string fileBrokerHost_
std::string protocolBufferHistogramFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
unsigned int fuLockPollInterval_
std::string getFormatedErrorMessages() const
Returns a user friendly string that list errors in the parsed document.
std::string getFFFParamsFilePathOnBU() const
std::string eolsFileName(const unsigned int run, const unsigned int ls)
unsigned int nStreams_
std::string streamerDataFileNameWithPid(const unsigned int run, const unsigned int ls, std::string const &stream)
std::string getEoRFilePathOnFU() const
std::string bu_run_open_dir_
array value (ordered list)
Definition: value.h:31