CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
EvFDaqDirector.cc
Go to the documentation of this file.
1 #include "EvFDaqDirector.h"
6 #include <iostream>
7 #include <sstream>
8 #include <sys/time.h>
9 #include <unistd.h>
10 #include <stdio.h>
11 
12 using std::stringstream;
13 
14 namespace evf {
15 
16  namespace {
/// Build a POSIX `struct flock` record-lock description from its five fields.
///
/// @param type    lock type stored in `l_type` (callers in this file pass
///                F_RDLCK, F_WRLCK or F_UNLCK)
/// @param whence  how `start` is interpreted, stored in `l_whence`
///                (callers in this file pass SEEK_SET)
/// @param start   starting offset of the region, stored in `l_start`
/// @param len     length of the region, stored in `l_len`
/// @param pid     value stored in `l_pid`
/// @return the populated structure; any additional platform-specific members
///         are zero-initialised.
///
/// The members are assigned by name rather than through a positional
/// initialiser list: POSIX only specifies which members exist, not their
/// order, so a positional list needs one #ifdef branch per platform layout
/// and silently mis-assigns fields on any layout it does not know about.
struct flock make_flock(short type, short whence, off_t start, off_t len, pid_t pid)
{
  struct flock lock = {};
  lock.l_type = type;
  lock.l_whence = whence;
  lock.l_start = start;
  lock.l_len = len;
  lock.l_pid = pid;
  return lock;
}
25  }
26 
28  edm::ActivityRegistry& reg) :
29  testModeNoBuilderUnit_(
30  pset.getUntrackedParameter<bool> ("testModeNoBuilderUnit",
31  false)
32  ),
33  base_dir_(
34  pset.getUntrackedParameter<std::string> ("baseDir", "/data")
35  ),
36  bu_base_dir_(
37  pset.getUntrackedParameter<std::string> ("buBaseDir", "/data")
38  ),
39  sm_base_dir_(
40  pset.getUntrackedParameter<std::string> ("smBaseDir", "/sm")
41  ),
42  monitor_base_dir_(
43  pset.getUntrackedParameter<std::string> ("monBaseDir",
44  "MON")
45  ),
46  directorBu_(
47  pset.getUntrackedParameter<bool> ("directorIsBu", false)
48  ),
49  run_(pset.getUntrackedParameter<unsigned int> ("runNumber",0)),
50  hostname_(""),
51  bu_readlock_fd_(-1),
52  bu_writelock_fd_(-1),
53  fu_readwritelock_fd_(-1),
54  data_readwrite_fd_(-1),
55 
56  bu_w_lock_stream(0),
57  bu_r_lock_stream(0),
58  fu_rw_lock_stream(0),
59  bu_w_monitor_stream(0),
60  bu_t_monitor_stream(0),
61  data_rw_stream(0),
62 
63  dirManager_(base_dir_),
64 
65  previousFileSize_(0),
66  jumpLS_(0),
67  jumpIndex_(0),
68 
69  bu_w_flk( make_flock( F_WRLCK, SEEK_SET, 0, 0, 0 )),
70  bu_r_flk( make_flock( F_RDLCK, SEEK_SET, 0, 0, 0 )),
71  bu_w_fulk( make_flock( F_UNLCK, SEEK_SET, 0, 0, 0 )),
72  bu_r_fulk( make_flock( F_UNLCK, SEEK_SET, 0, 0, 0 )),
73  fu_rw_flk( make_flock ( F_WRLCK, SEEK_SET, 0, 0, getpid() )),
74  fu_rw_fulk( make_flock( F_UNLCK, SEEK_SET, 0, 0, getpid() )),
75  data_rw_flk( make_flock ( F_WRLCK, SEEK_SET, 0, 0, getpid() )),
76  data_rw_fulk( make_flock( F_UNLCK, SEEK_SET, 0, 0, getpid() ))
77  {
80 
81  std::stringstream ss;
82  ss << "run" << std::setfill('0') << std::setw(6) << run_;
83  run_string_ = ss.str();
85 
86  //save hostname for later
87  char hostname[33];
88  gethostname(hostname,32);
89  hostname_ = hostname;
90  // check if base dir exists or create it accordingly
91  int retval = mkdir(base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
92  if (retval != 0 && errno != EEXIST) {
93  throw cms::Exception("DaqDirector") << " Error checking for base dir "
94  << base_dir_ << " mkdir error:" << strerror(errno) << "\n";
95  }
96 
97  //bu_run_dir: for FU, for which the base dir is local and the BU is remote, it is expected to be there
98  //for BU, it is created at this point
99 
100 
101  if (directorBu_)
102  {
104  std::string bulockfile = bu_run_dir_ + "/bu.lock";
105  std::string fulockfile = bu_run_dir_ + "/fu.lock";
106 
107  //make or find bu run dir
108  retval = mkdir(bu_run_dir_.c_str(),
109  S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
110  if (retval != 0 && errno != EEXIST) {
111  throw cms::Exception("DaqDirector")
112  << " Error creating bu run dir " << bu_run_dir_
113  << " mkdir error:" << strerror(errno) << "\n";
114  }
115  bu_run_open_dir_ = bu_run_dir_ + "/open";
116  retval = mkdir(bu_run_open_dir_.c_str(),
117  S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
118  if (retval != 0 && errno != EEXIST) {
119  throw cms::Exception("DaqDirector") << " Error creating bu run open dir "
120  << bu_run_open_dir_ << " mkdir error:" << strerror(errno)
121  << "\n";
122  }
123 
124  //make or find monitor base dir
125  //@@EM make sure this is still needed
126 
127  stringstream ost;
128  ost << bu_run_dir_ << "/" << monitor_base_dir_;
129  monitor_base_dir_ = ost.str() + "_OLD";
130  retval = mkdir(monitor_base_dir_.c_str(),
131  S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
132  if (retval != 0 && errno != EEXIST) {
133  throw cms::Exception("DaqDirector")
134  << " Error creating monitor dir " << monitor_base_dir_
135  << " mkdir error:" << strerror(errno) << "\n";
136  }
137 
138  // the BU director does not need to know about the fu lock
139  bu_writelock_fd_ = open(bulockfile.c_str(),
140  O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU);
141  if (bu_writelock_fd_ == -1)
142  edm::LogWarning("EvFDaqDirector") << "problem with creating filedesc for buwritelock "
143  << strerror(errno);
144  else
145  edm::LogInfo("EvFDaqDirector") << "creating filedesc for buwritelock "
146  << bu_writelock_fd_;
147  bu_w_lock_stream = fdopen(bu_writelock_fd_, "w");
148  if (bu_w_lock_stream == 0)
149  edm::LogWarning("EvFDaqDirector")<< "Error creating write lock stream " << strerror(errno);
150  std::string filename = monitor_base_dir_ + "/bumonitor.txt";
151  bu_w_monitor_stream = fopen(filename.c_str(), "w+");
152  filename = monitor_base_dir_ + "/diskmonitor.txt";
153  bu_t_monitor_stream = fopen(filename.c_str(), "w+");
154  if (bu_t_monitor_stream == 0)
155  edm::LogWarning("EvFDaqDirector") << "Error creating bu write lock stream " << strerror(errno);
156 
157  // BU INITIALIZES LOCK FILE
158  // FU LOCK FILE OPEN
159  openFULockfileStream(fulockfile, true);
161  fflush(fu_rw_lock_stream);
162  close(fu_readwritelock_fd_);
163  createOutputDirectory(); // this should act not on the bu base dir but on the output disk
164  }
165  else
166  {
167  // for FU, check if bu base dir exists
168 
169  retval = mkdir(bu_base_dir_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
170  if (retval != 0 && errno != EEXIST) {
171  throw cms::Exception("DaqDirector") << " Error checking for bu base dir "
172  << base_dir_ << " mkdir error:" << strerror(errno) << "\n";
173  }
174 
176  std::string fulockfile = bu_run_dir_ + "/fu.lock";
177  openFULockfileStream(fulockfile, false);
178  }
179 
180 
181  }
182 
184  close(bu_readlock_fd_);
185  close(bu_writelock_fd_);
186  if (directorBu_) {
187  std::string filename = bu_base_dir_ + "/bu.lock";
188  removeFile(filename);
189  }
190  }
191 
193 
194  assert(run_ == id.run());
195 
196  // check if run dir exists or make it.
197  umask(0);
198  int retval = mkdir(run_dir_.c_str(),
199  S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
200  if (retval != 0 && errno != EEXIST) {
201  throw cms::Exception("DaqDirector") << " Error creating run dir "
202  << run_dir_ << " mkdir error:" << strerror(errno) << "\n";
203  }
204 
205  // check if the requested run is the latest one - issue a warning if it isn't
207  edm::LogWarning("EvFDaqDirector") << "DaqDirector Warning checking run dir "
208  << run_dir_ << " this is not the highest run "
210  }
211 
212 
213 
214  }
215 
217  int retval = mkdir(sm_base_dir_.c_str(),
218  S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
219  if (retval != 0 && errno != EEXIST) {
220  throw cms::Exception("DaqDirector") << " Error creating output dir "
221  << sm_base_dir_ << " mkdir error:" << strerror(errno) << "\n";
222  return false;
223  }
224  std::string mergedRunDir = sm_base_dir_ + "/" + run_string_;
225  std::string mergedDataDir = mergedRunDir + "/data";
226  std::string mergedMonDir = mergedRunDir + "/mon";
227  retval = mkdir(mergedRunDir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
228  if (retval != 0 && errno != EEXIST) {
229  throw cms::Exception("DaqDirector")
230  << " Error creating merged Run dir " << mergedDataDir
231  << " mkdir error:" << strerror(errno) << "\n";
232  return false;
233  }
234  retval
235  = mkdir(mergedDataDir.c_str(),
236  S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
237  if (retval != 0 && errno != EEXIST) {
238  throw cms::Exception("DaqDirector")
239  << " Error creating merged data dir " << mergedDataDir
240  << " mkdir error:" << strerror(errno) << "\n";
241  return false;
242  }
243  retval = mkdir(mergedMonDir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
244  if (retval != 0 && errno != EEXIST) {
245  throw cms::Exception("DaqDirector")
246  << " Error creating merged mon dir " << mergedMonDir
247  << " mkdir error:" << strerror(errno) << "\n";
248  return false;
249  }
250  return true;
251  }
252 
253  std::string EvFDaqDirector::getRawFilePath(const unsigned int ls, const unsigned int index) const {
254  return bu_run_dir_ + "/" + inputFileNameStem(ls, index) + ".raw";
255  }
256 
257  std::string EvFDaqDirector::getOpenRawFilePath(const unsigned int ls, const unsigned int index) const {
258  return bu_run_dir_ + "/open/" + inputFileNameStem(ls, index) + ".raw";
259  }
260 
261  std::string EvFDaqDirector::getOpenDatFilePath(const unsigned int ls, std::string const& stream) const {
262  return run_dir_ + "/open/" + outputFileNameStem(ls,stream) + ".dat";
263  }
264 
265  std::string EvFDaqDirector::getOutputJsonFilePath(const unsigned int ls, std::string const& stream) const {
266  return run_dir_ + "/" + outputFileNameStem(ls,stream) + ".jsn";
267  }
268 
269  std::string EvFDaqDirector::getMergedDatFilePath(const unsigned int ls, std::string const& stream) const {
270  return run_dir_ + "/" + mergedFileNameStem(ls,stream) + ".dat";
271  }
272 
274  return run_dir_ + "/" + initFileName(stream);
275  }
276 
278  return bu_run_dir_ + "/" + eolsFileName(ls);
279  }
280 
282  return run_dir_ + "/" + eolsFileName(ls);
283  }
284 
286  return bu_run_dir_ + "/" + eorFileName();
287  }
288 
290  return sm_base_dir_;
291  }
292 
294  int retval = remove(filename.c_str());
295  if (retval != 0)
296  std::cout << "Could not remove used file " << filename << " error "
297  << strerror(errno) << "\n";
298  }
299 
300  void EvFDaqDirector::removeFile(unsigned int ls, unsigned int index) {
301  removeFile(getRawFilePath(ls,index));
302  }
303 
304  void EvFDaqDirector::updateBuLock(unsigned int ls) {
305  int check = 0;
306  fcntl(bu_writelock_fd_, F_SETLKW, &bu_w_flk);
307  if (bu_w_lock_stream != 0) {
308  check = fseek(bu_w_lock_stream, 0, SEEK_SET);
309  if (check == 0)
310  fprintf(bu_w_lock_stream, "%u", ls);
311  else
312  edm::LogError("EvFDaqDirector")
313  << "seek on bu write lock for updating failed with error "
314  << strerror(errno);
315  } else
316  edm::LogError("EvFDaqDirector") << "bu write lock stream is invalid " << strerror(errno);
317 
318  fcntl(bu_writelock_fd_, F_SETLKW, &bu_w_fulk);
319  }
320 
// Claim the next input file through the shared FU lock file.
//
// Re-opens <bu_run_dir_>/fu.lock, takes the lock described by fu_rw_flk
// (F_SETLKW, blocking), reads the lumisection/index pair stored in the file,
// asks bumpFile() for the next file and, when one is found, rewrites the lock
// file with the index advanced by one. The lock is released with fu_rw_fulk
// before returning.
//
// Parameters:
//   ls       - out: receives the lumisection value after bumpFile() ran (it is
//              assigned whether or not bumpFile() found a file).
//   nextFile - out: handed to bumpFile(), which stores the candidate path in it.
//   eorSeen  - out: true when stat() on getEoRFilePath() succeeds. Not assigned
//              when taking the lock fails.
// Returns true only when bumpFile() returned true and the stream could be
// rewound for the rewrite; false otherwise, including when locking fails.
//
// NOTE(review): the listing numbers embedded in these lines skip 332, 355,
// 379, 383 and 393; the corresponding source lines appear to be missing here
// (see the notes below) - confirm against the original file.
321  bool EvFDaqDirector::updateFuLock(unsigned int& ls, std::string& nextFile,
322  bool& eorSeen) {
323  // close and reopen the stream / fd
     // NOTE(review): only the descriptor is closed; no fclose() of the previous
     // fu_rw_lock_stream is visible before it is overwritten below - confirm
     // whether the old FILE object leaks.
324  close(fu_readwritelock_fd_);
325  std::string fulockfile = bu_run_dir_ + "/fu.lock";
326  fu_readwritelock_fd_ = open(fulockfile.c_str(), O_RDWR | O_NONBLOCK, S_IRWXU);
327  if (fu_readwritelock_fd_ == -1)
328  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock "
329  << strerror(errno);
330  else
     // NOTE(review): the end of this log statement (listing line 332) is not
     // present in this listing.
331  edm::LogInfo("EvFDaqDirector") << "created filedesc for fureadwritelock "
333  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
334  edm::LogInfo("EvFDaqDirector") << "Reopened the fw FD & STREAM";
335 
336  // obtain lock on the fulock file - this call will block if the lock is held by another process
337  int retval = fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_flk);
338  //if locking fails just return here
339  if(retval!=0) return false;
340 
341  eorSeen = false;
342  struct stat buf;
     // the end-of-run marker counts as seen as soon as the file can be stat'ed
343  eorSeen = (stat(getEoRFilePath().c_str(), &buf) == 0);
344  bool valid = false;
345 
346  // if the stream is readable
347  if (fu_rw_lock_stream != 0) {
     // none of these are initialised and the fscanf() results below are not
     // checked, so they keep indeterminate values if the file cannot be parsed
348  unsigned int readLs, readIndex, jumpLs, jumpIndex;
349  int check = 0;
350  // rewind the stream
351  check = fseek(fu_rw_lock_stream, 0, SEEK_SET);
352  // if rewound ok
353  if (check == 0) {
354  // read its values
     // NOTE(review): the `else` below has no visible `if`; the condition that
     // selects between the four-value and two-value formats (listing line 355)
     // is missing from this listing.
356  fscanf(fu_rw_lock_stream, "%u %u %u %u", &readLs, &readIndex,
357  &jumpLs, &jumpIndex);
358  else
359  fscanf(fu_rw_lock_stream, "%u %u", &readLs, &readIndex);
360 
361  // try to bump
     // bumpFile() takes readLs and readIndex by reference and may change both
362  bool bumpedOk = bumpFile(readLs, readIndex, nextFile);
363 
364  ls = readLs;
365  // there is a new file to grab or lumisection ended
366  if (bumpedOk) {
367  // rewind and clear
368  check = fseek(fu_rw_lock_stream, 0, SEEK_SET);
369  if (check == 0) {
370  ftruncate(fu_readwritelock_fd_, 0);
371  fflush(fu_rw_lock_stream); //this should not be needed ???
372  } else
373  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for updating failed with error "
374  << strerror(errno);
375  // write new data
376  check = fseek(fu_rw_lock_stream, 0, SEEK_SET);
377  if (check == 0) {
378  // write next index in the file, which is the file the next process should take
     // NOTE(review): the opening `if (...) {` of this two-branch write (listing
     // line 379) and listing line 383 are missing from this listing.
380  fprintf(fu_rw_lock_stream, "%u %u %u %u", readLs,
381  readIndex + 1, readLs + 2, readIndex + 1);
382  jumpLS_ = readLs + 2;
384  } else {
385  fprintf(fu_rw_lock_stream, "%u %u", readLs,
386  readIndex + 1);
387  }
     // push the new contents out of the stdio buffer and then to disk before
     // the lock is released further down
388  fflush(fu_rw_lock_stream);
389  fsync(fu_readwritelock_fd_);
390 
391  valid = true;
392 
     // NOTE(review): the condition choosing between the two log messages
     // (listing line 393) is missing from this listing.
394  edm::LogInfo("EvFDaqDirector")<< "Written to file: " << readLs << ":"
395  << readIndex + 1 << " --> " << readLs + 2
396  << ":" << readIndex + 1;
397  else
398  edm::LogInfo("EvFDaqDirector")<< "Written to file: " << readLs << ":"
399  << readIndex + 1;
400 
401  } else
402  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for updating failed with error "
403  << strerror(errno) << std::endl;
404  }
405  } else
406  edm::LogError("EvFDaqDirector") << "seek on fu read/write lock for reading failed with error "
407  << strerror(errno) << std::endl;
408  } else
409  edm::LogError("EvFDaqDirector") << "fu read/write lock stream is invalid " << strerror(errno);
410  //release flock at this point
411  fcntl(fu_readwritelock_fd_, F_SETLKW, &fu_rw_fulk);
412 
413  return valid;
414  }
415 
417  int retval = -1;
418  // check if lock file has disappeared and if so whether directory is empty (signal end of run)
420  return retval;
421  if (fcntl(bu_readlock_fd_, F_SETLKW, &bu_r_flk) != 0)
422  retval = 0;
423  if (bu_r_lock_stream) {
424  unsigned int readval;
425  int check = 0;
426  unsigned int *p = &readval;
427  check = fseek(bu_r_lock_stream, 0, SEEK_SET);
428  if (check == 0) {
429  fscanf(bu_r_lock_stream, "%u", p);
430  retval = readval;
431  }
432  } else {
433  edm::LogError("EvFDaqDirector") << "error reading bu lock file " << strerror(errno);
434  retval = -1;
435  }
436  fcntl(bu_readlock_fd_, F_SETLKW, &bu_r_fulk);
437  std::cout << "readbulock returning " << retval << std::endl;
438  return retval;
439  }
440 
441 
442 
444  struct stat buf;
445  std::string lockfile = bu_base_dir_;
446  lockfile += "/bu.lock";
447  bool retval = (stat(lockfile.c_str(), &buf) == 0);
448  if (!retval) {
449  close(bu_readlock_fd_);
450  close(bu_writelock_fd_);
451  }
452  std::cout << "stat of lockfile returned " << retval << std::endl;
453  return retval;
454  }
455 
457  struct stat buf;
458  std::string lockfile = bu_base_dir_;
459  lockfile += "/fu.lock";
460  bool retval = (stat(lockfile.c_str(), &buf) == 0);
461  if (!retval) {
462  close(fu_readwritelock_fd_);
463  close(fu_readwritelock_fd_); // why the second close ?
464  }
465  std::cout << "stat of lockfile returned " << retval << std::endl;
466  return retval;
467  }
468 
469  void EvFDaqDirector::writeLsStatisticsBU(unsigned int ls, unsigned int events,
470  unsigned long long totsize, long long lsusec) {
471  if (bu_w_monitor_stream != 0) {
472  int check = fseek(bu_w_monitor_stream, 0, SEEK_SET);
473  if (check == 0) {
474  fprintf(bu_w_monitor_stream, "%u %u %llu %f %f %012lld", ls,
475  events, totsize,
476  double(totsize) / double(events) / 1000000.,
477  double(totsize) / double(lsusec), lsusec);
478  fflush(bu_w_monitor_stream);
479  } else
480  std::cout
481  << "seek on bu write monitor for updating failed with error "
482  << strerror(errno) << std::endl;
483  } else
484  std::cout << "bu write monitor stream is invalid " << strerror(errno)
485  << std::endl;
486 
487  }
488  void EvFDaqDirector::writeDiskAndThrottleStat(double fraction, int highWater,
489  int lowWater) {
490  if (bu_t_monitor_stream != 0) {
491  int check = fseek(bu_t_monitor_stream, 0, SEEK_SET);
492  if (check == 0)
493  fprintf(bu_t_monitor_stream, "%f %d %d", fraction, highWater,
494  lowWater);
495  else
496  std::cout
497  << "seek on disk write monitor for updating failed with error "
498  << strerror(errno) << std::endl;
499  } else
500  std::cout << "disk write monitor stream is invalid " << strerror(errno)
501  << std::endl;
502 
503  }
504 
// Look for the next raw input file, starting from the suggested (ls, index).
//
// 1. If getRawFilePath(ls, index) exists, its size is stored in
//    previousFileSize_ and true is returned.
// 2. Otherwise, for as long as the EoLS file of the current lumisection exists
//    on the BU, ls is incremented and the index-0 file of the new lumisection
//    is tried; on a hit, index is reset to 0 and true is returned.
// 3. Otherwise false is returned.
//
// Parameters:
//   ls       - in/out: incremented once per EoLS file found, also when the
//              function ends up returning false.
//   index    - in/out: set to 0 when a file is found in a later lumisection.
//   nextFile - out: always overwritten with the last path that was tried, also
//              when false is returned.
//
// NOTE(review): the listing numbers embedded in these lines skip 508, 510 and
// 545; the corresponding source lines appear to be missing here (see the notes
// below) - confirm against the original file.
505  bool EvFDaqDirector::bumpFile(unsigned int& ls, unsigned int& index, std::string& nextFile) {
506 
507  if (previousFileSize_ != 0) {
     // NOTE(review): only the tail of a statement is visible on the next line;
     // its beginning (listing line 508) and listing line 510 are missing.
509  evf::MicroStateService>().operator->());
511  previousFileSize_ = 0;
512  }
513 
514  struct stat buf;
     // `ss` and `nextIndex` are not used anywhere in the visible part of this
     // function
515  std::stringstream ss;
516  unsigned int nextIndex = index;
517  nextIndex++;
518 
519  // 1. Check suggested file
520  nextFile = getRawFilePath(ls,index);
521  bool found = (stat(nextFile.c_str(), &buf) == 0);
522  // if found
523  if (found) {
524  //grabbedFileSize = buf.st_size;
525  previousFileSize_ = buf.st_size;
526  return true;
527  }
528  // 2. No file -> lumi ended? (and how many?)
529  else {
530  bool eolFound = (stat(getEoLSFilePathOnBU(ls).c_str(), &buf) == 0);
     // remember where the search started; used for the EoLS copy below
531  unsigned int startingLumi = ls;
532  while (eolFound) {
533  // this lumi ended, check for files
534  ++ls;
535  nextFile = getRawFilePath(ls,0);
536  found = (stat(nextFile.c_str(), &buf) == 0);
537  // update highest ls even if there is no file
538  // input source can now end its LS when an EoL jsn file is seen
539  if (found) {
540  // a new file was found at new lumisection, index 0
541  index = 0;
542  //grabbedFileSize = buf.st_size;
543  previousFileSize_ = buf.st_size;
544 
     // NOTE(review): the line opening the block that is closed after the copy
     // (listing line 545) is missing; the "testmode" log texts suggest it is a
     // test-mode condition - confirm.
546  // copy the EoLS file of the starting lumi to lumi + 2 by running `cp` through system()
547  string sourceEol = getEoLSFilePathOnBU(startingLumi);
548 
549  string destEol = getEoLSFilePathOnBU(startingLumi+2);
550 
551  string cpCmd = "cp " + sourceEol + " " + destEol;
552  edm::LogInfo("EvFDaqDirector") << " testmode: Running copy cmd = " << cpCmd;
553  int rc = system(cpCmd.c_str());
554  if (rc != 0) {
555  edm::LogError("EvFDaqDirector") << " testmode: COPY EOL FAILED!!!!!: " << cpCmd;
556  }
557  }
558 
559  return true;
560  }
     // no index-0 file yet in this lumisection: go on only if it has ended too
561  eolFound = (stat(getEoLSFilePathOnBU(ls).c_str(), &buf) == 0);
562  }
563  }
564  // no new file found
565  return false;
566  }
567 
569  boost::filesystem::path highestRunDirPath (findHighestRunDir());
570  return highestRunDirPath.filename().string();
571  }
572 
574  if (fu_rw_lock_stream == 0)
575  edm::LogError("EvFDaqDirector") << "Error creating fu read/write lock stream "
576  << strerror(errno);
577  else {
578  edm::LogInfo("EvFDaqDirector") << "Initializing FU LOCK FILE";
579  unsigned int readLs = 1, readIndex = 0, jumpLs = 3, jumpIndex = 0;
581  fprintf(fu_rw_lock_stream, "%u %u %u %u", readLs, readIndex,
582  jumpLs, jumpIndex);
583  else
584  fprintf(fu_rw_lock_stream, "%u %u", readLs, readIndex);
585  }
586  }
587 
589  if (create) {
590  fu_readwritelock_fd_ = open(fulockfile.c_str(), O_RDWR | O_CREAT,
591  S_IRWXU | S_IRWXG | S_IROTH | S_IRWXO | S_IXOTH);
592  } else {
593  fu_readwritelock_fd_ = open(fulockfile.c_str(), O_RDWR, S_IRWXU);
594  }
595  if (fu_readwritelock_fd_ == -1)
596  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for fuwritelock "
597  << strerror(errno);
598  else
599  edm::LogInfo("EvFDaqDirector") << "creating filedesc for fureadwritelock "
601 
602  fu_rw_lock_stream = fdopen(fu_readwritelock_fd_, "r+");
603  }
604 
605  //create if does not exist then lock the merge destination file
607  data_rw_stream = fopen(getMergedDatFilePath(ls,stream).c_str(), "a"); //open stream for appending
609  if (data_readwrite_fd_ == -1)
610  edm::LogError("EvFDaqDirector") << "problem with creating filedesc for datamerge "
611  << strerror(errno);
612  else
613  edm::LogInfo("EvFDaqDirector") << "creating filedesc for datamerge "
615  fcntl(data_readwrite_fd_, F_SETLKW, &data_rw_flk);
616 
617  return data_rw_stream;
618  }
619 
621  fflush(data_rw_stream);
622  fcntl(data_readwrite_fd_, F_SETLKW, &data_rw_fulk);
623  fclose(data_rw_stream);
624  }
625 
626  std::string EvFDaqDirector::inputFileNameStem(const unsigned int ls, const unsigned int index) const {
627  std::stringstream ss;
628  ss << run_string_
629  << "_ls" << std::setfill('0') << std::setw(4) << ls
630  << "_index" << std::setfill('0') << std::setw(6) << index;
631  return ss.str();
632  }
633 
634  std::string EvFDaqDirector::outputFileNameStem(const unsigned int ls, std::string const& stream) const {
635  std::stringstream ss;
636  ss << run_string_
637  << "_ls" << std::setfill('0') << std::setw(4) << ls
638  << "_" << stream
639  << "_pid" << std::setfill('0') << std::setw(5) << getpid();
640  return ss.str();
641  }
642 
643  std::string EvFDaqDirector::mergedFileNameStem(const unsigned int ls, std::string const& stream) const {
644  std::stringstream ss;
645  ss << run_string_
646  << "_ls" << std::setfill('0') << std::setw(4) << ls
647  << "_" << stream
648  << "_" << hostname_;
649  return ss.str();
650  }
651 
653  std::stringstream ss;
654  ss << run_string_
655  << "_" << stream
656  << "_pid" << std::setfill('0') << std::setw(5) << getpid()
657  << ".ini";
658  return ss.str();
659  }
660 
661  std::string EvFDaqDirector::eolsFileName(const unsigned int ls) const {
662  std::stringstream ss;
663  ss << "EoLS_" << std::setfill('0') << std::setw(4) << ls << ".jsn";
664  return ss.str();
665  }
666 
668  std::stringstream ss;
669  ss << "EoR_" << std::setfill('0') << std::setw(6) << run_ << ".jsn";
670  return ss.str();
671  }
672 }
struct flock bu_w_fulk
type
Definition: HCALResponse.h:21
struct flock fu_rw_flk
std::string run_string_
tuple start
Check for commandline option errors.
Definition: dqm_diff.py:58
void postEndRun(edm::Run const &run, edm::EventSetup const &es)
std::string outputFileNameStem(const unsigned int ls, std::string const &stream) const
bool checkDirEmpty(std::string &)
Definition: DirManager.cc:53
void openFULockfileStream(std::string &fuLockFilePath, bool create)
std::string getInitFilePath(std::string const &stream) const
struct flock bu_r_fulk
std::string getRawFilePath(const unsigned int ls, const unsigned int index) const
std::string getOpenRawFilePath(const unsigned int ls, const unsigned int index) const
struct flock data_rw_flk
std::string eolsFileName(const unsigned int ls) const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
std::string getEoRFilePath() const
std::string findHighestRunDirStem()
unsigned long previousFileSize_
std::string sm_base_dir_
unsigned int jumpIndex_
std::string getPathForFU() const
std::string monitor_base_dir_
struct flock fu_rw_fulk
EvFDaqDirector(const edm::ParameterSet &pset, edm::ActivityRegistry &reg)
std::string getMergedDatFilePath(const unsigned int ls, std::string const &stream) const
bool check(const DataFrame &df, bool capcheck, bool dvercheck)
bool updateFuLock(unsigned int &ls, std::string &nextFile, bool &eorSeen)
std::string getOpenDatFilePath(const unsigned int ls, std::string const &stream) const
std::string inputFileNameStem(const unsigned int ls, const unsigned int index) const
FILE * maybeCreateAndLockFileHeadForStream(unsigned int ls, std::string &stream)
std::string findHighestRunDir()
std::string bu_base_dir_
void removeFile(unsigned int ls, unsigned int index)
void preBeginRun(edm::RunID const &id, edm::Timestamp const &ts)
void updateBuLock(unsigned int ls)
std::string getOutputJsonFilePath(const unsigned int ls, std::string const &stream) const
struct flock bu_w_flk
tuple pid
Definition: sysUtil.py:22
std::string bu_run_dir_
std::string eorFileName() const
bool bumpFile(unsigned int &ls, unsigned int &index, std::string &nextFile)
std::string base_dir_
std::string findHighestRunDir()
Definition: DirManager.cc:20
tuple events
Definition: patZpeak.py:19
tuple filename
Definition: lut2db_cfg.py:20
tuple cout
Definition: gather_cfg.py:121
void writeDiskAndThrottleStat(double, int, int)
#define O_NONBLOCK
Definition: SysFile.h:21
unsigned int jumpLS_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void watchPostEndRun(PostEndRun::slot_type const &iSlot)
volatile std::atomic< bool > shutdown_flag false
struct flock data_rw_fulk
void accummulateFileSize(unsigned long fileSize)
SurfaceDeformation * create(int type, const std::vector< double > &params)
struct flock bu_r_flk
std::string mergedFileNameStem(const unsigned int ls, std::string const &stream) const
void writeLsStatisticsBU(unsigned int, unsigned int, unsigned long long, long long)
Definition: Run.h:41
std::string bu_run_open_dir_
std::string initFileName(std::string const &stream) const
void watchPreBeginRun(PreBeginRun::slot_type const &iSlot)