CMS 3D CMS Logo

DAQSource.cc
Go to the documentation of this file.
1 #include <sstream>
2 #include <unistd.h>
3 #include <vector>
4 #include <chrono>
5 #include <algorithm>
6 
11 
21 
26 
27 //JSON file reader
29 
30 using namespace evf::FastMonState;
31 
33  : edm::RawInputSource(pset, desc),
34  dataModeConfig_(pset.getUntrackedParameter<std::string>("dataMode")),
35  eventChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkSize")) << 20),
36  maxChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("maxChunkSize")) << 20),
37  eventChunkBlock_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkBlock")) << 20),
38  numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers")),
39  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles")),
40  alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
41  verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum")),
42  useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID")),
43  testTCDSFEDRange_(pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange")),
44  listFileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames")),
45  fileListMode_(pset.getUntrackedParameter<bool>("fileListMode")),
46  fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
47  runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
48  processHistoryID_(),
49  currentLumiSection_(0),
50  eventsThisLumi_(0),
51  rng_(std::chrono::system_clock::now().time_since_epoch().count()) {
52  char thishost[256];
53  gethostname(thishost, 255);
54 
55  if (maxChunkSize_ == 0)
57  else if (maxChunkSize_ < eventChunkSize_)
58  throw cms::Exception("DAQSource::DAQSource") << "maxChunkSize must be equal or larger than eventChunkSize";
59 
60  if (eventChunkBlock_ == 0)
63  throw cms::Exception("DAQSource::DAQSource") << "eventChunkBlock must be equal or smaller than eventChunkSize";
64 
65  edm::LogInfo("DAQSource") << "Construction. read-ahead chunk size -: " << std::endl
66  << (eventChunkSize_ >> 20) << " MB on host " << thishost << " in mode " << dataModeConfig_;
67 
68  uint16_t MINTCDSuTCAFEDID = FEDNumbering::MINTCDSuTCAFEDID;
69  uint16_t MAXTCDSuTCAFEDID = FEDNumbering::MAXTCDSuTCAFEDID;
70 
71  if (!testTCDSFEDRange_.empty()) {
72  if (testTCDSFEDRange_.size() != 2) {
73  throw cms::Exception("DAQSource::DAQSource") << "Invalid TCDS Test FED range parameter";
74  }
75  MINTCDSuTCAFEDID = testTCDSFEDRange_[0];
76  MAXTCDSuTCAFEDID = testTCDSFEDRange_[1];
77  }
78 
79  //load mode class based on parameter
80  if (dataModeConfig_ == "FRD") {
81  dataMode_.reset(new DataModeFRD(this));
82  } else if (dataModeConfig_ == "FRDStriped") {
83  dataMode_.reset(new DataModeFRDStriped(this));
84  } else
85  throw cms::Exception("DAQSource::DAQSource") << "Unknown data mode " << dataModeConfig_;
86 
88 
89  dataMode_->setTCDSSearchRange(MINTCDSuTCAFEDID, MAXTCDSuTCAFEDID);
90  dataMode_->setTesting(pset.getUntrackedParameter<bool>("testing", false));
91 
92  long autoRunNumber = -1;
93  if (fileListMode_) {
94  autoRunNumber = initFileList();
95  if (!fileListLoopMode_) {
96  if (autoRunNumber < 0)
97  throw cms::Exception("DAQSource::DAQSource") << "Run number not found from filename";
98  //override run number
99  runNumber_ = (edm::RunNumber_t)autoRunNumber;
100  daqDirector_->overrideRunNumber((unsigned int)autoRunNumber);
101  }
102  }
103 
104  dataMode_->makeDirectoryEntries(daqDirector_->getBUBaseDirs(), daqDirector_->runString());
105 
106  auto& daqProvenanceHelpers = dataMode_->makeDaqProvenanceHelpers();
107  for (const auto& daqProvenanceHelper : daqProvenanceHelpers)
108  processHistoryID_ = daqProvenanceHelper->daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
109  setNewRun();
110  //todo:autodetect from file name (assert if names differ)
112 
113  //make sure that chunk size is N * block size
118 
119  if (!numBuffers_)
120  throw cms::Exception("DAQSource::DAQSource") << "no reading enabled with numBuffers parameter 0";
121 
123  assert(numBuffers_ > 1);
124  readingFilesCount_ = 0;
125 
126  if (!crc32c_hw_test())
127  edm::LogError("DAQSource::DAQSource") << "Intel crc32c checksum computation unavailable";
128 
129  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
130  if (fileListMode_) {
131  try {
133  } catch (cms::Exception const&) {
134  edm::LogInfo("DAQSource") << "No FastMonitoringService found in the configuration";
135  }
136  } else {
138  if (!fms_) {
139  throw cms::Exception("DAQSource") << "FastMonitoringService not found";
140  }
141  }
142 
144  if (!daqDirector_)
145  cms::Exception("DAQSource") << "EvFDaqDirector not found";
146 
147  edm::LogInfo("DAQSource") << "EvFDaqDirector/Source configured to use file service";
148  //set DaqDirector to delete files in preGlobalEndLumi callback
150  if (fms_) {
152  fms_->setInputSource(this);
155  }
156  //should delete chunks when run stops
157  for (unsigned int i = 0; i < numBuffers_; i++) {
159  }
160 
161  quit_threads_ = false;
162 
163  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
164  std::unique_lock<std::mutex> lk(startupLock_);
165  //issue a memory fence here and in threads (constructor was segfaulting without this)
166  thread_quit_signal.push_back(false);
167  workerJob_.push_back(ReaderInfo(nullptr, nullptr));
168  cvReader_.push_back(new std::condition_variable);
169  tid_active_.push_back(0);
170  threadInit_.store(false, std::memory_order_release);
171  workerThreads_.push_back(new std::thread(&DAQSource::readWorker, this, i));
172  startupCv_.wait(lk);
173  }
174 
175  runAuxiliary()->setProcessHistoryID(processHistoryID_);
176 }
177 
// DAQSource destructor body (the signature line and the condition that opens the join
// section are missing from this listing).
// Visible behaviour: raises quit_threads_, releases the pending-delete file entries (or
// protects those belonging to lumisections flagged by the monitoring service), joins the
// threads and deletes the per-worker condition variables.
179  quit_threads_ = true;
180 
181  //delete any remaining open files
    // no monitoring service, or no exception reported: drop every pending entry
182  if (!fms_ || !fms_->exceptionDetected()) {
183  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
184  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
185  it->second.reset();
186  } else {
187  //skip deleting files with exception
    // entries whose lumi is flagged by isExceptionOnData() get unsetDeleteFile(); others are reset
188  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
189  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
190  if (fms_->isExceptionOnData(it->second->lumi_))
191  it->second->unsetDeleteFile();
192  else
193  it->second.reset();
194  }
195  //disable deleting current file with exception
196  if (currentFile_.get())
197  if (fms_->isExceptionOnData(currentFile_->lumi_))
198  currentFile_->unsetDeleteFile();
199  }
200 
    // (the `if` that selects between joining the supervisor and joining the workers
    // directly is missing from this listing)
202  readSupervisorThread_->join();
203  } else {
204  //join aux threads in case the supervisor thread was not started
    // each worker is told to quit under mReader_, woken via its condition variable, then joined
205  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
206  std::unique_lock<std::mutex> lk(mReader_);
207  thread_quit_signal[i] = true;
208  cvReader_[i]->notify_one();
209  lk.unlock();
210  workerThreads_[i]->join();
211  delete workerThreads_[i];
212  }
213  }
    // condition variables were allocated with `new` in the constructor
214  for (unsigned int i = 0; i < numConcurrentReads_; i++)
215  delete cvReader_[i];
216  /*
217  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
218  InputChunk *ch;
219  while (!freeChunks_.try_pop(ch)) {}
220  delete ch;
221  }
222  */
223 }
224 
// Body of DAQSource::fillDescriptions (the signature line and the declaration of `desc`
// are missing from this listing).
// Declares the untracked parameters of the source with their defaults and help text, then
// registers the description under the label "source". setAllowAnything() means parameters
// that are not listed here (e.g. "fileListLoopMode", "testing", which the constructor reads
// with defaults) are still accepted.
//
// Fixes relative to the listing:
//  - "alwaysStartFromFirstLS" is now declared with the spelling and type (bool) that the
//    constructor reads; it was declared as unsigned int "alwaysStartFromfirstLS".
//  - help-text typos: 'FRSStriped' -> 'FRDStriped' (the mode name the constructor accepts)
//    and "specifier" -> "specified".
227  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk (unified)");
228  desc.addUntracked<std::string>("dataMode", "FRD")->setComment("Data mode: event 'FRD', 'FRDStriped', 'ScoutingRun2'");
229  desc.addUntracked<unsigned int>("eventChunkSize", 64)->setComment("Input buffer (chunk) size");
230  desc.addUntracked<unsigned int>("maxChunkSize", 0)
231  ->setComment("Maximum chunk size allowed if buffer is resized to fit data. If 0 is specified, use chunk size");
232  desc.addUntracked<unsigned int>("eventChunkBlock", 0)
233  ->setComment(
234  "Block size used in a single file read call (must be smaller or equal to the initial chunk buffer size). If "
235  "0 is specified, use chunk size.");
236 
237  desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
238  desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
239  ->setComment("Maximum number of simultaneously buffered raw files");
240  desc.addUntracked<bool>("alwaysStartFromFirstLS", false)
241  ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
242  desc.addUntracked<bool>("verifyChecksum", true)
243  ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
244  desc.addUntracked<bool>("useL1EventID", false)
245  ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
246  desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
247  ->setComment("[min, max] range to search for TCDS FED ID in test setup");
248  desc.addUntracked<bool>("fileListMode", false)
249  ->setComment("Use fileNames parameter to directly specify raw files to open");
250  desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
251  ->setComment("file list used when fileListMode is enabled");
252  desc.setAllowAnything();
253  descriptions.add("source", desc);
254 }
255 
// Body of the next-item check (signature, the guard that opens the first block, and all
// `case` labels are missing from this listing).
// Visible behaviour: starts the readSupervisor thread and waits for its start-up
// notification, then polls for the next status and maps it to Next::kStop or Next::kEvent.
258  std::unique_lock<std::mutex> lk(startupLock_);
259 
260  //this thread opens new files and dispatches reading to worker readers
261  readSupervisorThread_ = std::make_unique<std::thread>(&DAQSource::readSupervisor, this);
263 
    // blocks until the supervisor signals startupCv_
264  startupCv_.wait(lk);
265  }
266 
267  //signal hltd to start event accounting
    // (the statement guarded by this condition is missing from this listing)
268  if (!currentLumiSection_)
271 
    // nextEvent: keeps requesting the next event while the status is noFile, unless
    // edm::shutdown_flag is set
272  auto nextEvent = [this]() {
    // getNextEvent: fetch a new data block if the current one is completed, otherwise
    // take the next event from the current block
273  auto getNextEvent = [this]() {
274  //for some models this is always true (if one event is one block)
275  if (dataMode_->dataBlockCompleted()) {
276  return getNextDataBlock();
277  } else {
278  return getNextEventFromDataBlock();
279  }
280  };
281 
    // (the declaration of `status` is missing from this listing)
283  while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
284  if (edm::shutdown_flag.load(std::memory_order_relaxed))
285  break;
286  }
287  return status;
288  };
289 
290  switch (nextEvent()) {
    // first visible branch (label missing): logs "RUN ENDED" and stops the source
292  //maybe create EoL file in working directory before ending run
293  struct stat buf;
294  //also create EoR file in FU data directory
295  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
296  if (!eorFound) {
    // NOTE(review): the descriptor returned by open() is closed without checking for -1
297  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
298  O_RDWR | O_CREAT,
299  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
300  close(eor_fd);
301  }
303  eventsThisLumi_ = 0;
305  edm::LogInfo("DAQSource") << "----------------RUN ENDED----------------";
306  return Next::kStop;
307  }
309  //this is not reachable
310  return Next::kEvent;
311  }
313  //std::cout << "--------------NEW LUMI---------------" << std::endl;
314  return Next::kEvent;
315  }
    // default branch: an event is available (the `if` preceding this `else` is missing from the listing)
316  default: {
319  else
320  eventRunNumber_ = dataMode_->run();
321 
322  setEventCached();
323 
324  return Next::kEvent;
325  }
326  }
327 }
328 
// Opens a new luminosity block if none is open yet or the open one has a different number
// than `lumiSection`. Records the number in currentLumiSection_, timestamps the opening with
// the current wall-clock time and installs a new luminosity block auxiliary carrying
// processHistoryID_. (Two statements are missing from this listing.)
329 void DAQSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
330  if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
331  currentLumiSection_ = lumiSection;
332 
334 
    // timestamp value is seconds * 1e6 + microseconds from gettimeofday()
335  timeval tv;
336  gettimeofday(&tv, nullptr);
337  const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
338 
    // (the start of the statement creating lumiBlockAuxiliary is missing; these are its
    // arguments: run, lumi, begin time, and an invalid end time)
340  runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
341 
342  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
343  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
344 
345  edm::LogInfo("DAQSource") << "New lumi section was opened. LUMI -: " << lumiSection;
346  }
347 }
348 
351 
// Body of the routine that takes the next event from the current data block (signature and
// return statements are missing from this listing).
// Visible behaviour: asks the data mode for the next event view; if none is found, marks the
// file as fully consumed; otherwise verifies the checksum (when enabled) and counts the event
// as processed for the current file.
352  bool found = dataMode_->nextEventView();
353  //file(s) completely parsed
354  if (!found) {
355  if (dataMode_->dataBlockInitialized()) {
356  dataMode_->setDataBlockInitialized(false);
357  //roll position to the end of the file to close it
358  currentFile_->bufferPosition_ = currentFile_->fileSize_;
359  }
361  }
362 
    // a checksum mismatch is fatal: throws cms::Exception with the data mode's error text
    // (the statement executed when fms_ is set is missing from this listing)
363  if (verifyChecksum_ && !dataMode_->checksumValid()) {
364  if (fms_)
366  throw cms::Exception("DAQSource::getNextEvent") << dataMode_->getChecksumError();
367  }
369 
370  currentFile_->nProcessed_++;
371 
373 }
374 
// Body of the routine that prepares the next data block (signature and a number of
// statements, mostly monitoring calls and returns, are missing from this listing).
// Visible behaviour:
//  1. if no file is current, pop one from fileQueue_ and act on its status;
//  2. handle empty and fully consumed files;
//  3. skip the raw file header, if any;
//  4. wait for the current chunk, then build a data-block view, moving data that straddles
//     a chunk boundary so that the block is contiguous;
//  5. hand over to getNextEventFromDataBlock().
// Throws cms::Exception on run abort, event-count mismatch and premature end of file.
376  if (setExceptionState_)
377  threadError();
378  if (!currentFile_.get()) {
381  if (!fileQueue_.try_pop(currentFile_)) {
382  //sleep until wakeup (only in single-buffer mode) or timeout
383  std::unique_lock<std::mutex> lkw(mWakeup_);
    // (the statement executed on timeout / still-empty currentFile_ is missing from this listing)
384  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
386  }
387  status = currentFile_->status_;
    // dispatch on the status of the queue entry (first condition of the chain is missing)
390  currentFile_.reset();
391  return status;
392  } else if (status == evf::EvFDaqDirector::runAbort) {
393  throw cms::Exception("DAQSource::getNextEvent") << "Run has been aborted by the input source reader thread";
394  } else if (status == evf::EvFDaqDirector::newLumi) {
    // only a lumisection newer than the current one resets the per-lumi event counter
396  if (currentFile_->lumi_ > currentLumiSection_) {
398  eventsThisLumi_ = 0;
400  }
401  currentFile_.reset();
402  return status;
403  } else if (status == evf::EvFDaqDirector::newFile) {
405  } else
406  assert(false);
407  }
409 
410  //file is empty
411  if (!currentFile_->fileSize_) {
413  //try to open new lumi
414  assert(currentFile_->nChunks_ == 0);
415  if (currentFile_->lumi_ > currentLumiSection_) {
417  eventsThisLumi_ = 0;
419  }
420  //immediately delete empty file
421  currentFile_.reset();
423  }
424 
425  //file is finished
426  if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
428  //release last chunk (it is never released elsewhere)
429  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
    // a non-negative nEvents_ must match the number of events actually processed
430  if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
431  throw cms::Exception("DAQSource::getNextEvent")
432  << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
433  << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
434  }
    // (the condition selecting between deferred and immediate deletion is missing from this listing)
436  //put the file in pending delete list;
437  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
438  filesToDelete_.push_back(
439  std::pair<int, std::unique_ptr<RawInputFile>>(currentFileIndex_, std::move(currentFile_)));
440  } else {
441  //in single-thread and stream jobs, events are already processed
442  currentFile_.reset();
443  }
445  }
446 
447  //assert(currentFile_->status_ == evf::EvFDaqDirector::newFile);
448 
449  //handle RAW file header
    // applies only at the very start of a file that declares a non-zero raw header size
450  if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
451  if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
452  if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
453  throw cms::Exception("DAQSource::getNextEvent") << "Premature end of input file while reading file header";
454 
455  edm::LogWarning("DAQSource") << "File with only raw header and no events received in LS " << currentFile_->lumi_;
456  if (currentFile_->lumi_ > currentLumiSection_) {
458  eventsThisLumi_ = 0;
460  }
461  }
462 
463  //advance buffer position to skip file header (chunk will be acquired later)
464  currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
465  currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
466  }
467 
468  //file is too short
469  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < dataMode_->headerSize()) {
470  throw cms::Exception("DAQSource::getNextEvent") << "Premature end of input file while reading event header";
471  }
472 
473  //multibuffer mode
474  //wait for the current chunk to become added to the vector
    // polls every 10 ms and checks for an error raised by another thread
476  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
477  usleep(10000);
478  if (setExceptionState_)
479  threadError();
480  }
482 
483  //check if header is at the boundary of two chunks
484  chunkIsFree_ = false;
485  unsigned char* dataPosition;
486 
487  //read event header, copy it to a single chunk if necessary
488  bool chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize());
489 
490  //get buffer size of current chunk (can be resized)
491  uint64_t currentChunkSize = currentFile_->currentChunkSize();
492 
493  dataMode_->makeDataBlockView(dataPosition, currentChunkSize, currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
494 
    // payload size = data block size minus header size
495  const size_t msgSize = dataMode_->dataBlockSize() - dataMode_->headerSize();
496 
497  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
498  throw cms::Exception("DAQSource::getNextEvent") << "Premature end of input file while reading event data";
499  }
500 
501  //for cross-buffer models
502  if (chunkEnd) {
503  //header was at the chunk boundary, we will have to move payload as well
504  currentFile_->moveToPreviousChunk(msgSize, dataMode_->headerSize());
505  chunkIsFree_ = true;
506  } else {
507  //header was contiguous, but check if payload fits the chunk
508  if (currentChunkSize - currentFile_->chunkPosition_ < msgSize) {
509  //rewind to header start position
510  currentFile_->rewindChunk(dataMode_->headerSize());
511  //copy event to a chunk start and move pointers
512 
514 
515  //can already move buffer
516  chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize() + msgSize);
517 
519 
520  assert(chunkEnd);
521  chunkIsFree_ = true;
522  //header is moved
    // the view is rebuilt because dataPosition changed
523  dataMode_->makeDataBlockView(
524  dataPosition, currentFile_->currentChunkSize(), currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
525  } else {
526  //everything is in a single chunk, only move pointers forward
527  chunkEnd = currentFile_->advance(dataPosition, msgSize);
528  assert(!chunkEnd);
529  chunkIsFree_ = false;
530  }
531  }
532  //prepare event
533  return getNextEventFromDataBlock();
534 }
535 
// Fills `eventPrincipal` from the current data block via the data mode, records which file
// the event's stream is reading from, periodically erases pending-delete files that no stream
// is using, and returns a chunk to freeChunks_ once its data block is completed.
// (Some statements are missing from this listing.)
536 void DAQSource::read(edm::EventPrincipal& eventPrincipal) {
538 
539  dataMode_->readEvent(eventPrincipal);
540 
541  eventsThisLumi_++;
543 
544  //resize vector if needed
    // new slots are initialised to -1
545  while (streamFileTracker_.size() <= eventPrincipal.streamID())
546  streamFileTracker_.push_back(-1);
547 
548  streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
549 
550  //this old file check runs no more often than every 10 events
    // NOTE(review): the interval is checkEvery_, whose value is not visible here; confirm it is 10
551  if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
552  //delete files that are not in processing
553  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
554  auto it = filesToDelete_.begin();
555  while (it != filesToDelete_.end()) {
556  bool fileIsBeingProcessed = false;
557  for (unsigned int i = 0; i < nStreams_; i++) {
558  if (it->first == streamFileTracker_.at(i)) {
559  fileIsBeingProcessed = true;
560  break;
561  }
562  }
    // entries for lumisections flagged by the monitoring service are kept
563  if (!fileIsBeingProcessed && !(fms_ && fms_->isExceptionOnData(it->second->lumi_))) {
564  it = filesToDelete_.erase(it);
565  } else
566  it++;
567  }
568  }
    // chunkIsFree_ is set by getNextDataBlock when data was moved across a chunk boundary;
    // the chunk released here is the one before currentChunk_
569  if (dataMode_->dataBlockCompleted() && chunkIsFree_) {
570  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
571  chunkIsFree_ = false;
572  }
574  return;
575 }
576 
578 
580 
// Body of the supervisor thread (signature and many statements, mostly monitoring calls and
// the conditions on `status`, are missing from this listing).
// Visible behaviour, per iteration of the main loop:
//  1. wait until a worker thread and a chunk are free and fewer than maxBufferedFiles_ files
//     are being read;
//  2. obtain the next file (from the file list or from the director), honouring input
//     throttling and backing off when no file is available;
//  3. queue newLumi / runEnded / runAbort markers on fileQueue_ as needed;
//  4. stat the file (and any additional files), compute the number of chunks, queue the file
//     and dispatch one read job per chunk to the worker threads.
// On exit it tells every worker to quit and joins them.
582  bool stop = false;
583  unsigned int currentLumiSection = 0;
584 
    // tell the thread that started us that we are running
585  {
586  std::unique_lock<std::mutex> lk(startupLock_);
587  startupCv_.notify_one();
588  }
589 
590  uint32_t ls = 0;
591  uint32_t monLS = 1;
592  uint32_t lockCount = 0;
    // NOTE(review): integer initialised from a floating-point literal
593  uint64_t sumLockWaitTimeUs = 0.;
594 
595  bool requireHeader = dataMode_->requireHeader();
596 
597  while (!stop) {
598  //wait for at least one free thread and chunk
599  int counter = 0;
600 
601  while (workerPool_.empty() || freeChunks_.empty() || readingFilesCount_ >= maxBufferedFiles_) {
602  //report state to monitoring
    // (the monitoring calls themselves are missing from this listing)
603  if (fms_) {
604  bool copy_active = false;
605  for (auto j : tid_active_)
606  if (j)
607  copy_active = true;
610  else if (freeChunks_.empty()) {
611  if (copy_active)
613  else
615  } else {
616  if (copy_active)
618  else
620  }
621  }
622  std::unique_lock<std::mutex> lkw(mWakeup_);
623  //sleep until woken up by condition or a timeout
624  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
625  counter++;
626  //if (!(counter%50)) edm::LogInfo("DAQSource") << "No free chunks or threads...";
627  LogDebug("DAQSource") << "No free chunks or threads...";
628  } else {
    // NOTE(review): this asserts "a worker is free OR no chunk is free"; confirm that is the
    // intended invariant after a wake-up
629  assert(!workerPool_.empty() || freeChunks_.empty());
630  }
631  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
632  stop = true;
633  break;
634  }
635  }
636  //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
637 
638  if (stop)
639  break;
640 
641  //look for a new file
642  std::string nextFile;
643  int64_t fileSizeFromMetadata;
644 
645  if (fms_) {
648  }
649  bool fitToBuffer = dataMode_->fitToBuffer();
650 
652  uint16_t rawHeaderSize = 0;
653  uint32_t lsFromRaw = 0;
654  int32_t serverEventsInNewFile = -1;
655  int rawFd = -1;
656 
657  int backoff_exp = 0;
658 
659  //entering loop which tries to grab new file from ramdisk
    // (the loop header is missing from this listing)
661  //check if hltd has signalled to throttle input
662  counter = 0;
663  while (daqDirector_->inputThrottled()) {
664  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
665  break;
666 
    // scan the window of lumisections ending at the current one (window start is at least 1)
    // for a discarded lumi; the test inside the loop is missing from this listing
667  unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
668  unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
669  unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
670  bool hasDiscardedLumi = false;
671  for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
673  edm::LogWarning("DAQSource") << "Source detected that the lumisection is discarded -: " << i;
674  hasDiscardedLumi = true;
675  break;
676  }
677  }
678  if (hasDiscardedLumi)
679  break;
680 
    // warn on every 50th pass; each pass sleeps 100 ms
682  if (!(counter % 50))
683  edm::LogWarning("DAQSource") << "Input throttled detected, reading files is paused...";
684  usleep(100000);
685  counter++;
686  }
687 
688  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
689  stop = true;
690  break;
691  }
692 
693  assert(rawFd == -1);
    // NOTE(review): integer initialised from a floating-point literal
694  uint64_t thisLockWaitTimeUs = 0.;
696  if (fileListMode_) {
697  //return LS if LS not set, otherwise return file
698  status = getFile(ls, nextFile, thisLockWaitTimeUs);
    // (the call whose argument list follows is missing its first line in this listing;
    // a non-zero result is treated as an error and stops the supervisor)
700  uint16_t rawDataType;
702  rawFd,
703  rawHeaderSize,
704  rawDataType,
705  lsFromRaw,
706  serverEventsInNewFile,
707  fileSizeFromMetadata,
708  requireHeader,
709  false,
710  false) != 0) {
711  //error
712  setExceptionState_ = true;
713  stop = true;
714  break;
715  }
716  }
717  } else {
    // (the call whose argument list follows is missing its first line in this listing)
719  ls,
720  nextFile,
721  rawFd,
722  rawHeaderSize, //which format?
723  serverEventsInNewFile,
724  fileSizeFromMetadata,
725  thisLockWaitTimeUs,
726  requireHeader);
727  }
728 
730 
731  //cycle through all remaining LS even if no files get assigned
734 
735  //monitoring of lock wait time
    // wait time and call count are accumulated and reported to the monitoring service
    // whenever ls advances past monLS
736  if (thisLockWaitTimeUs > 0.)
737  sumLockWaitTimeUs += thisLockWaitTimeUs;
738  lockCount++;
739  if (ls > monLS) {
740  monLS = ls;
741  if (lockCount)
742  if (fms_)
743  fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
744  lockCount = 0;
745  sumLockWaitTimeUs = 0;
746  }
747 
    // (condition missing) end of run: queue a runEnded marker and stop
749  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded));
750  stop = true;
751  break;
752  }
753 
754  //error from filelocking function
756  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
757  stop = true;
758  break;
759  }
760  //queue new lumisection
761  if (ls > currentLumiSection) {
762  //new file service
    // (the conditions opening the next two nesting levels are missing from this listing)
765  //start transitions from LS specified by env, continue if not reached
766  if (ls < daqDirector_->getStartLumisectionFromEnv()) {
767  //skip file if from earlier LS than specified by env
768  if (rawFd != -1) {
769  close(rawFd);
770  rawFd = -1;
771  }
773  continue;
774  } else {
775  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
776  }
777  } else if (ls < 100) {
778  //look at last LS file on disk to start from that lumisection (only within first 100 LS)
779  unsigned int lsToStart = daqDirector_->getLumisectionToStart();
780 
781  for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
782  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
783  }
784  } else {
785  //start from current LS
786  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
787  }
788  } else {
789  //queue all lumisections after last one seen to avoid gaps
790  for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
791  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
792  }
793  }
795  }
796  //else
    // (condition missing) a file from an older lumisection is fatal: close the descriptor,
    // queue a runAbort marker and stop
798  edm::LogError("DAQSource") << "Got old LS (" << ls
799  << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
800  << ". Aborting execution." << std::endl;
801  if (rawFd != -1)
802  close(rawFd);
803  rawFd = -1;
804  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
805  stop = true;
806  break;
807  }
808 
809  int dbgcount = 0;
    // (condition missing) no file available: sleep with exponential back-off,
    // 100 ms * 2^backoff_exp with the exponent capped at 4
812  dbgcount++;
813  if (!(dbgcount % 20))
814  LogDebug("DAQSource") << "No file for me... sleep and try again...";
815 
816  backoff_exp = std::min(4, backoff_exp); // max 1.6 seconds
817  //backoff_exp=0; // disabled!
818  int sleeptime = (int)(100000. * pow(2, backoff_exp));
819  usleep(sleeptime);
820  backoff_exp++;
821  } else
822  backoff_exp = 0;
823  }
824  //end of file grab loop, parse result
827  LogDebug("DAQSource") << "The director says to grab -: " << nextFile;
828 
829  std::string rawFile;
830  //file service will report raw extension
831  rawFile = nextFile;
832 
    // failing to stat the file raises the exception state and leaves the enclosing loop
833  struct stat st;
834  int stat_res = stat(rawFile.c_str(), &st);
835  if (stat_res == -1) {
836  edm::LogError("DAQSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
837  setExceptionState_ = true;
838  break;
839  }
840  uint64_t fileSize = st.st_size;
841 
842  if (fms_) {
846  }
    // in file-list mode the event count is unknown (-1) unless the file is empty;
    // otherwise it is the count reported together with the file
847  int eventsInNewFile;
848  if (fileListMode_) {
849  if (fileSize == 0)
850  eventsInNewFile = 0;
851  else
852  eventsInNewFile = -1;
853  } else {
854  eventsInNewFile = serverEventsInNewFile;
855  assert(eventsInNewFile >= 0);
856  assert((eventsInNewFile > 0) ==
857  (fileSize > rawHeaderSize)); //file without events must be empty or contain only header
858  }
859 
860  std::pair<bool, std::vector<std::string>> additionalFiles =
861  dataMode_->defineAdditionalFiles(rawFile, fileListMode_);
862  if (!additionalFiles.first) {
863  //skip secondary files from file broker
864  if (rawFd > -1)
865  close(rawFd);
866  continue;
867  }
868 
869  std::unique_ptr<RawInputFile> newInputFile(new RawInputFile(evf::EvFDaqDirector::FileStatus::newFile,
870  ls,
871  rawFile,
872  !fileListMode_,
873  rawFd,
874  fileSize,
875  rawHeaderSize, //for which format
876  0,
877  eventsInNewFile,
878  this));
879 
    // neededSize accumulates the size of the primary file plus all additional files
880  uint64_t neededSize = fileSize;
881  for (const auto& addFile : additionalFiles.second) {
882  struct stat buf;
883  //wait for secondary files to appear
884  unsigned int fcnt = 0;
885  while (stat(addFile.c_str(), &buf) != 0) {
    // a missing additional file is an immediate error in file-list mode
886  if (fileListMode_) {
887  edm::LogError("DAQSource") << "additional file is missing -: " << addFile;
888  stop = true;
889  setExceptionState_ = true;
890  break;
891  }
892  usleep(10000);
893  fcnt++;
894  //report and EoR check every 30 seconds
    // 3000 polls of 10 ms each; also entered when quit_threads_ is set
895  if ((fcnt && fcnt % 3000 == 0) || quit_threads_.load(std::memory_order_relaxed)) {
896  edm::LogWarning("DAQSource") << "Additional file is still missing after 30 seconds -: " << addFile;
897  struct stat bufEoR;
898  auto secondaryPath = std::filesystem::path(addFile).parent_path();
    // (the definition of eorName is missing from this listing)
900  std::string mainEoR = (std::filesystem::path(daqDirector_->buBaseRunDir()) / eorName).generic_string();
901  std::string secondaryEoR = (secondaryPath / eorName).generic_string();
902  bool prematureEoR = false;
    // an EoR file next to the additional file, or in the main run directory (after a
    // further 10 s grace period), while the additional file is still absent is treated
    // as a premature end of run
903  if (stat(secondaryEoR.c_str(), &bufEoR) == 0) {
904  if (stat(addFile.c_str(), &bufEoR) != 0) {
905  edm::LogError("DAQSource")
906  << "EoR file appeared in -: " << secondaryPath << " while waiting for index file " << addFile;
907  prematureEoR = true;
908  }
909  } else if (stat(mainEoR.c_str(), &bufEoR) == 0) {
910  //wait another 10 seconds
911  usleep(10000000);
912  if (stat(addFile.c_str(), &bufEoR) != 0) {
913  edm::LogError("DAQSource")
914  << "Main EoR file appeared -: " << mainEoR << " while waiting for index file " << addFile;
915  prematureEoR = true;
916  }
917  }
918  if (prematureEoR) {
919  //queue EoR since this is not FU error
920  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded, 0));
921  stop = true;
922  break;
923  }
924  }
925 
926  if (quit_threads_) {
927  edm::LogError("DAQSource") << "Quitting while waiting for file -: " << addFile;
928  stop = true;
929  setExceptionState_ = true;
930  break;
931  }
932  }
933  LogDebug("DAQSource") << " APPEND NAME " << addFile;
934  if (stop)
935  break;
936 
937  newInputFile->appendFile(addFile, buf.st_size);
938  neededSize += buf.st_size;
939  }
940  if (stop)
941  break;
942 
943  //calculate number of needed chunks and size if resizing will be applied
944  uint16_t neededChunks;
945  uint64_t chunkSize;
946 
    // fit-to-buffer: a single chunk sized between eventChunkSize_ and maxChunkSize_;
    // otherwise: ceil(neededSize / eventChunkSize_) chunks of eventChunkSize_
947  if (fitToBuffer) {
948  chunkSize = std::min(maxChunkSize_, std::max(eventChunkSize_, neededSize));
949  neededChunks = 1;
950  } else {
951  chunkSize = eventChunkSize_;
952  neededChunks = neededSize / eventChunkSize_ + uint16_t((neededSize % eventChunkSize_) > 0);
953  }
954  newInputFile->setChunks(neededChunks);
955 
956  newInputFile->randomizeOrder(rng_);
957 
    // keep a raw pointer for the worker jobs; ownership moves into fileQueue_
959  auto newInputFilePtr = newInputFile.get();
960  fileQueue_.push(std::move(newInputFile));
961 
    // dispatch one read job per chunk
962  for (size_t i = 0; i < neededChunks; i++) {
963  if (fms_) {
964  bool copy_active = false;
965  for (auto j : tid_active_)
966  if (j)
967  copy_active = true;
968  if (copy_active)
970  else
972  }
973  //get thread
    // poll every 100 ms for a free worker id; 0xffffffff means none was obtained
974  unsigned int newTid = 0xffffffff;
975  while (!workerPool_.try_pop(newTid)) {
976  usleep(100000);
977  if (quit_threads_.load(std::memory_order_relaxed)) {
978  stop = true;
979  break;
980  }
981  }
982 
983  if (fms_) {
984  bool copy_active = false;
985  for (auto j : tid_active_)
986  if (j)
987  copy_active = true;
988  if (copy_active)
990  else
992  }
    // poll every 100 ms for a free chunk
993  InputChunk* newChunk = nullptr;
994  while (!freeChunks_.try_pop(newChunk)) {
995  usleep(100000);
996  if (quit_threads_.load(std::memory_order_relaxed)) {
997  stop = true;
998  break;
999  }
1000  }
1001 
1002  if (newChunk == nullptr) {
1003  //return unused tid if we received shutdown (nullptr chunk)
1004  if (newTid != 0xffffffff)
1005  workerPool_.push(newTid);
1006  stop = true;
1007  break;
1008  }
1009  if (stop)
1010  break;
1012 
1013  std::unique_lock<std::mutex> lk(mReader_);
1014 
    // every chunk reads chunkSize bytes except the last, which reads the remainder (if any)
1015  uint64_t toRead = chunkSize;
1016  if (i == (uint64_t)neededChunks - 1 && neededSize % chunkSize)
1017  toRead = neededSize % chunkSize;
1018  newChunk->reset(i * chunkSize, toRead, i);
1019 
1020  workerJob_[newTid].first = newInputFilePtr;
1021  workerJob_[newTid].second = newChunk;
1022 
1023  //wake up the worker thread
1024  cvReader_[newTid]->notify_one();
1025  }
1026  }
1027  }
1029  //make sure threads finish reading
    // each worker id is taken from workerPool_ before the worker is told to quit and woken;
    // all workers are then joined and their thread objects deleted
1030  unsigned int numFinishedThreads = 0;
1031  while (numFinishedThreads < workerThreads_.size()) {
1032  unsigned int tid = 0;
1033  while (!workerPool_.try_pop(tid)) {
1034  usleep(10000);
1035  }
1036  std::unique_lock<std::mutex> lk(mReader_);
1037  thread_quit_signal[tid] = true;
1038  cvReader_[tid]->notify_one();
1039  numFinishedThreads++;
1040  }
1041  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1042  workerThreads_[i]->join();
1043  delete workerThreads_[i];
1044  }
1045 }
1046 
// Loop function of the reader worker identified by `tid`.
// Each iteration: mark the worker idle, clear its job slot, push `tid` into
// workerPool_, wait on cvReader_[tid], then read the (file, chunk) pair found in
// workerJob_[tid] into chunk->buf_ and store the chunk in file->chunks_.
// The function returns when thread_quit_signal[tid] is set after a wakeup.
// Any failure sets setExceptionState_ and abandons the current job.
// NOTE(review): the listing numbering skips 1154, 1210, 1237 and 1272; `start` and
// `end` used by the timing output are presumably defined on those missing lines.
1047 void DAQSource::readWorker(unsigned int tid) {
1048  bool init = true;
      // Sets threadInit_ to true (the previous value is discarded).
1049  threadInit_.exchange(true, std::memory_order_acquire);
1050 
1051  while (true) {
1052  tid_active_[tid] = false;
1053  std::unique_lock<std::mutex> lk(mReader_);
1054  workerJob_[tid].first = nullptr;
      // NOTE(review): this repeats the `.first` reset above, so `.second` is not
      // cleared here - presumably `.second` was intended; confirm.
1055  workerJob_[tid].first = nullptr;
1056 
1057  assert(!thread_quit_signal[tid]); //should never get it here
      // Advertise this worker as available.
1058  workerPool_.push(tid);
1059 
      // First pass only: notify startupCv_ while holding startupLock_
      // (presumably whoever started this thread waits on it - confirm).
1060  if (init) {
1061  std::unique_lock<std::mutex> lk(startupLock_);
1062  init = false;
1063  startupCv_.notify_one();
1064  }
      // Block until notified; mReader_ is released for the duration of the wait.
      // NOTE(review): the wait has no predicate, so a wakeup without an assigned
      // job would reach the job assertion below - confirm this is acceptable.
1065  cvReader_[tid]->wait(lk);
1066 
1067  if (thread_quit_signal[tid])
1068  return;
1069  tid_active_[tid] = true;
1070 
1071  RawInputFile* file;
1072  InputChunk* chunk;
1073 
1074  assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1075 
1076  file = workerJob_[tid].first;
1077  chunk = workerJob_[tid].second;
1078 
1079  bool fitToBuffer = dataMode_->fitToBuffer();
1080 
1081  //resize if multi-chunked reading is not possible
1082  if (fitToBuffer) {
      // Total on-disk size of all files belonging to this input file set.
1083  uint64_t accum = 0;
1084  for (auto s : file->diskFileSizes_)
1085  accum += s;
1086  if (accum > eventChunkSize_) {
1087  if (!chunk->resize(accum, maxChunkSize_)) {
1088  edm::LogError("DAQSource")
1089  << "maxChunkSize can not accomodate the file set. Try increasing chunk size and/or chunk maximum size.";
      // Close the inherited descriptor only in the cases where readPrimary
      // below would have taken it over (single reader, or first chunk).
1090  if (file->rawFd_ != -1 && (numConcurrentReads_ == 1 || chunk->offset_ == 0))
1091  close(file->rawFd_);
1092  setExceptionState_ = true;
1093  continue;
1094  } else {
1095  edm::LogInfo("DAQSource") << "chunk size was increased to " << (chunk->size_ >> 20) << " MB";
1096  }
1097  }
1098  }
1099 
1100  //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1101  unsigned int bufferLeftInitial = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
      // Number of eventChunkBlock_-sized reads needed to cover chunk->size_, rounded up.
      // NOTE(review): the result is narrowed to uint16_t - confirm the block count
      // cannot exceed 65535 for the configured sizes.
1102  const uint16_t readBlocks = chunk->size_ / eventChunkBlock_ + uint16_t(chunk->size_ % eventChunkBlock_ > 0);
1103 
      // Reads the primary file (index 0) into chunk->buf_, starting at buffer
      // offset `bufferLeft`; on success records the final offset in file->fileSizes_[0].
1104  auto readPrimary = [&](uint64_t bufferLeft) {
1105  //BEGIN reading primary file - check if file descriptor is already open
1106  //in multi-threaded chunked mode, only first thread will use already open fd for reading the first file
1107  //fd will not be closed in other case (used by other threads)
1108  int fileDescriptor = -1;
1109  bool fileOpenedHere = false;
1110 
1111  if (numConcurrentReads_ == 1) {
      // Single reader: take over file->rawFd_ if set, otherwise open the file.
1112  fileDescriptor = file->rawFd_;
1113  file->rawFd_ = -1;
1114  if (fileDescriptor == -1) {
1115  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1116  fileOpenedHere = true;
1117  }
1118  } else {
1119  if (chunk->offset_ == 0) {
      // First chunk: same take-over logic as the single-reader case.
1120  fileDescriptor = file->rawFd_;
1121  file->rawFd_ = -1;
1122  if (fileDescriptor == -1) {
1123  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1124  fileOpenedHere = true;
1125  }
1126  } else {
      // Later chunks always open their own descriptor.
1127  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1128  fileOpenedHere = true;
1129  }
1130  }
1131 
1132  if (fileDescriptor == -1) {
1133  edm::LogError("DAQSource") << "readWorker failed to open file -: " << file->fileName_
1134  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1135  setExceptionState_ = true;
1136  return;
1137  }
1138 
1139  if (fileOpenedHere) { //fast forward to this chunk position
1140  off_t pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1141  if (pos == -1) {
1142  edm::LogError("DAQSource") << "readWorker failed to seek file -: " << file->fileName_
1143  << " fd:" << fileDescriptor << " to offset " << chunk->offset_
1144  << " error: " << strerror(errno);
1145  setExceptionState_ = true;
      // NOTE(review): returns without closing fileDescriptor.
1146  return;
1147  }
1148  }
1149 
1150  LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1151  << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1152 
      // Remember the initial buffer offset for the size check below.
1153  size_t skipped = bufferLeft;
1155  for (unsigned int i = 0; i < readBlocks; i++) {
1156  ssize_t last;
1157  edm::LogInfo("DAQSource") << "readWorker read -: " << (int64_t)(chunk->usedSize_ - bufferLeft) << " or "
1158  << (int64_t)eventChunkBlock_;
1159 
1160  //protect against reading into next block
1161  last = ::read(fileDescriptor,
1162  (void*)(chunk->buf_ + bufferLeft),
1163  std::min((int64_t)(chunk->usedSize_ - bufferLeft), (int64_t)eventChunkBlock_));
1164 
1165  if (last < 0) {
1166  edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1167  << " fd:" << fileDescriptor << " last: " << last << " error: " << strerror(errno);
1168  setExceptionState_ = true;
1169  break;
1170  }
1171  if (last > 0) {
1172  bufferLeft += last;
1173  }
      // A read shorter than one block ends the loop.
1174  if ((uint64_t)last < eventChunkBlock_) { //last read
1175  edm::LogInfo("DAQSource") << "chunkUsedSize" << chunk->usedSize_ << " u-s:" << (chunk->usedSize_ - skipped)
1176  << " ix:" << i * eventChunkBlock_ << " " << (size_t)last;
1177  //check if this is last block if single file, then total read size must match file size
1178  if (file->numFiles_ == 1 && !(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (size_t)last)) {
1179  edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1180  << " fd:" << fileDescriptor << " last:" << last
1181  << " expectedChunkSize:" << chunk->usedSize_
1182  << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last)
1183  << " skipped:" << skipped << " block:" << (i + 1) << "/" << readBlocks
1184  << " error: " << strerror(errno);
1185  setExceptionState_ = true;
1186  }
1187  break;
1188  }
1189  }
      // NOTE(review): on this error exit fileDescriptor is not closed.
1190  if (setExceptionState_)
1191  return;
1192 
1193  file->fileSizes_[0] = bufferLeft;
1194 
1195  if (chunk->offset_ + bufferLeft == file->diskFileSizes_[0] || bufferLeft == chunk->size_) {
1196  //file reading finished using this fd
1197  //or the whole buffer is filled (single sequential file spread over more chunks)
1198  close(fileDescriptor);
1199  fileDescriptor = -1;
1200  } else
      // NOTE(review): fileDescriptor was checked to be != -1 above and is only
      // reset in the branch just before this one, so this assertion cannot hold
      // when this branch is taken - confirm the intended condition.
1201  assert(fileDescriptor == -1);
1202 
1203  if (fitToBuffer && bufferLeft != file->diskFileSizes_[0]) {
1204  edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[0]
1205  << " read:" << bufferLeft << " expected:" << file->diskFileSizes_[0];
1206  setExceptionState_ = true;
1207  return;
1208  }
1209 
1211  auto diff = end - start;
1212  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1213  LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1214  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1215  << " GB/s)";
1216  };
1217  //END primary function
1218 
1219  //SECONDARY files function
      // Reads file->fileNames_[j] into chunk->buf_ starting at buffer offset
      // `bufferLeft`; records the byte count in file->fileSizes_[j] and adds it
      // to file->fileSize_.
1220  auto readSecondary = [&](uint64_t bufferLeft, unsigned int j) {
1221  size_t fileLen = 0;
1222 
1223  std::string const& addFile = file->fileNames_[j];
1224  int fileDescriptor = open(addFile.c_str(), O_RDONLY);
1225 
1226  if (fileDescriptor < 0) {
1227  edm::LogError("DAQSource") << "readWorker failed to open file -: " << addFile << " fd:" << fileDescriptor
1228  << " error: " << strerror(errno);
1229  setExceptionState_ = true;
1230  return;
1231  }
1232 
1233  LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << addFile << " at offset "
1234  << lseek(fileDescriptor, 0, SEEK_CUR);
1235 
1236  //size_t skipped = 0;//file is newly opened, read with header
      // The loop always runs readBlocks iterations unless a read fails; a short
      // or zero-length read does not end it.
1238  for (unsigned int i = 0; i < readBlocks; i++) {
1239  ssize_t last;
1240 
1241  //protect against reading into next block
1242  //use bufferLeft for the write offset
      // NOTE(review): the requested size is min(file size, block size) on every
      // iteration and is not reduced by the bytes already read (fileLen) - confirm.
1243  last = ::read(fileDescriptor,
1244  (void*)(chunk->buf_ + bufferLeft),
1245  std::min((uint64_t)file->diskFileSizes_[j], (uint64_t)eventChunkBlock_));
1246 
1247  if (last < 0) {
1248  edm::LogError("DAQSource") << "readWorker failed to read file -: " << addFile << " fd:" << fileDescriptor
1249  << " error: " << strerror(errno);
1250  setExceptionState_ = true;
      // NOTE(review): the descriptor is closed here and again after the loop.
1251  close(fileDescriptor);
1252  break;
1253  }
1254  if (last > 0) {
1255  bufferLeft += last;
1256  fileLen += last;
1257  file->fileSize_ += last;
1258  }
1259  };
1260 
1261  close(fileDescriptor);
1262  file->fileSizes_[j] = fileLen;
      // NOTE(review): also reached after a read error, where fileLen may still be 0.
1263  assert(fileLen > 0);
1264 
1265  if (fitToBuffer && fileLen != file->diskFileSizes_[j]) {
1266  edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[j]
1267  << " read:" << fileLen << " expected:" << file->diskFileSizes_[j];
1268  setExceptionState_ = true;
1269  return;
1270  }
1271 
1273  auto diff = end - start;
1274  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1275  LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1276  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1277  << " GB/s)";
1278  };
1279 
1280  //randomized order multi-file loop
      // Index 0 is read with readPrimary; every other index is read with
      // readSecondary at its offset from file->bufferOffsets_.
1281  for (unsigned int j : file->fileOrder_) {
1282  if (j == 0) {
1283  readPrimary(bufferLeftInitial);
1284  } else
1285  readSecondary(file->bufferOffsets_[j], j);
1286 
1287  if (setExceptionState_)
1288  break;
1289  }
1290 
      // On failure skip publishing the chunk and go back to waiting for a job.
1291  if (setExceptionState_)
1292  continue;
1293 
1294  //detect FRD event version. Skip file Header if it exists
1295  if (dataMode_->dataVersion() == 0 && chunk->offset_ == 0) {
1296  dataMode_->detectVersion(chunk->buf_, file->rawHeaderSize_);
1297  }
1298  assert(dataMode_->versionCheck());
1299 
1300  chunk->readComplete_ =
1301  true; //this is atomic to secure the sequential buffer fill before becoming available for processing)
1302  file->chunks_[chunk->fileIndex_] = chunk; //put the completed chunk in the file chunk vector at predetermined index
1303  }
1304 }
1305 
// Body of threadError(); its signature line (1306) is absent from this listing.
// Sets quit_threads_ to true, then throws cms::Exception of category
// "DAQSource:threadError".
1307  quit_threads_ = true;
1308  throw cms::Exception("DAQSource:threadError") << " file reader thread error ";
1309 }
1310 
// Body of setMonState(state); its signature line (1311) is absent from this listing.
// Passes `state` to fms_->setInState() when fms_ is non-null; otherwise does nothing.
1312  if (fms_)
1313  fms_->setInState(state);
1314 }
1315 
// Body of setMonStateSup(state); the signature line (1316) and the guarded
// statement (1318) are absent from this listing. Only the fms_ null check is visible.
1317  if (fms_)
1319 }
1320 
// Advances the read position by `size` bytes and sets `dataPosition` to the start
// of those bytes.
// Returns false when the bytes lie entirely in the current chunk.
// Returns true when they cross into the next chunk: the remainder of the current
// chunk is moved to the start of its buffer, the missing bytes are appended from
// the next chunk, `dataPosition` points at that buffer start, and currentChunk_
// is incremented.
// NOTE(review): the listing numbering skips 1325, 1327-1329, 1338 and 1340-1342
// inside the two wait loops; only the sleep is visible there.
1321 bool RawInputFile::advance(unsigned char*& dataPosition, const size_t size) {
1322  //wait for chunk
1323 
      // Poll in 100 ms steps until waitForChunk reports the current chunk.
1324  while (!waitForChunk(currentChunk_)) {
1326  usleep(100000);
1330  }
1331 
1332  dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
      // Bytes remaining in the current chunk from the current position.
1333  size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1334 
1335  if (currentLeft < size) {
1336  //we need next chunk
1337  while (!waitForChunk(currentChunk_ + 1)) {
1339  usleep(100000);
1343  }
1344  //copy everything to beginning of the first chunk
1345  dataPosition -= chunkPosition_;
1346  assert(dataPosition == chunks_[currentChunk_]->buf_);
      // memmove is used because source and destination are in the same buffer.
1347  memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
      // NOTE(review): assumes the current buffer can hold `size` bytes and the next
      // chunk holds at least size - currentLeft bytes - TODO confirm.
1348  memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1349  //set pointers at the end of the old data position
1350  bufferPosition_ += size;
      // New position is expressed relative to the next chunk, which becomes current.
1351  chunkPosition_ = size - currentLeft;
1352  currentChunk_++;
1353  return true;
1354  } else {
1355  chunkPosition_ += size;
1356  bufferPosition_ += size;
1357  return false;
1358  }
1359 }
1360 
// Adds `events` to the count stored for `lumi` in sourceEventsReport_, holding
// monlock_ for the whole call.
// NOTE(review): the statement of the else branch (line 1367) is absent from this
// listing; presumably it creates the entry for a lumi not yet present - confirm.
1361 void DAQSource::reportEventsThisLumiInSource(unsigned int lumi, unsigned int events) {
1362  std::lock_guard<std::mutex> lock(monlock_);
1363  auto itr = sourceEventsReport_.find(lumi);
1364  if (itr != sourceEventsReport_.end())
1365  itr->second += events;
1366  else
1368 }
1369 
1370 std::pair<bool, unsigned int> DAQSource::getEventReport(unsigned int lumi, bool erase) {
1371  std::lock_guard<std::mutex> lock(monlock_);
1372  auto itr = sourceEventsReport_.find(lumi);
1373  if (itr != sourceEventsReport_.end()) {
1374  std::pair<bool, unsigned int> ret(true, itr->second);
1375  if (erase)
1376  sourceEventsReport_.erase(itr);
1377  return ret;
1378  } else
1379  return std::pair<bool, unsigned int>(false, 0);
1380 }
1381 
// Remainder of initFileList(); the signature and lines 1382-1383 and 1393 are
// absent from this listing. The first statements are the tail of a two-argument
// comparison callback (a, b); the rest derives a run number from a file name.
// Returns the parsed run number, or -1 when none could be derived.
      // Compare only from the last '/' onwards (the '/' itself is kept), ascending.
1384  if (a.rfind('/') != std::string::npos)
1385  a = a.substr(a.rfind('/'));
1386  if (b.rfind('/') != std::string::npos)
1387  b = b.substr(b.rfind('/'));
1388  return b > a;
1389  });
1390 
1391  if (!listFileNames_.empty()) {
1392  //get run number from first file in the vector
      // NOTE(review): `fileName` is defined on the missing line 1393.
1394  std::string fileStem = fileName.stem().string();
      // Drop a leading "file://" or "file:" from the stem.
1395  if (fileStem.find("file://") == 0)
1396  fileStem = fileStem.substr(7);
1397  else if (fileStem.find("file:") == 0)
1398  fileStem = fileStem.substr(5);
1399  auto end = fileStem.find('_');
1400 
      // Only stems starting with "run" are parsed; the number is the text between
      // "run" and the first '_'. If there is no '_', `end` is npos and substr
      // takes the rest of the string.
1401  if (fileStem.find("run") == 0) {
1402  std::string runStr = fileStem.substr(3, end - 3);
1403  try {
1404  //get long to support test run numbers < 2^32
1405  long rval = std::stol(runStr);
1406  edm::LogInfo("DAQSource") << "Autodetected run number in fileListMode -: " << rval;
1407  return rval;
1408  } catch (const std::exception&) {
      // A parse failure is only logged; the function then falls through to return -1.
1409  edm::LogWarning("DAQSource") << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1410  }
1411  }
1412  }
1413  return -1;
1414 }
1415 
// Remainder of getFile(ls, nextFile, lockWaitTime); the signature (line 1416) and
// lines 1438, 1441 and 1444 are absent from this listing (presumably return
// statements and loop bookkeeping - confirm).
// Takes the next entry of listFileNames_, strips a "file://" or "file:" prefix,
// and sets `ls` from the file name or, in loop mode, from loopModeIterationInc_.
1417  if (fileListIndex_ < listFileNames_.size()) {
1418  nextFile = listFileNames_[fileListIndex_];
1419  if (nextFile.find("file://") == 0)
1420  nextFile = nextFile.substr(7);
1421  else if (nextFile.find("file:") == 0)
1422  nextFile = nextFile.substr(5);
1423  std::filesystem::path fileName = nextFile;
1424  std::string fileStem = fileName.stem().string();
      // NOTE(review): find() returns a position, so this test is true for every
      // result except 0, including npos (not found). With npos the `+ 2` wraps
      // around to 1 and the stem merely loses its first character - confirm a
      // `!= npos` test was intended.
1425  if (fileStem.find("ls"))
1426  fileStem = fileStem.substr(fileStem.find("ls") + 2);
      // NOTE(review): same pattern; with no '_' present substr(0, npos) keeps the
      // whole string, and a '_' at position 0 skips the truncation.
1427  if (fileStem.find('_'))
1428  fileStem = fileStem.substr(0, fileStem.find('_'));
1429 
      // NOTE(review): std::stoul throws if fileStem does not start with a number;
      // no handler is visible in this fragment.
1430  if (!fileListLoopMode_)
1431  ls = std::stoul(fileStem);
1432  else //always starting from LS 1 in loop mode
1433  ls = 1 + loopModeIterationInc_;
1434 
1435  //fsize = 0;
1436  //lockWaitTime = 0;
1437  fileListIndex_++;
1439  } else {
1440  if (!fileListLoopMode_)
1442  else {
1443  //loop through files until interrupted
      // Restart from the first list entry and retry via a recursive call.
1445  fileListIndex_ = 0;
1446  return getFile(ls, nextFile, lockWaitTime);
1447  }
1448  }
1449 }
std::string & buBaseRunDir()
size
Write out results.
static const char runNumber_[]
long initFileList()
Definition: DAQSource.cc:1382
Definition: start.py:1
Definition: fillJson.h:27
std::unique_ptr< RawInputFile > currentFile_
Definition: DAQSource.h:134
std::string const & runString() const
std::pair< RawInputFile *, InputChunk * > ReaderInfo
Definition: DAQSource.h:132
int currentFileIndex_
Definition: DAQSource.h:158
edm::RunNumber_t runNumber_
Definition: DAQSource.h:114
void threadError()
Definition: DAQSource.cc:1306
DAQSource * sourceParent_
Definition: DAQSource.h:197
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:232
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:330
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
unsigned int checkEvery_
Definition: DAQSource.h:163
std::unique_ptr< std::thread > readSupervisorThread_
Definition: DAQSource.h:138
bool crc32c_hw_test()
Definition: crc32c.cc:354
uint64_t maxChunkSize_
Definition: DAQSource.h:92
std::mutex startupLock_
Definition: DAQSource.h:155
std::default_random_engine rng_
Definition: DAQSource.h:124
std::map< unsigned int, unsigned int > sourceEventsReport_
Definition: DAQSource.h:174
bool chunkIsFree_
Definition: DAQSource.h:135
std::condition_variable startupCv_
Definition: DAQSource.h:156
ret
prodAgent to be discontinued
std::atomic< bool > threadInit_
Definition: DAQSource.h:172
unsigned int loopModeIterationInc_
Definition: DAQSource.h:112
Next checkNext() override
Definition: DAQSource.cc:256
void read(edm::EventPrincipal &eventPrincipal) override
Definition: DAQSource.cc:536
volatile std::atomic< bool > shutdown_flag
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:457
bool lumisectionDiscarded(unsigned int ls)
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
Definition: DAQSource.cc:1370
void maybeOpenNewLumiSection(const uint32_t lumiSection)
Definition: DAQSource.cc:329
std::mutex fileDeleteLock_
Definition: DAQSource.h:160
const std::string dataModeConfig_
Definition: DAQSource.h:90
std::vector< std::thread * > workerThreads_
Definition: DAQSource.h:140
Log< level::Error, false > LogError
std::mutex mReader_
Definition: DAQSource.h:148
unsigned int numConcurrentLumis() const
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
Definition: DAQSource.cc:1361
StreamID streamID() const
int timeout
Definition: mps_check.py:53
assert(be >=bs)
bool isExceptionOnData(unsigned int ls)
edm::ProcessHistoryID processHistoryID_
Definition: DAQSource.h:117
std::mutex mWakeup_
Definition: DAQSource.h:166
bool waitForChunk(unsigned int chunkid)
unsigned int currentLumiSection_
Definition: DAQSource.h:119
void overrideRunNumber(unsigned int run)
std::atomic< unsigned int > readingFilesCount_
Definition: DAQSource.h:98
evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock()
Definition: DAQSource.cc:349
std::string getEoRFilePathOnFU() const
uint64_t eventChunkSize_
Definition: DAQSource.h:91
U second(std::pair< T, U > const &p)
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
static Timestamp beginOfTime()
Definition: Timestamp.h:77
bool advance(unsigned char *&dataPosition, const size_t size)
Definition: DAQSource.cc:1321
DAQSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
Definition: DAQSource.cc:32
uint64_t eventChunkBlock_
Definition: DAQSource.h:93
evf::EvFDaqDirector::FileStatus getNextDataBlock()
Definition: DAQSource.cc:375
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
int currentLumiSection() const
Definition: DAQSource.h:52
unsigned int fileListIndex_
Definition: DAQSource.h:110
unsigned int eventsThisLumi_
Definition: DAQSource.h:122
const bool fileListLoopMode_
Definition: DAQSource.h:111
std::condition_variable cvWakeup_
Definition: DAQSource.h:167
friend struct InputChunk
Definition: DAQSource.h:43
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:337
std::vector< std::condition_variable * > cvReader_
Definition: DAQSource.h:149
void reset(uint64_t newOffset, uint64_t toRead, unsigned int fileIndex)
unsigned int maxBufferedFiles_
Definition: DAQSource.h:96
unsigned char * buf_
evf::FastMonitoringService * fms_
Definition: DAQSource.h:87
void readWorker(unsigned int tid)
Definition: DAQSource.cc:1047
unsigned int numBuffers_
Definition: DAQSource.h:95
std::atomic< bool > quit_threads_
Definition: DAQSource.h:152
const bool alwaysStartFromFirstLS_
Definition: DAQSource.h:101
std::shared_ptr< DataMode > dataMode_
Definition: DAQSource.h:177
unsigned int fileIndex_
tbb::concurrent_queue< std::unique_ptr< RawInputFile > > fileQueue_
Definition: DAQSource.h:146
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:362
const bool fileListMode_
Definition: DAQSource.h:109
unsigned int nStreams_
Definition: DAQSource.h:162
void createProcessingNotificationMaybe() const
void setInputSource(FedRawDataInputSource *inputSource)
Log< level::Info, false > LogInfo
std::atomic< bool > readComplete_
Definition: init.py:1
def ls(path, rec=False)
Definition: eostools.py:349
const std::vector< unsigned int > testTCDSFEDRange_
Definition: DAQSource.h:104
tbb::concurrent_vector< InputChunk * > chunks_
unsigned int getStartLumisectionFromEnv() const
def getRunNumber(filename)
uint32_t eventRunNumber_
Definition: DAQSource.h:120
void setInStateSup(FastMonState::InputState inputState)
unsigned long long uint64_t
Definition: Time.h:13
void dataArranger()
Definition: DAQSource.cc:579
std::vector< std::string > listFileNames_
Definition: DAQSource.h:105
~DAQSource() override
Definition: DAQSource.cc:178
def load(fileName)
Definition: svgfig.py:547
uint32_t chunkPosition_
std::vector< int > streamFileTracker_
Definition: DAQSource.h:161
double b
Definition: hdecay.h:118
void rewind_() override
Definition: DAQSource.cc:577
std::string getEoRFileName() const
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:345
std::vector< unsigned int > tid_active_
Definition: DAQSource.h:150
void stoppedLookingForFile(unsigned int lumi)
std::vector< unsigned int > thread_quit_signal
Definition: DAQSource.h:153
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:331
ItemType state() const
Definition: InputSource.h:332
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:462
unsigned int readBlocks_
Definition: DAQSource.h:94
HLT enums.
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:333
double a
Definition: hdecay.h:119
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:235
void setMonStateSup(evf::FastMonState::InputState state)
Definition: DAQSource.cc:1316
friend class RawInputFile
Definition: DAQSource.h:42
bool setExceptionState_
Definition: DAQSource.h:154
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Definition: DAQSource.cc:225
unsigned int getLumisectionToStart() const
bool startedSupervisorThread_
Definition: DAQSource.h:137
evf::EvFDaqDirector * daqDirector_
Definition: DAQSource.h:88
tbb::concurrent_queue< InputChunk * > freeChunks_
Definition: DAQSource.h:145
unsigned int numConcurrentReads_
Definition: DAQSource.h:97
int addFile(MEStore &micromes, int fd)
Definition: fastHadd.cc:352
unsigned int RunNumber_t
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
tbb::concurrent_queue< unsigned int > workerPool_
Definition: DAQSource.h:142
bool resize(uint64_t wantedSize, uint64_t maxSize)
Log< level::Warning, false > LogWarning
unsigned int currentChunk_
std::vector< ReaderInfo > workerJob_
Definition: DAQSource.h:143
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint64_t &lockWaitTime)
Definition: DAQSource.cc:1416
void setMonState(evf::FastMonState::InputState state)
Definition: DAQSource.cc:1311
void setFMS(evf::FastMonitoringService *fms)
bool exceptionState()
Definition: DAQSource.h:78
const bool verifyChecksum_
Definition: DAQSource.h:102
void readSupervisor()
Definition: DAQSource.cc:581
int events
Power< A, B >::type pow(const A &a, const B &b)
Definition: Power.h:29
std::vector< std::string > const & getBUBaseDirs() const
def move(src, dest)
Definition: eostools.py:511
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
Definition: DAQSource.h:159
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
#define LogDebug(id)
void setInState(FastMonState::InputState inputState)
std::mutex monlock_
Definition: DAQSource.h:175
uint32_t bufferPosition_