CMS 3D CMS Logo

DAQSource.cc
Go to the documentation of this file.
1 #include <sstream>
2 #include <unistd.h>
3 #include <vector>
4 #include <chrono>
5 #include <algorithm>
6 
11 
21 
26 
27 //JSON file reader
29 
30 using namespace evf::FastMonState;
31 
33  : edm::RawInputSource(pset, desc),
34  dataModeConfig_(pset.getUntrackedParameter<std::string>("dataMode")),
35  eventChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkSize")) << 20),
36  maxChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("maxChunkSize")) << 20),
37  eventChunkBlock_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkBlock")) << 20),
38  numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers")),
39  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles")),
40  alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
41  verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum")),
42  useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID")),
43  testTCDSFEDRange_(pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange")),
44  listFileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames")),
45  fileListMode_(pset.getUntrackedParameter<bool>("fileListMode")),
46  fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
47  runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
48  processHistoryID_(),
49  currentLumiSection_(0),
50  eventsThisLumi_(0),
51  rng_(std::chrono::system_clock::now().time_since_epoch().count()) {
52  char thishost[256];
53  gethostname(thishost, 255);
54 
55  if (maxChunkSize_ == 0)
57  else if (maxChunkSize_ < eventChunkSize_)
58  throw cms::Exception("DAQSource::DAQSource") << "maxChunkSize must be equal or larger than eventChunkSize";
59 
60  if (eventChunkBlock_ == 0)
63  throw cms::Exception("DAQSource::DAQSource") << "eventChunkBlock must be equal or smaller than eventChunkSize";
64 
65  edm::LogInfo("DAQSource") << "Construction. read-ahead chunk size -: " << std::endl
66  << (eventChunkSize_ >> 20) << " MB on host " << thishost << " in mode " << dataModeConfig_;
67 
68  uint16_t MINTCDSuTCAFEDID = FEDNumbering::MINTCDSuTCAFEDID;
69  uint16_t MAXTCDSuTCAFEDID = FEDNumbering::MAXTCDSuTCAFEDID;
70 
71  if (!testTCDSFEDRange_.empty()) {
72  if (testTCDSFEDRange_.size() != 2) {
73  throw cms::Exception("DAQSource::DAQSource") << "Invalid TCDS Test FED range parameter";
74  }
75  MINTCDSuTCAFEDID = testTCDSFEDRange_[0];
76  MAXTCDSuTCAFEDID = testTCDSFEDRange_[1];
77  }
78 
79  //load mode class based on parameter
80  if (dataModeConfig_ == "FRD") {
81  dataMode_.reset(new DataModeFRD(this));
82  } else if (dataModeConfig_ == "FRDStriped") {
83  dataMode_.reset(new DataModeFRDStriped(this));
84  } else if (dataModeConfig_ == "ScoutingRun3") {
85  dataMode_.reset(new DataModeScoutingRun3(this));
86  } else
87  throw cms::Exception("DAQSource::DAQSource") << "Unknown data mode " << dataModeConfig_;
88 
90 
91  dataMode_->setTCDSSearchRange(MINTCDSuTCAFEDID, MAXTCDSuTCAFEDID);
92  dataMode_->setTesting(pset.getUntrackedParameter<bool>("testing", false));
93 
94  long autoRunNumber = -1;
95  if (fileListMode_) {
96  autoRunNumber = initFileList();
97  if (!fileListLoopMode_) {
98  if (autoRunNumber < 0)
99  throw cms::Exception("DAQSource::DAQSource") << "Run number not found from filename";
100  //override run number
101  runNumber_ = (edm::RunNumber_t)autoRunNumber;
102  daqDirector_->overrideRunNumber((unsigned int)autoRunNumber);
103  }
104  }
105 
106  dataMode_->makeDirectoryEntries(
108 
109  auto& daqProvenanceHelpers = dataMode_->makeDaqProvenanceHelpers();
110  for (const auto& daqProvenanceHelper : daqProvenanceHelpers)
111  processHistoryID_ = daqProvenanceHelper->daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
112  setNewRun();
113  //todo:autodetect from file name (assert if names differ)
115 
116  //make sure that chunk size is N * block size
121 
122  if (!numBuffers_)
123  throw cms::Exception("DAQSource::DAQSource") << "no reading enabled with numBuffers parameter 0";
124 
126  assert(numBuffers_ > 1);
127  readingFilesCount_ = 0;
128 
129  if (!crc32c_hw_test())
130  edm::LogError("DAQSource::DAQSource") << "Intel crc32c checksum computation unavailable";
131 
132  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
133  if (fileListMode_) {
134  try {
136  } catch (cms::Exception const&) {
137  edm::LogInfo("DAQSource") << "No FastMonitoringService found in the configuration";
138  }
139  } else {
141  if (!fms_) {
142  throw cms::Exception("DAQSource") << "FastMonitoringService not found";
143  }
144  }
145 
147  if (!daqDirector_)
148  cms::Exception("DAQSource") << "EvFDaqDirector not found";
149 
150  edm::LogInfo("DAQSource") << "EvFDaqDirector/Source configured to use file service";
151  //set DaqDirector to delete files in preGlobalEndLumi callback
153  if (fms_) {
155  fms_->setInputSource(this);
158  }
159  //should delete chunks when run stops
160  for (unsigned int i = 0; i < numBuffers_; i++) {
162  }
163 
164  quit_threads_ = false;
165 
166  //prepare data shared by threads
167  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
168  thread_quit_signal.push_back(false);
169  workerJob_.push_back(ReaderInfo(nullptr, nullptr));
170  cvReader_.push_back(std::make_unique<std::condition_variable>());
171  tid_active_.push_back(0);
172  }
173 
174  //start threads
175  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
176  //wait for each thread to complete initialization
177  std::unique_lock<std::mutex> lk(startupLock_);
178  workerThreads_.push_back(new std::thread(&DAQSource::readWorker, this, i));
179  startupCv_.wait(lk);
180  }
181 
182  runAuxiliary()->setProcessHistoryID(processHistoryID_);
183 }
184 
186  quit_threads_ = true;
187 
188  //delete any remaining open files
189  if (!fms_ || !fms_->exceptionDetected()) {
190  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
191  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
192  it->second.reset();
193  } else {
194  //skip deleting files with exception
195  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
196  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
197  if (fms_->isExceptionOnData(it->second->lumi_))
198  it->second->unsetDeleteFile();
199  else
200  it->second.reset();
201  }
202  //disable deleting current file with exception
203  if (currentFile_.get())
204  if (fms_->isExceptionOnData(currentFile_->lumi_))
205  currentFile_->unsetDeleteFile();
206  }
207 
209  readSupervisorThread_->join();
210  } else {
211  //join aux threads in case the supervisor thread was not started
212  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
213  std::unique_lock<std::mutex> lk(mReader_);
214  thread_quit_signal[i] = true;
215  cvReader_[i]->notify_one();
216  lk.unlock();
217  workerThreads_[i]->join();
218  delete workerThreads_[i];
219  }
220  }
221 }
222 
225  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk (unified)");
226  desc.addUntracked<std::string>("dataMode", "FRD")->setComment("Data mode: event 'FRD', 'FRSStriped', 'ScoutingRun2'");
227  desc.addUntracked<unsigned int>("eventChunkSize", 64)->setComment("Input buffer (chunk) size");
228  desc.addUntracked<unsigned int>("maxChunkSize", 0)
229  ->setComment("Maximum chunk size allowed if buffer is resized to fit data. If 0 is specifier, use chunk size");
230  desc.addUntracked<unsigned int>("eventChunkBlock", 0)
231  ->setComment(
232  "Block size used in a single file read call (must be smaller or equal to the initial chunk buffer size). If "
233  "0 is specified, use chunk size.");
234 
235  desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
236  desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
237  ->setComment("Maximum number of simultaneously buffered raw files");
238  desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
239  ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
240  desc.addUntracked<bool>("verifyChecksum", true)
241  ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
242  desc.addUntracked<bool>("useL1EventID", false)
243  ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
244  desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
245  ->setComment("[min, max] range to search for TCDS FED ID in test setup");
246  desc.addUntracked<bool>("fileListMode", false)
247  ->setComment("Use fileNames parameter to directly specify raw files to open");
248  desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
249  ->setComment("file list used when fileListMode is enabled");
250  desc.setAllowAnything();
251  descriptions.add("source", desc);
252 }
253 
256  std::unique_lock<std::mutex> lk(startupLock_);
257 
258  //this thread opens new files and dispatches reading to worker readers
259  readSupervisorThread_ = std::make_unique<std::thread>(&DAQSource::readSupervisor, this);
261 
262  startupCv_.wait(lk);
263  }
264 
265  //signal hltd to start event accounting
266  if (!currentLumiSection_)
269 
270  auto nextEvent = [this]() {
271  auto getNextEvent = [this]() {
272  //for some models this is always true (if one event is one block)
273  if (dataMode_->dataBlockCompleted()) {
274  return getNextDataBlock();
275  } else {
276  return getNextEventFromDataBlock();
277  }
278  };
279 
281  while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
282  if (edm::shutdown_flag.load(std::memory_order_relaxed))
283  break;
284  }
285  return status;
286  };
287 
288  switch (nextEvent()) {
290  //maybe create EoL file in working directory before ending run
291  struct stat buf;
292  //also create EoR file in FU data directory
293  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
294  if (!eorFound) {
295  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
296  O_RDWR | O_CREAT,
297  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
298  close(eor_fd);
299  }
301  eventsThisLumi_ = 0;
303  edm::LogInfo("DAQSource") << "----------------RUN ENDED----------------";
304  return Next::kStop;
305  }
307  //this is not reachable
308  return Next::kEvent;
309  }
311  //std::cout << "--------------NEW LUMI---------------" << std::endl;
312  return Next::kEvent;
313  }
314  default: {
317  else
318  eventRunNumber_ = dataMode_->run();
319 
320  setEventCached();
321 
322  return Next::kEvent;
323  }
324  }
325 }
326 
327 void DAQSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
328  if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
329  currentLumiSection_ = lumiSection;
330 
332 
333  timeval tv;
334  gettimeofday(&tv, nullptr);
335  const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
336 
338  runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
339 
340  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
341  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
342 
343  edm::LogInfo("DAQSource") << "New lumi section was opened. LUMI -: " << lumiSection;
344  }
345 }
346 
349 
350  bool found = dataMode_->nextEventView();
351  //file(s) completely parsed
352  if (!found) {
353  if (dataMode_->dataBlockInitialized()) {
354  dataMode_->setDataBlockInitialized(false);
355  //roll position to the end of the file to close it
356  currentFile_->bufferPosition_ = currentFile_->fileSize_;
357  }
359  }
360 
361  if (verifyChecksum_ && !dataMode_->checksumValid()) {
362  if (fms_)
364  throw cms::Exception("DAQSource::getNextEventFromDataBlock") << dataMode_->getChecksumError();
365  }
367 
368  currentFile_->nProcessed_++;
369 
371 }
372 
374  if (setExceptionState_)
375  threadError();
376  if (!currentFile_.get()) {
379  {
380  IdleSourceSentry ids(fms_);
381  if (!fileQueue_.try_pop(currentFile_)) {
382  //sleep until wakeup (only in single-buffer mode) or timeout
383  std::unique_lock<std::mutex> lkw(mWakeup_);
384  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
386  }
387  }
388  status = currentFile_->status_;
391  currentFile_.reset();
392  return status;
393  } else if (status == evf::EvFDaqDirector::runAbort) {
394  throw cms::Exception("DAQSource::getNextDataBlock") << "Run has been aborted by the input source reader thread";
395  } else if (status == evf::EvFDaqDirector::newLumi) {
397  if (currentFile_->lumi_ > currentLumiSection_) {
399  eventsThisLumi_ = 0;
401  }
402  currentFile_.reset();
403  return status;
404  } else if (status == evf::EvFDaqDirector::newFile) {
406  } else
407  assert(false);
408  }
410 
411  //file is empty
412  if (!currentFile_->fileSize_) {
414  //try to open new lumi
415  assert(currentFile_->nChunks_ == 0);
416  if (currentFile_->lumi_ > currentLumiSection_) {
418  eventsThisLumi_ = 0;
420  }
421  //immediately delete empty file
422  currentFile_.reset();
424  }
425 
426  //file is finished
427  if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
429  //release last chunk (it is never released elsewhere)
430  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
431  if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
432  throw cms::Exception("DAQSource::getNextDataBlock")
433  << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
434  << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
435  }
437  //put the file in pending delete list;
438  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
439  filesToDelete_.push_back(
440  std::pair<int, std::unique_ptr<RawInputFile>>(currentFileIndex_, std::move(currentFile_)));
441  } else {
442  //in single-thread and stream jobs, events are already processed
443  currentFile_.reset();
444  }
446  }
447 
448  //assert(currentFile_->status_ == evf::EvFDaqDirector::newFile);
449 
450  //handle RAW file header
451  if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
452  if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
453  if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
454  throw cms::Exception("DAQSource::getNextDataBlock") << "Premature end of input file while reading file header";
455 
456  edm::LogWarning("DAQSource") << "File with only raw header and no events received in LS " << currentFile_->lumi_;
457  if (currentFile_->lumi_ > currentLumiSection_) {
459  eventsThisLumi_ = 0;
461  }
462  }
463 
464  //advance buffer position to skip file header (chunk will be acquired later)
465  currentFile_->advance(currentFile_->rawHeaderSize_);
466  }
467 
468  //file is too short to fit event header
469  if (currentFile_->fileSizeLeft() < dataMode_->headerSize())
470  throw cms::Exception("DAQSource::getNextDataBlock")
471  << "Premature end of input file while reading event header. Missing: "
472  << (dataMode_->headerSize() - currentFile_->fileSizeLeft()) << " bytes";
473 
474  //multibuffer mode
475  //wait for the current chunk to become added to the vector
477  {
478  IdleSourceSentry ids(fms_);
479  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
480  usleep(10000);
481  if (setExceptionState_)
482  threadError();
483  }
484  }
486 
487  chunkIsFree_ = false;
488  bool chunkEnd;
489  unsigned char* dataPosition;
490 
491  //read event header, copy it to a single chunk if necessary
492  chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize());
493 
494  //get buffer size of current chunk (can be resized)
495  uint64_t currentChunkSize = currentFile_->currentChunkSize();
496 
497  //prepare view based on header that was read
498  dataMode_->makeDataBlockView(dataPosition, currentChunkSize, currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
499 
500  //check that payload size is within the file
501  const size_t msgSize = dataMode_->dataBlockSize() - dataMode_->headerSize();
502 
503  if (currentFile_->fileSizeLeft() < (int64_t)msgSize)
504  throw cms::Exception("DAQSource::getNextEventDataBlock")
505  << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
506  << ") while parsing block";
507 
508  //for cross-buffer models
509  if (chunkEnd) {
510  //header was at the chunk boundary, move payload into the starting chunk as well. No need to update block view here
511  currentFile_->moveToPreviousChunk(msgSize, dataMode_->headerSize());
512  //mark to release old chunk
513  chunkIsFree_ = true;
514  } else if (currentChunkSize - currentFile_->chunkPosition_ < msgSize) {
515  //header was contiguous, but payload does not fit in the chunk
516  //rewind to header start position and then together with payload will be copied together to the old chunk
517  currentFile_->rewindChunk(dataMode_->headerSize());
518 
520  {
521  IdleSourceSentry ids(fms_);
522  //do the copy to the beginning of the starting chunk. move pointers for next event in the next chunk
523  chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize() + msgSize);
524  assert(chunkEnd);
525  //mark to release old chunk
526  chunkIsFree_ = true;
527  }
529  //header and payload is moved, update view
530  dataMode_->makeDataBlockView(
531  dataPosition, currentFile_->currentChunkSize(), currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
532  } else {
533  //everything is in a single chunk, only move pointers forward
534  chunkEnd = currentFile_->advance(dataPosition, msgSize);
535  assert(!chunkEnd);
536  chunkIsFree_ = false;
537  }
538 
539  //sanity-check check that the buffer position has not exceeded file size after preparing event
540  if (currentFile_->fileSize_ < currentFile_->bufferPosition_)
541  throw cms::Exception("DAQSource::getNextEventDataBlock")
542  << "Exceeded file size by " << (currentFile_->bufferPosition_ - currentFile_->fileSize_);
543 
544  //prepare event
545  return getNextEventFromDataBlock();
546 }
547 
548 void DAQSource::read(edm::EventPrincipal& eventPrincipal) {
550 
551  dataMode_->readEvent(eventPrincipal);
552 
553  eventsThisLumi_++;
555 
556  //resize vector if needed
557  while (streamFileTracker_.size() <= eventPrincipal.streamID())
558  streamFileTracker_.push_back(-1);
559 
560  streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
561 
562  //this old file check runs no more often than every 10 events
563  if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
564  //delete files that are not in processing
565  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
566  auto it = filesToDelete_.begin();
567  while (it != filesToDelete_.end()) {
568  bool fileIsBeingProcessed = false;
569  for (unsigned int i = 0; i < streamFileTracker_.size(); i++) {
570  if (it->first == streamFileTracker_.at(i)) {
571  fileIsBeingProcessed = true;
572  break;
573  }
574  }
575  if (!fileIsBeingProcessed && !(fms_ && fms_->isExceptionOnData(it->second->lumi_))) {
576  it = filesToDelete_.erase(it);
577  } else
578  it++;
579  }
580  }
581  if (dataMode_->dataBlockCompleted() && chunkIsFree_) {
582  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
583  chunkIsFree_ = false;
584  }
586  return;
587 }
588 
590 
592 
594  bool stop = false;
595  unsigned int currentLumiSection = 0;
596 
597  {
598  std::unique_lock<std::mutex> lk(startupLock_);
599  startupCv_.notify_one();
600  }
601 
602  uint32_t ls = 0;
603  uint32_t monLS = 1;
604  uint32_t lockCount = 0;
605  uint64_t sumLockWaitTimeUs = 0.;
606 
607  bool requireHeader = dataMode_->requireHeader();
608 
609  while (!stop) {
610  //wait for at least one free thread and chunk
611  int counter = 0;
612 
613  while (workerPool_.empty() || freeChunks_.empty() || readingFilesCount_ >= maxBufferedFiles_) {
614  //report state to monitoring
615  if (fms_) {
616  bool copy_active = false;
617  for (auto j : tid_active_)
618  if (j)
619  copy_active = true;
622  else if (freeChunks_.empty()) {
623  if (copy_active)
625  else
627  } else {
628  if (copy_active)
630  else
632  }
633  }
634  std::unique_lock<std::mutex> lkw(mWakeup_);
635  //sleep until woken up by condition or a timeout
636  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
637  counter++;
638  if (!(counter % 6000)) {
639  edm::LogWarning("FedRawDataInputSource")
640  << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
641  << ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
642  << " / " << maxBufferedFiles_;
643  }
644  LogDebug("DAQSource") << "No free chunks or threads...";
645  } else {
646  assert(!workerPool_.empty() || freeChunks_.empty());
647  }
648  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
649  stop = true;
650  break;
651  }
652  }
653  //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
654 
655  if (stop)
656  break;
657 
658  //look for a new file
659  std::string nextFile;
660  int64_t fileSizeFromMetadata;
661 
662  if (fms_) {
665  }
666  bool fitToBuffer = dataMode_->fitToBuffer();
667 
669  uint16_t rawHeaderSize = 0;
670  uint32_t lsFromRaw = 0;
671  int32_t serverEventsInNewFile = -1;
672  int rawFd = -1;
673 
674  int backoff_exp = 0;
675 
676  //entering loop which tries to grab new file from ramdisk
678  //check if hltd has signalled to throttle input
679  counter = 0;
680  while (daqDirector_->inputThrottled()) {
681  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
682  break;
683 
684  unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
685  unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
686  unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
687  bool hasDiscardedLumi = false;
688  for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
690  edm::LogWarning("DAQSource") << "Source detected that the lumisection is discarded -: " << i;
691  hasDiscardedLumi = true;
692  break;
693  }
694  }
695  if (hasDiscardedLumi)
696  break;
697 
699  if (!(counter % 50))
700  edm::LogWarning("DAQSource") << "Input throttled detected, reading files is paused...";
701  usleep(100000);
702  counter++;
703  }
704 
705  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
706  stop = true;
707  break;
708  }
709 
710  assert(rawFd == -1);
711  uint64_t thisLockWaitTimeUs = 0.;
713  if (fileListMode_) {
714  //return LS if LS not set, otherwise return file
715  status = getFile(ls, nextFile, thisLockWaitTimeUs);
717  uint16_t rawDataType;
719  rawFd,
720  rawHeaderSize,
721  rawDataType,
722  lsFromRaw,
723  serverEventsInNewFile,
724  fileSizeFromMetadata,
725  requireHeader,
726  false,
727  false) != 0) {
728  //error
729  setExceptionState_ = true;
730  stop = true;
731  break;
732  }
733  }
734  } else {
736  ls,
737  nextFile,
738  rawFd,
739  rawHeaderSize, //which format?
740  serverEventsInNewFile,
741  fileSizeFromMetadata,
742  thisLockWaitTimeUs,
743  requireHeader);
744  }
745 
747 
748  //cycle through all remaining LS even if no files get assigned
751 
752  //monitoring of lock wait time
753  if (thisLockWaitTimeUs > 0.)
754  sumLockWaitTimeUs += thisLockWaitTimeUs;
755  lockCount++;
756  if (ls > monLS) {
757  monLS = ls;
758  if (lockCount)
759  if (fms_)
760  fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
761  lockCount = 0;
762  sumLockWaitTimeUs = 0;
763  }
764 
766  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded));
767  stop = true;
768  break;
769  }
770 
771  //error from filelocking function
773  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
774  stop = true;
775  break;
776  }
777  //queue new lumisection
778  if (ls > currentLumiSection) {
779  //new file service
782  //start transitions from LS specified by env, continue if not reached
783  if (ls < daqDirector_->getStartLumisectionFromEnv()) {
784  //skip file if from earlier LS than specified by env
785  if (rawFd != -1) {
786  close(rawFd);
787  rawFd = -1;
788  }
790  continue;
791  } else {
792  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
793  }
794  } else if (ls < 100) {
795  //look at last LS file on disk to start from that lumisection (only within first 100 LS)
796  unsigned int lsToStart = daqDirector_->getLumisectionToStart();
797 
798  for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
799  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
800  }
801  } else {
802  //start from current LS
803  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
804  }
805  } else {
806  //queue all lumisections after last one seen to avoid gaps
807  for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
808  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
809  }
810  }
812  }
813  //else
815  edm::LogError("DAQSource") << "Got old LS (" << ls
816  << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
817  << ". Aborting execution." << std::endl;
818  if (rawFd != -1)
819  close(rawFd);
820  rawFd = -1;
821  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
822  stop = true;
823  break;
824  }
825 
826  int dbgcount = 0;
829  dbgcount++;
830  if (!(dbgcount % 20))
831  LogDebug("DAQSource") << "No file for me... sleep and try again...";
832 
833  backoff_exp = std::min(4, backoff_exp); // max 1.6 seconds
834  //backoff_exp=0; // disabled!
835  int sleeptime = (int)(100000. * pow(2, backoff_exp));
836  usleep(sleeptime);
837  backoff_exp++;
838  } else
839  backoff_exp = 0;
840  }
841  //end of file grab loop, parse result
844  LogDebug("DAQSource") << "The director says to grab -: " << nextFile;
845 
846  std::string rawFile;
847  //file service will report raw extension
848  rawFile = nextFile;
849 
850  struct stat st;
851  int stat_res = stat(rawFile.c_str(), &st);
852  if (stat_res == -1) {
853  edm::LogError("DAQSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
854  setExceptionState_ = true;
855  break;
856  }
857  uint64_t fileSize = st.st_size;
858 
859  if (fms_) {
863  }
864  int eventsInNewFile;
865  if (fileListMode_) {
866  if (fileSize == 0)
867  eventsInNewFile = 0;
868  else
869  eventsInNewFile = -1;
870  } else {
871  eventsInNewFile = serverEventsInNewFile;
872  assert(eventsInNewFile >= 0);
873  assert((eventsInNewFile > 0) ==
874  (fileSize > rawHeaderSize)); //file without events must be empty or contain only header
875  }
876 
877  std::pair<bool, std::vector<std::string>> additionalFiles =
878  dataMode_->defineAdditionalFiles(rawFile, fileListMode_);
879  if (!additionalFiles.first) {
880  //skip secondary files from file broker
881  if (rawFd > -1)
882  close(rawFd);
883  continue;
884  }
885 
886  std::unique_ptr<RawInputFile> newInputFile(new RawInputFile(evf::EvFDaqDirector::FileStatus::newFile,
887  ls,
888  rawFile,
889  !fileListMode_,
890  rawFd,
891  fileSize,
892  rawHeaderSize, //for which format
893  0,
894  eventsInNewFile,
895  this));
896 
897  uint64_t neededSize = fileSize;
898  for (const auto& addFile : additionalFiles.second) {
899  struct stat buf;
900  //wait for secondary files to appear
901  unsigned int fcnt = 0;
902  while (stat(addFile.c_str(), &buf) != 0) {
903  if (fileListMode_) {
904  edm::LogError("DAQSource") << "additional file is missing -: " << addFile;
905  stop = true;
906  setExceptionState_ = true;
907  break;
908  }
909  usleep(10000);
910  fcnt++;
911  //report and EoR check every 30 seconds
912  if ((fcnt && fcnt % 3000 == 0) || quit_threads_.load(std::memory_order_relaxed)) {
913  edm::LogWarning("DAQSource") << "Additional file is still missing after 30 seconds -: " << addFile;
914  struct stat bufEoR;
915  auto secondaryPath = std::filesystem::path(addFile).parent_path();
917  std::string mainEoR = (std::filesystem::path(daqDirector_->buBaseRunDir()) / eorName).generic_string();
918  std::string secondaryEoR = (secondaryPath / eorName).generic_string();
919  bool prematureEoR = false;
920  if (stat(secondaryEoR.c_str(), &bufEoR) == 0) {
921  if (stat(addFile.c_str(), &bufEoR) != 0) {
922  edm::LogError("DAQSource")
923  << "EoR file appeared in -: " << secondaryPath << " while waiting for index file " << addFile;
924  prematureEoR = true;
925  }
926  } else if (stat(mainEoR.c_str(), &bufEoR) == 0) {
927  //wait another 10 seconds
928  usleep(10000000);
929  if (stat(addFile.c_str(), &bufEoR) != 0) {
930  edm::LogError("DAQSource")
931  << "Main EoR file appeared -: " << mainEoR << " while waiting for index file " << addFile;
932  prematureEoR = true;
933  }
934  }
935  if (prematureEoR) {
936  //queue EoR since this is not FU error
937  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded, 0));
938  stop = true;
939  break;
940  }
941  }
942 
943  if (quit_threads_) {
944  edm::LogError("DAQSource") << "Quitting while waiting for file -: " << addFile;
945  stop = true;
946  setExceptionState_ = true;
947  break;
948  }
949  }
950  LogDebug("DAQSource") << " APPEND NAME " << addFile;
951  if (stop)
952  break;
953 
954  newInputFile->appendFile(addFile, buf.st_size);
955  neededSize += buf.st_size;
956  }
957  if (stop)
958  break;
959 
960  //calculate number of needed chunks and size if resizing will be applied
961  uint16_t neededChunks;
962  uint64_t chunkSize;
963 
964  if (fitToBuffer) {
965  chunkSize = std::min(maxChunkSize_, std::max(eventChunkSize_, neededSize));
966  neededChunks = 1;
967  } else {
968  chunkSize = eventChunkSize_;
969  neededChunks = neededSize / eventChunkSize_ + uint16_t((neededSize % eventChunkSize_) > 0);
970  }
971  newInputFile->setChunks(neededChunks);
972 
973  newInputFile->randomizeOrder(rng_);
974 
976  auto newInputFilePtr = newInputFile.get();
977  fileQueue_.push(std::move(newInputFile));
978 
979  for (size_t i = 0; i < neededChunks; i++) {
980  if (fms_) {
981  bool copy_active = false;
982  for (auto j : tid_active_)
983  if (j)
984  copy_active = true;
985  if (copy_active)
987  else
989  }
990  //get thread
991  unsigned int newTid = 0xffffffff;
992  while (!workerPool_.try_pop(newTid)) {
993  usleep(100000);
994  if (quit_threads_.load(std::memory_order_relaxed)) {
995  stop = true;
996  break;
997  }
998  }
999 
1000  if (fms_) {
1001  bool copy_active = false;
1002  for (auto j : tid_active_)
1003  if (j)
1004  copy_active = true;
1005  if (copy_active)
1007  else
1009  }
1010  InputChunk* newChunk = nullptr;
1011  while (!freeChunks_.try_pop(newChunk)) {
1012  usleep(100000);
1013  if (quit_threads_.load(std::memory_order_relaxed)) {
1014  stop = true;
1015  break;
1016  }
1017  }
1018 
1019  if (newChunk == nullptr) {
1020  //return unused tid if we received shutdown (nullptr chunk)
1021  if (newTid != 0xffffffff)
1022  workerPool_.push(newTid);
1023  stop = true;
1024  break;
1025  }
1026  if (stop)
1027  break;
1029 
1030  std::unique_lock<std::mutex> lk(mReader_);
1031 
1032  uint64_t toRead = chunkSize;
1033  if (i == (uint64_t)neededChunks - 1 && neededSize % chunkSize)
1034  toRead = neededSize % chunkSize;
1035  newChunk->reset(i * chunkSize, toRead, i);
1036 
1037  workerJob_[newTid].first = newInputFilePtr;
1038  workerJob_[newTid].second = newChunk;
1039 
1040  //wake up the worker thread
1041  cvReader_[newTid]->notify_one();
1042  }
1043  }
1044  }
1046  //make sure threads finish reading
1047  unsigned int numFinishedThreads = 0;
1048  while (numFinishedThreads < workerThreads_.size()) {
1049  unsigned int tid = 0;
1050  while (!workerPool_.try_pop(tid)) {
1051  usleep(10000);
1052  }
1053  std::unique_lock<std::mutex> lk(mReader_);
1054  thread_quit_signal[tid] = true;
1055  cvReader_[tid]->notify_one();
1056  numFinishedThreads++;
1057  }
1058  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1059  workerThreads_[i]->join();
1060  delete workerThreads_[i];
1061  }
1062 }
1063 
1064 void DAQSource::readWorker(unsigned int tid) {
1065  bool init = true;
1066  threadInit_.exchange(true, std::memory_order_acquire);
1067 
1068  while (true) {
1069  tid_active_[tid] = false;
1070  std::unique_lock<std::mutex> lk(mReader_);
1071  workerJob_[tid].first = nullptr;
1072  workerJob_[tid].first = nullptr;
1073 
1074  assert(!thread_quit_signal[tid]); //should never get it here
1075  workerPool_.push(tid);
1076 
1077  if (init) {
1078  std::unique_lock<std::mutex> lk(startupLock_);
1079  init = false;
1080  startupCv_.notify_one();
1081  }
1082  cvReader_[tid]->wait(lk);
1083 
1084  if (thread_quit_signal[tid])
1085  return;
1086  tid_active_[tid] = true;
1087 
1088  RawInputFile* file;
1089  InputChunk* chunk;
1090 
1091  assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1092 
1093  file = workerJob_[tid].first;
1094  chunk = workerJob_[tid].second;
1095 
1096  bool fitToBuffer = dataMode_->fitToBuffer();
1097 
1098  //resize if multi-chunked reading is not possible
1099  if (fitToBuffer) {
1100  uint64_t accum = 0;
1101  for (auto s : file->diskFileSizes_)
1102  accum += s;
1103  if (accum > eventChunkSize_) {
1104  if (!chunk->resize(accum, maxChunkSize_)) {
1105  edm::LogError("DAQSource")
1106  << "maxChunkSize can not accomodate the file set. Try increasing chunk size and/or chunk maximum size.";
1107  if (file->rawFd_ != -1 && (numConcurrentReads_ == 1 || chunk->offset_ == 0))
1108  close(file->rawFd_);
1109  setExceptionState_ = true;
1110  continue;
1111  } else {
1112  edm::LogInfo("DAQSource") << "chunk size was increased to " << (chunk->size_ >> 20) << " MB";
1113  }
1114  }
1115  }
1116 
1117  //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1118  unsigned int bufferLeftInitial = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1119  const uint16_t readBlocks = chunk->size_ / eventChunkBlock_ + uint16_t(chunk->size_ % eventChunkBlock_ > 0);
1120 
1121  auto readPrimary = [&](uint64_t bufferLeft) {
1122  //BEGIN reading primary file - check if file descriptor is already open
1123  //in multi-threaded chunked mode, only first thread will use already open fd for reading the first file
1124  //fd will not be closed in other case (used by other threads)
1125  int fileDescriptor = -1;
1126  bool fileOpenedHere = false;
1127 
1128  if (numConcurrentReads_ == 1) {
1129  fileDescriptor = file->rawFd_;
1130  file->rawFd_ = -1;
1131  if (fileDescriptor == -1) {
1132  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1133  fileOpenedHere = true;
1134  }
1135  } else {
1136  if (chunk->offset_ == 0) {
1137  fileDescriptor = file->rawFd_;
1138  file->rawFd_ = -1;
1139  if (fileDescriptor == -1) {
1140  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1141  fileOpenedHere = true;
1142  }
1143  } else {
1144  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1145  fileOpenedHere = true;
1146  }
1147  }
1148 
1149  if (fileDescriptor == -1) {
1150  edm::LogError("DAQSource") << "readWorker failed to open file -: " << file->fileName_
1151  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1152  setExceptionState_ = true;
1153  return;
1154  }
1155 
1156  if (fileOpenedHere) { //fast forward to this chunk position
1157  off_t pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1158  if (pos == -1) {
1159  edm::LogError("DAQSource") << "readWorker failed to seek file -: " << file->fileName_
1160  << " fd:" << fileDescriptor << " to offset " << chunk->offset_
1161  << " error: " << strerror(errno);
1162  setExceptionState_ = true;
1163  return;
1164  }
1165  }
1166 
1167  LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1168  << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1169 
1170  size_t skipped = bufferLeft;
1172  for (unsigned int i = 0; i < readBlocks; i++) {
1173  ssize_t last;
1174  edm::LogInfo("DAQSource") << "readWorker read -: " << (int64_t)(chunk->usedSize_ - bufferLeft) << " or "
1175  << (int64_t)eventChunkBlock_;
1176 
1177  //protect against reading into next block
1178  last = ::read(fileDescriptor,
1179  (void*)(chunk->buf_ + bufferLeft),
1180  std::min((int64_t)(chunk->usedSize_ - bufferLeft), (int64_t)eventChunkBlock_));
1181 
1182  if (last < 0) {
1183  edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1184  << " fd:" << fileDescriptor << " last: " << last << " error: " << strerror(errno);
1185  setExceptionState_ = true;
1186  break;
1187  }
1188  if (last > 0) {
1189  bufferLeft += last;
1190  }
1191  if ((uint64_t)last < eventChunkBlock_) { //last read
1192  edm::LogInfo("DAQSource") << "chunkUsedSize" << chunk->usedSize_ << " u-s:" << (chunk->usedSize_ - skipped)
1193  << " ix:" << i * eventChunkBlock_ << " " << (size_t)last;
1194  //check if this is last block if single file, then total read size must match file size
1195  if (file->numFiles_ == 1 && !(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (size_t)last)) {
1196  edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1197  << " fd:" << fileDescriptor << " last:" << last
1198  << " expectedChunkSize:" << chunk->usedSize_
1199  << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last)
1200  << " skipped:" << skipped << " block:" << (i + 1) << "/" << readBlocks
1201  << " error: " << strerror(errno);
1202  setExceptionState_ = true;
1203  }
1204  break;
1205  }
1206  }
1207  if (setExceptionState_)
1208  return;
1209 
1210  file->fileSizes_[0] = bufferLeft;
1211 
1212  if (chunk->offset_ + bufferLeft == file->diskFileSizes_[0] || bufferLeft == chunk->size_) {
1213  //file reading finished using this fd
1214  //or the whole buffer is filled (single sequential file spread over more chunks)
1215  close(fileDescriptor);
1216  fileDescriptor = -1;
1217  } else
1218  assert(fileDescriptor == -1);
1219 
1220  if (fitToBuffer && bufferLeft != file->diskFileSizes_[0]) {
1221  edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[0]
1222  << " read:" << bufferLeft << " expected:" << file->diskFileSizes_[0];
1223  setExceptionState_ = true;
1224  return;
1225  }
1226 
1228  auto diff = end - start;
1229  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1230  LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1231  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1232  << " GB/s)";
1233  };
1234  //END primary function
1235 
1236  //SECONDARY files function
1237  auto readSecondary = [&](uint64_t bufferLeft, unsigned int j) {
1238  size_t fileLen = 0;
1239 
1240  std::string const& addFile = file->fileNames_[j];
1241  int fileDescriptor = open(addFile.c_str(), O_RDONLY);
1242 
1243  if (fileDescriptor < 0) {
1244  edm::LogError("DAQSource") << "readWorker failed to open file -: " << addFile << " fd:" << fileDescriptor
1245  << " error: " << strerror(errno);
1246  setExceptionState_ = true;
1247  return;
1248  }
1249 
1250  LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << addFile << " at offset "
1251  << lseek(fileDescriptor, 0, SEEK_CUR);
1252 
1253  //size_t skipped = 0;//file is newly opened, read with header
1255  for (unsigned int i = 0; i < readBlocks; i++) {
1256  ssize_t last;
1257 
1258  //protect against reading into next block
1259  //use bufferLeft for the write offset
1260  last = ::read(fileDescriptor,
1261  (void*)(chunk->buf_ + bufferLeft),
1262  std::min((uint64_t)file->diskFileSizes_[j], (uint64_t)eventChunkBlock_));
1263 
1264  if (last < 0) {
1265  edm::LogError("DAQSource") << "readWorker failed to read file -: " << addFile << " fd:" << fileDescriptor
1266  << " error: " << strerror(errno);
1267  setExceptionState_ = true;
1268  close(fileDescriptor);
1269  break;
1270  }
1271  if (last > 0) {
1272  bufferLeft += last;
1273  fileLen += last;
1274  file->fileSize_ += last;
1275  }
1276  };
1277 
1278  close(fileDescriptor);
1279  file->fileSizes_[j] = fileLen;
1280  assert(fileLen > 0);
1281 
1282  if (fitToBuffer && fileLen != file->diskFileSizes_[j]) {
1283  edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[j]
1284  << " read:" << fileLen << " expected:" << file->diskFileSizes_[j];
1285  setExceptionState_ = true;
1286  return;
1287  }
1288 
1290  auto diff = end - start;
1291  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1292  LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1293  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1294  << " GB/s)";
1295  };
1296 
1297  //randomized order multi-file loop
1298  for (unsigned int j : file->fileOrder_) {
1299  if (j == 0) {
1300  readPrimary(bufferLeftInitial);
1301  } else
1302  readSecondary(file->bufferOffsets_[j], j);
1303 
1304  if (setExceptionState_)
1305  break;
1306  }
1307 
1308  if (setExceptionState_)
1309  continue;
1310 
1311  //detect FRD event version. Skip file Header if it exists
1312  if (dataMode_->dataVersion() == 0 && chunk->offset_ == 0) {
1313  dataMode_->detectVersion(chunk->buf_, file->rawHeaderSize_);
1314  }
1315  assert(dataMode_->versionCheck());
1316 
1317  chunk->readComplete_ =
1318  true; //this is atomic to secure the sequential buffer fill before becoming available for processing)
1319  file->chunks_[chunk->fileIndex_] = chunk; //put the completed chunk in the file chunk vector at predetermined index
1320  }
1321 }
1322 
1324  quit_threads_ = true;
1325  throw cms::Exception("DAQSource:threadError") << " file reader thread error ";
1326 }
1327 
1329  if (fms_)
1330  fms_->setInState(state);
1331 }
1332 
1334  if (fms_)
1336 }
1337 
1338 bool RawInputFile::advance(unsigned char*& dataPosition, const size_t size) {
1339  //wait for chunk
1340 
1341  while (!waitForChunk(currentChunk_)) {
1343  usleep(100000);
1347  }
1348 
1349  dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
1350  size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1351 
1352  if (currentLeft < size) {
1353  //we need next chunk
1354  assert(chunks_.size() > currentChunk_ + 1);
1355  while (!waitForChunk(currentChunk_ + 1)) {
1357  usleep(100000);
1361  }
1362  //copy everything to beginning of the first chunk
1363  dataPosition -= chunkPosition_;
1364  assert(dataPosition == chunks_[currentChunk_]->buf_);
1365  memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1366  memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1367  //set pointers at the end of the old data position
1368  bufferPosition_ += size;
1369  chunkPosition_ = size - currentLeft;
1370  currentChunk_++;
1371  return true;
1372  } else {
1373  chunkPosition_ += size;
1374  bufferPosition_ += size;
1375  return false;
1376  }
1377 }
1378 
1379 void DAQSource::reportEventsThisLumiInSource(unsigned int lumi, unsigned int events) {
1380  std::lock_guard<std::mutex> lock(monlock_);
1381  auto itr = sourceEventsReport_.find(lumi);
1382  if (itr != sourceEventsReport_.end())
1383  itr->second += events;
1384  else
1386 }
1387 
1388 std::pair<bool, unsigned int> DAQSource::getEventReport(unsigned int lumi, bool erase) {
1389  std::lock_guard<std::mutex> lock(monlock_);
1390  auto itr = sourceEventsReport_.find(lumi);
1391  if (itr != sourceEventsReport_.end()) {
1392  std::pair<bool, unsigned int> ret(true, itr->second);
1393  if (erase)
1394  sourceEventsReport_.erase(itr);
1395  return ret;
1396  } else
1397  return std::pair<bool, unsigned int>(false, 0);
1398 }
1399 
1402  if (a.rfind('/') != std::string::npos)
1403  a = a.substr(a.rfind('/'));
1404  if (b.rfind('/') != std::string::npos)
1405  b = b.substr(b.rfind('/'));
1406  return b > a;
1407  });
1408 
1409  if (!listFileNames_.empty()) {
1410  //get run number from first file in the vector
1412  std::string fileStem = fileName.stem().string();
1413  if (fileStem.find("file://") == 0)
1414  fileStem = fileStem.substr(7);
1415  else if (fileStem.find("file:") == 0)
1416  fileStem = fileStem.substr(5);
1417  auto end = fileStem.find('_');
1418 
1419  if (fileStem.find("run") == 0) {
1420  std::string runStr = fileStem.substr(3, end - 3);
1421  try {
1422  //get long to support test run numbers < 2^32
1423  long rval = std::stol(runStr);
1424  edm::LogInfo("DAQSource") << "Autodetected run number in fileListMode -: " << rval;
1425  return rval;
1426  } catch (const std::exception&) {
1427  edm::LogWarning("DAQSource") << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1428  }
1429  }
1430  }
1431  return -1;
1432 }
1433 
1435  if (fileListIndex_ < listFileNames_.size()) {
1436  nextFile = listFileNames_[fileListIndex_];
1437  if (nextFile.find("file://") == 0)
1438  nextFile = nextFile.substr(7);
1439  else if (nextFile.find("file:") == 0)
1440  nextFile = nextFile.substr(5);
1441  std::filesystem::path fileName = nextFile;
1442  std::string fileStem = fileName.stem().string();
1443  if (fileStem.find("ls"))
1444  fileStem = fileStem.substr(fileStem.find("ls") + 2);
1445  if (fileStem.find('_'))
1446  fileStem = fileStem.substr(0, fileStem.find('_'));
1447 
1448  if (!fileListLoopMode_)
1449  ls = std::stoul(fileStem);
1450  else //always starting from LS 1 in loop mode
1451  ls = 1 + loopModeIterationInc_;
1452 
1453  //fsize = 0;
1454  //lockWaitTime = 0;
1455  fileListIndex_++;
1457  } else {
1458  if (!fileListLoopMode_)
1460  else {
1461  //loop through files until interrupted
1463  fileListIndex_ = 0;
1464  return getFile(ls, nextFile, lockWaitTime);
1465  }
1466  }
1467 }
std::string & buBaseRunDir()
size
Write out results.
static const char runNumber_[]
long initFileList()
Definition: DAQSource.cc:1400
Definition: start.py:1
Definition: fillJson.h:27
std::unique_ptr< RawInputFile > currentFile_
Definition: DAQSource.h:134
std::string const & runString() const
std::pair< RawInputFile *, InputChunk * > ReaderInfo
Definition: DAQSource.h:132
int currentFileIndex_
Definition: DAQSource.h:158
edm::RunNumber_t runNumber_
Definition: DAQSource.h:114
void threadError()
Definition: DAQSource.cc:1323
DAQSource * sourceParent_
Definition: DAQSource.h:200
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:261
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:359
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
unsigned int checkEvery_
Definition: DAQSource.h:162
std::unique_ptr< std::thread > readSupervisorThread_
Definition: DAQSource.h:138
bool crc32c_hw_test()
Definition: crc32c.cc:354
uint64_t maxChunkSize_
Definition: DAQSource.h:92
std::mutex startupLock_
Definition: DAQSource.h:155
std::default_random_engine rng_
Definition: DAQSource.h:124
std::map< unsigned int, unsigned int > sourceEventsReport_
Definition: DAQSource.h:173
bool chunkIsFree_
Definition: DAQSource.h:135
std::condition_variable startupCv_
Definition: DAQSource.h:156
ret
prodAgent to be discontinued
std::atomic< bool > threadInit_
Definition: DAQSource.h:171
unsigned int loopModeIterationInc_
Definition: DAQSource.h:112
Next checkNext() override
Definition: DAQSource.cc:254
void read(edm::EventPrincipal &eventPrincipal) override
Definition: DAQSource.cc:548
volatile std::atomic< bool > shutdown_flag
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:458
bool lumisectionDiscarded(unsigned int ls)
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
Definition: DAQSource.cc:1388
void maybeOpenNewLumiSection(const uint32_t lumiSection)
Definition: DAQSource.cc:327
std::mutex fileDeleteLock_
Definition: DAQSource.h:160
const std::string dataModeConfig_
Definition: DAQSource.h:90
std::vector< std::thread * > workerThreads_
Definition: DAQSource.h:140
Log< level::Error, false > LogError
std::mutex mReader_
Definition: DAQSource.h:148
unsigned int numConcurrentLumis() const
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
Definition: DAQSource.cc:1379
StreamID streamID() const
int timeout
Definition: mps_check.py:53
assert(be >=bs)
bool isExceptionOnData(unsigned int ls)
edm::ProcessHistoryID processHistoryID_
Definition: DAQSource.h:117
std::mutex mWakeup_
Definition: DAQSource.h:165
bool waitForChunk(unsigned int chunkid)
unsigned int currentLumiSection_
Definition: DAQSource.h:119
void overrideRunNumber(unsigned int run)
std::atomic< unsigned int > readingFilesCount_
Definition: DAQSource.h:98
evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock()
Definition: DAQSource.cc:347
std::string getEoRFilePathOnFU() const
uint64_t eventChunkSize_
Definition: DAQSource.h:91
U second(std::pair< T, U > const &p)
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
static Timestamp beginOfTime()
Definition: Timestamp.h:77
bool advance(unsigned char *&dataPosition, const size_t size)
Definition: DAQSource.cc:1338
DAQSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
Definition: DAQSource.cc:32
uint64_t eventChunkBlock_
Definition: DAQSource.h:93
evf::EvFDaqDirector::FileStatus getNextDataBlock()
Definition: DAQSource.cc:373
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
int currentLumiSection() const
Definition: DAQSource.h:52
unsigned int fileListIndex_
Definition: DAQSource.h:110
unsigned int eventsThisLumi_
Definition: DAQSource.h:122
const bool fileListLoopMode_
Definition: DAQSource.h:111
std::condition_variable cvWakeup_
Definition: DAQSource.h:166
friend struct InputChunk
Definition: DAQSource.h:43
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:366
void reset(uint64_t newOffset, uint64_t toRead, unsigned int fileIndex)
unsigned int maxBufferedFiles_
Definition: DAQSource.h:96
unsigned char * buf_
evf::FastMonitoringService * fms_
Definition: DAQSource.h:87
void readWorker(unsigned int tid)
Definition: DAQSource.cc:1064
unsigned int numBuffers_
Definition: DAQSource.h:95
std::atomic< bool > quit_threads_
Definition: DAQSource.h:152
const bool alwaysStartFromFirstLS_
Definition: DAQSource.h:101
std::shared_ptr< DataMode > dataMode_
Definition: DAQSource.h:176
unsigned int fileIndex_
tbb::concurrent_queue< std::unique_ptr< RawInputFile > > fileQueue_
Definition: DAQSource.h:146
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:391
const bool fileListMode_
Definition: DAQSource.h:109
void createProcessingNotificationMaybe() const
void setInputSource(FedRawDataInputSource *inputSource)
Log< level::Info, false > LogInfo
std::atomic< bool > readComplete_
Definition: init.py:1
def ls(path, rec=False)
Definition: eostools.py:349
const std::vector< unsigned int > testTCDSFEDRange_
Definition: DAQSource.h:104
tbb::concurrent_vector< InputChunk * > chunks_
unsigned int getStartLumisectionFromEnv() const
def getRunNumber(filename)
uint32_t eventRunNumber_
Definition: DAQSource.h:120
void setInStateSup(FastMonState::InputState inputState)
unsigned long long uint64_t
Definition: Time.h:13
void dataArranger()
Definition: DAQSource.cc:591
std::vector< std::string > listFileNames_
Definition: DAQSource.h:105
~DAQSource() override
Definition: DAQSource.cc:185
def load(fileName)
Definition: svgfig.py:547
uint32_t chunkPosition_
std::vector< int > streamFileTracker_
Definition: DAQSource.h:161
double b
Definition: hdecay.h:120
void rewind_() override
Definition: DAQSource.cc:589
std::string getEoRFileName() const
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:374
std::vector< unsigned int > tid_active_
Definition: DAQSource.h:150
std::vector< int > const & getBUBaseDirsNSources() const
void stoppedLookingForFile(unsigned int lumi)
std::vector< unsigned int > thread_quit_signal
Definition: DAQSource.h:153
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:360
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:463
unsigned int readBlocks_
Definition: DAQSource.h:94
HLT enums.
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:362
ItemTypeInfo state() const
Definition: InputSource.h:361
double a
Definition: hdecay.h:121
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:264
void setMonStateSup(evf::FastMonState::InputState state)
Definition: DAQSource.cc:1333
friend class RawInputFile
Definition: DAQSource.h:42
bool setExceptionState_
Definition: DAQSource.h:154
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Definition: DAQSource.cc:223
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
Definition: DAQSource.h:149
unsigned int getLumisectionToStart() const
bool startedSupervisorThread_
Definition: DAQSource.h:137
evf::EvFDaqDirector * daqDirector_
Definition: DAQSource.h:88
tbb::concurrent_queue< InputChunk * > freeChunks_
Definition: DAQSource.h:145
unsigned int numConcurrentReads_
Definition: DAQSource.h:97
int addFile(MEStore &micromes, int fd)
Definition: fastHadd.cc:352
unsigned int RunNumber_t
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
tbb::concurrent_queue< unsigned int > workerPool_
Definition: DAQSource.h:142
bool resize(uint64_t wantedSize, uint64_t maxSize)
Log< level::Warning, false > LogWarning
unsigned int currentChunk_
std::vector< ReaderInfo > workerJob_
Definition: DAQSource.h:143
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint64_t &lockWaitTime)
Definition: DAQSource.cc:1434
void setMonState(evf::FastMonState::InputState state)
Definition: DAQSource.cc:1328
void setFMS(evf::FastMonitoringService *fms)
bool exceptionState()
Definition: DAQSource.h:78
const bool verifyChecksum_
Definition: DAQSource.h:102
void readSupervisor()
Definition: DAQSource.cc:593
int events
Power< A, B >::type pow(const A &a, const B &b)
Definition: Power.h:29
std::vector< std::string > const & getBUBaseDirs() const
def move(src, dest)
Definition: eostools.py:511
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
Definition: DAQSource.h:159
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
#define LogDebug(id)
void setInState(FastMonState::InputState inputState)
std::mutex monlock_
Definition: DAQSource.h:174
uint32_t bufferPosition_