CMS 3D CMS Logo

DAQSource.cc
Go to the documentation of this file.
1 #include <sstream>
2 #include <unistd.h>
3 #include <vector>
4 #include <chrono>
5 #include <algorithm>
6 
11 
21 
26 
27 //JSON file reader
29 
30 using namespace evf::FastMonState;
31 
33  : edm::RawInputSource(pset, desc),
34  dataModeConfig_(pset.getUntrackedParameter<std::string>("dataMode")),
35  eventChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkSize")) << 20),
36  maxChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("maxChunkSize")) << 20),
37  eventChunkBlock_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkBlock")) << 20),
38  numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers")),
39  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles")),
40  alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
41  verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum")),
42  useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID")),
43  testTCDSFEDRange_(pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange")),
44  listFileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames")),
45  fileListMode_(pset.getUntrackedParameter<bool>("fileListMode")),
46  fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
47  runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
48  processHistoryID_(),
49  currentLumiSection_(0),
50  eventsThisLumi_(0),
51  rng_(std::chrono::system_clock::now().time_since_epoch().count()) {
52  char thishost[256];
53  gethostname(thishost, 255);
54 
55  if (maxChunkSize_ == 0)
57  else if (maxChunkSize_ < eventChunkSize_)
58  throw cms::Exception("DAQSource::DAQSource") << "maxChunkSize must be equal or larger than eventChunkSize";
59 
60  if (eventChunkBlock_ == 0)
63  throw cms::Exception("DAQSource::DAQSource") << "eventChunkBlock must be equal or smaller than eventChunkSize";
64 
65  edm::LogInfo("DAQSource") << "Construction. read-ahead chunk size -: " << std::endl
66  << (eventChunkSize_ >> 20) << " MB on host " << thishost << " in mode " << dataModeConfig_;
67 
68  uint16_t MINTCDSuTCAFEDID = FEDNumbering::MINTCDSuTCAFEDID;
69  uint16_t MAXTCDSuTCAFEDID = FEDNumbering::MAXTCDSuTCAFEDID;
70 
71  if (!testTCDSFEDRange_.empty()) {
72  if (testTCDSFEDRange_.size() != 2) {
73  throw cms::Exception("DAQSource::DAQSource") << "Invalid TCDS Test FED range parameter";
74  }
75  MINTCDSuTCAFEDID = testTCDSFEDRange_[0];
76  MAXTCDSuTCAFEDID = testTCDSFEDRange_[1];
77  }
78 
79  //load mode class based on parameter
80  if (dataModeConfig_ == "FRD") {
81  dataMode_.reset(new DataModeFRD(this));
82  } else if (dataModeConfig_ == "FRDStriped") {
83  dataMode_.reset(new DataModeFRDStriped(this));
84  } else
85  throw cms::Exception("DAQSource::DAQSource") << "Unknown data mode " << dataModeConfig_;
86 
88 
89  dataMode_->setTCDSSearchRange(MINTCDSuTCAFEDID, MAXTCDSuTCAFEDID);
90  dataMode_->setTesting(pset.getUntrackedParameter<bool>("testing", false));
91 
92  long autoRunNumber = -1;
93  if (fileListMode_) {
94  autoRunNumber = initFileList();
95  if (!fileListLoopMode_) {
96  if (autoRunNumber < 0)
97  throw cms::Exception("DAQSource::DAQSource") << "Run number not found from filename";
98  //override run number
99  runNumber_ = (edm::RunNumber_t)autoRunNumber;
100  daqDirector_->overrideRunNumber((unsigned int)autoRunNumber);
101  }
102  }
103 
104  dataMode_->makeDirectoryEntries(daqDirector_->getBUBaseDirs(), daqDirector_->runString());
105 
106  auto& daqProvenanceHelpers = dataMode_->makeDaqProvenanceHelpers();
107  for (const auto& daqProvenanceHelper : daqProvenanceHelpers)
108  processHistoryID_ = daqProvenanceHelper->daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
109  setNewRun();
110  //todo:autodetect from file name (assert if names differ)
112 
113  //make sure that chunk size is N * block size
118 
119  if (!numBuffers_)
120  throw cms::Exception("DAQSource::DAQSource") << "no reading enabled with numBuffers parameter 0";
121 
123  assert(numBuffers_ > 1);
124  readingFilesCount_ = 0;
125 
126  if (!crc32c_hw_test())
127  edm::LogError("DAQSource::DAQSource") << "Intel crc32c checksum computation unavailable";
128 
129  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
130  if (fileListMode_) {
131  try {
133  } catch (cms::Exception const&) {
134  edm::LogInfo("DAQSource") << "No FastMonitoringService found in the configuration";
135  }
136  } else {
138  if (!fms_) {
139  throw cms::Exception("DAQSource") << "FastMonitoringService not found";
140  }
141  }
142 
// Abort construction when the director handle is null. The exception has to be thrown:
// building it as a bare expression statement discards it and the check has no effect.
144  if (!daqDirector_)
145  throw cms::Exception("DAQSource") << "EvFDaqDirector not found";
146 
147  edm::LogInfo("DAQSource") << "EvFDaqDirector/Source configured to use file service";
148  //set DaqDirector to delete files in preGlobalEndLumi callback
150  if (fms_) {
152  fms_->setInputSource(this);
155  }
156  //should delete chunks when run stops
157  for (unsigned int i = 0; i < numBuffers_; i++) {
159  }
160 
161  quit_threads_ = false;
162 
163  //prepare data shared by threads
164  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
165  thread_quit_signal.push_back(false);
166  workerJob_.push_back(ReaderInfo(nullptr, nullptr));
167  cvReader_.push_back(std::make_unique<std::condition_variable>());
168  tid_active_.push_back(0);
169  }
170 
171  //start threads
172  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
173  //wait for each thread to complete initialization
174  std::unique_lock<std::mutex> lk(startupLock_);
175  workerThreads_.push_back(new std::thread(&DAQSource::readWorker, this, i));
176  startupCv_.wait(lk);
177  }
178 
179  runAuxiliary()->setProcessHistoryID(processHistoryID_);
180 }
181 
183  quit_threads_ = true;
184 
185  //delete any remaining open files
186  if (!fms_ || !fms_->exceptionDetected()) {
187  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
188  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
189  it->second.reset();
190  } else {
191  //skip deleting files with exception
192  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
193  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
194  if (fms_->isExceptionOnData(it->second->lumi_))
195  it->second->unsetDeleteFile();
196  else
197  it->second.reset();
198  }
199  //disable deleting current file with exception
200  if (currentFile_.get())
201  if (fms_->isExceptionOnData(currentFile_->lumi_))
202  currentFile_->unsetDeleteFile();
203  }
204 
206  readSupervisorThread_->join();
207  } else {
208  //join aux threads in case the supervisor thread was not started
209  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
210  std::unique_lock<std::mutex> lk(mReader_);
211  thread_quit_signal[i] = true;
212  cvReader_[i]->notify_one();
213  lk.unlock();
214  workerThreads_[i]->join();
215  delete workerThreads_[i];
216  }
217  }
218 }
219 
// Parameter descriptions for this source.
// The three chunk-size values (eventChunkSize, maxChunkSize, eventChunkBlock) are given in MB:
// the constructor shifts each of them left by 20 bits before using it.
222  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk (unified)");
// The help text names the mode strings the constructor compares against ("FRD", "FRDStriped").
// NOTE(review): 'ScoutingRun2' is advertised here, but no matching branch is visible in the
// constructor's mode selection — confirm whether it is still supported.
223  desc.addUntracked<std::string>("dataMode", "FRD")->setComment("Data mode: event 'FRD', 'FRDStriped', 'ScoutingRun2'");
224  desc.addUntracked<unsigned int>("eventChunkSize", 64)->setComment("Input buffer (chunk) size");
225  desc.addUntracked<unsigned int>("maxChunkSize", 0)
226  ->setComment("Maximum chunk size allowed if buffer is resized to fit data. If 0 is specifier, use chunk size");
227  desc.addUntracked<unsigned int>("eventChunkBlock", 0)
228  ->setComment(
229  "Block size used in a single file read call (must be smaller or equal to the initial chunk buffer size). If "
230  "0 is specified, use chunk size.");
231 
232  desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
233  desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
234  ->setComment("Maximum number of simultaneously buffered raw files");
// Declared as bool under the exact name the constructor reads ("alwaysStartFromFirstLS").
// The previous entry was an unsigned int spelled "alwaysStartFromfirstLS", so it never
// described the parameter that is actually consumed.
235  desc.addUntracked<bool>("alwaysStartFromFirstLS", false)
236  ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
237  desc.addUntracked<bool>("verifyChecksum", true)
238  ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
239  desc.addUntracked<bool>("useL1EventID", false)
240  ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
241  desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
242  ->setComment("[min, max] range to search for TCDS FED ID in test setup");
243  desc.addUntracked<bool>("fileListMode", false)
244  ->setComment("Use fileNames parameter to directly specify raw files to open");
245  desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
246  ->setComment("file list used when fileListMode is enabled");
// Parameters without a description above (the constructor also reads "fileListLoopMode"
// and "testing") are still accepted because arbitrary extra parameters are allowed.
247  desc.setAllowAnything();
248  descriptions.add("source", desc);
249 }
250 
253  std::unique_lock<std::mutex> lk(startupLock_);
254 
255  //this thread opens new files and dispatches reading to worker readers
256  readSupervisorThread_ = std::make_unique<std::thread>(&DAQSource::readSupervisor, this);
258 
259  startupCv_.wait(lk);
260  }
261 
262  //signal hltd to start event accounting
263  if (!currentLumiSection_)
266 
267  auto nextEvent = [this]() {
268  auto getNextEvent = [this]() {
269  //for some models this is always true (if one event is one block)
270  if (dataMode_->dataBlockCompleted()) {
271  return getNextDataBlock();
272  } else {
273  return getNextEventFromDataBlock();
274  }
275  };
276 
278  while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
279  if (edm::shutdown_flag.load(std::memory_order_relaxed))
280  break;
281  }
282  return status;
283  };
284 
285  switch (nextEvent()) {
287  //maybe create EoL file in working directory before ending run
288  struct stat buf;
289  //also create EoR file in FU data directory
290  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
291  if (!eorFound) {
292  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
293  O_RDWR | O_CREAT,
294  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
295  close(eor_fd);
296  }
298  eventsThisLumi_ = 0;
300  edm::LogInfo("DAQSource") << "----------------RUN ENDED----------------";
301  return Next::kStop;
302  }
304  //this is not reachable
305  return Next::kEvent;
306  }
308  //std::cout << "--------------NEW LUMI---------------" << std::endl;
309  return Next::kEvent;
310  }
311  default: {
314  else
315  eventRunNumber_ = dataMode_->run();
316 
317  setEventCached();
318 
319  return Next::kEvent;
320  }
321  }
322 }
323 
// Opens a new luminosity block when none is open yet or when the open one has a different number.
// Does nothing if the current luminosity block auxiliary already refers to lumiSection.
// lumiSection: number of the lumisection that should be open after the call.
324 void DAQSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
325  if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
326  currentLumiSection_ = lumiSection;
327 
329 
// Opening time: wall-clock time from gettimeofday, combined as seconds * 1000000 + microseconds
330  timeval tv;
331  gettimeofday(&tv, nullptr);
332  const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
333 
// NOTE(review): the line that begins this statement is absent from this listing. What remains is
// its argument list: run number, lumisection, the opening time, and an invalid timestamp.
// Presumably this constructs lumiBlockAuxiliary, which is used just below — confirm against the full source.
335  runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
336 
// Install the new auxiliary, then stamp it with the process history ID held by this source
337  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
338  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
339 
340  edm::LogInfo("DAQSource") << "New lumi section was opened. LUMI -: " << lumiSection;
341  }
342 }
343 
346 
347  bool found = dataMode_->nextEventView();
348  //file(s) completely parsed
349  if (!found) {
350  if (dataMode_->dataBlockInitialized()) {
351  dataMode_->setDataBlockInitialized(false);
352  //roll position to the end of the file to close it
353  currentFile_->bufferPosition_ = currentFile_->fileSize_;
354  }
356  }
357 
358  if (verifyChecksum_ && !dataMode_->checksumValid()) {
359  if (fms_)
361  throw cms::Exception("DAQSource::getNextEventFromDataBlock") << dataMode_->getChecksumError();
362  }
364 
365  currentFile_->nProcessed_++;
366 
368 }
369 
371  if (setExceptionState_)
372  threadError();
373  if (!currentFile_.get()) {
376  if (!fileQueue_.try_pop(currentFile_)) {
377  //sleep until wakeup (only in single-buffer mode) or timeout
378  std::unique_lock<std::mutex> lkw(mWakeup_);
379  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
381  }
382  status = currentFile_->status_;
385  currentFile_.reset();
386  return status;
387  } else if (status == evf::EvFDaqDirector::runAbort) {
388  throw cms::Exception("DAQSource::getNextDataBlock") << "Run has been aborted by the input source reader thread";
389  } else if (status == evf::EvFDaqDirector::newLumi) {
391  if (currentFile_->lumi_ > currentLumiSection_) {
393  eventsThisLumi_ = 0;
395  }
396  currentFile_.reset();
397  return status;
398  } else if (status == evf::EvFDaqDirector::newFile) {
400  } else
401  assert(false);
402  }
404 
405  //file is empty
406  if (!currentFile_->fileSize_) {
408  //try to open new lumi
409  assert(currentFile_->nChunks_ == 0);
410  if (currentFile_->lumi_ > currentLumiSection_) {
412  eventsThisLumi_ = 0;
414  }
415  //immediately delete empty file
416  currentFile_.reset();
418  }
419 
420  //file is finished
421  if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
423  //release last chunk (it is never released elsewhere)
424  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
425  if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
426  throw cms::Exception("DAQSource::getNextDataBlock")
427  << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
428  << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
429  }
431  //put the file in pending delete list;
432  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
433  filesToDelete_.push_back(
434  std::pair<int, std::unique_ptr<RawInputFile>>(currentFileIndex_, std::move(currentFile_)));
435  } else {
436  //in single-thread and stream jobs, events are already processed
437  currentFile_.reset();
438  }
440  }
441 
442  //assert(currentFile_->status_ == evf::EvFDaqDirector::newFile);
443 
444  //handle RAW file header
445  if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
446  if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
447  if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
448  throw cms::Exception("DAQSource::getNextDataBlock") << "Premature end of input file while reading file header";
449 
450  edm::LogWarning("DAQSource") << "File with only raw header and no events received in LS " << currentFile_->lumi_;
451  if (currentFile_->lumi_ > currentLumiSection_) {
453  eventsThisLumi_ = 0;
455  }
456  }
457 
458  //advance buffer position to skip file header (chunk will be acquired later)
459  currentFile_->advance(currentFile_->rawHeaderSize_);
460  }
461 
462  //file is too short to fit event header
463  if (currentFile_->fileSizeLeft() < dataMode_->headerSize())
464  throw cms::Exception("DAQSource::getNextDataBlock")
465  << "Premature end of input file while reading event header. Missing: "
466  << (dataMode_->headerSize() - currentFile_->fileSizeLeft()) << " bytes";
467 
468  //multibuffer mode
469  //wait for the current chunk to become added to the vector
471  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
472  usleep(10000);
473  if (setExceptionState_)
474  threadError();
475  }
477 
478  chunkIsFree_ = false;
479  bool chunkEnd;
480  unsigned char* dataPosition;
481 
482  //read event header, copy it to a single chunk if necessary
483  chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize());
484 
485  //get buffer size of current chunk (can be resized)
486  uint64_t currentChunkSize = currentFile_->currentChunkSize();
487 
488  //prepare view based on header that was read
489  dataMode_->makeDataBlockView(dataPosition, currentChunkSize, currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
490 
491  //check that payload size is within the file
492  const size_t msgSize = dataMode_->dataBlockSize() - dataMode_->headerSize();
493 
494  if (currentFile_->fileSizeLeft() < (int64_t)msgSize)
495  throw cms::Exception("DAQSource::getNextEventDataBlock")
496  << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
497  << ") while parsing block";
498 
499  //for cross-buffer models
500  if (chunkEnd) {
501  //header was at the chunk boundary, move payload into the starting chunk as well. No need to update block view here
502  currentFile_->moveToPreviousChunk(msgSize, dataMode_->headerSize());
503  //mark to release old chunk
504  chunkIsFree_ = true;
505  } else if (currentChunkSize - currentFile_->chunkPosition_ < msgSize) {
506  //header was contiguous, but payload does not fit in the chunk
507  //rewind to header start position and then together with payload will be copied together to the old chunk
508  currentFile_->rewindChunk(dataMode_->headerSize());
509 
511 
512  //do the copy to the beginning of the starting chunk. move pointers for next event in the next chunk
513  chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize() + msgSize);
514  assert(chunkEnd);
515  //mark to release old chunk
516  chunkIsFree_ = true;
517 
519  //header and payload is moved, update view
520  dataMode_->makeDataBlockView(
521  dataPosition, currentFile_->currentChunkSize(), currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
522  } else {
523  //everything is in a single chunk, only move pointers forward
524  chunkEnd = currentFile_->advance(dataPosition, msgSize);
525  assert(!chunkEnd);
526  chunkIsFree_ = false;
527  }
528 
529  //sanity-check check that the buffer position has not exceeded file size after preparing event
530  if (currentFile_->fileSize_ < currentFile_->bufferPosition_)
531  throw cms::Exception("DAQSource::getNextEventDataBlock")
532  << "Exceeded file size by " << (currentFile_->bufferPosition_ - currentFile_->fileSize_);
533 
534  //prepare event
535  return getNextEventFromDataBlock();
536 }
537 
// Fills eventPrincipal from the current data mode, records which input file the event's stream is
// using, periodically drops entries from the pending-delete list, and returns a finished chunk to
// the free-chunk queue when one has been marked free.
// NOTE(review): several lines of this function are absent from this listing, so the steps
// described here may not be the complete set.
538 void DAQSource::read(edm::EventPrincipal& eventPrincipal) {
540 
541  dataMode_->readEvent(eventPrincipal);
542 
543  eventsThisLumi_++;
545 
// streamFileTracker_ is indexed by stream ID; new slots are filled with -1 as a placeholder
546  //resize vector if needed
547  while (streamFileTracker_.size() <= eventPrincipal.streamID())
548  streamFileTracker_.push_back(-1);
549 
// Remember the index of the file this stream is currently reading from
550  streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
551 
// The check below runs when (nProcessed_ - 1) is a multiple of checkEvery_.
// NOTE(review): the value of checkEvery_ is not visible here; the "10" in the next comment is unverified.
552  //this old file check runs no more often than every 10 events
553  if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
554  //delete files that are not in processing
// The pending-delete list is protected by fileDeleteLock_ for the whole scan
555  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
556  auto it = filesToDelete_.begin();
557  while (it != filesToDelete_.end()) {
// An entry counts as in use while any stream's tracked file index equals the entry's index
558  bool fileIsBeingProcessed = false;
559  for (unsigned int i = 0; i < streamFileTracker_.size(); i++) {
560  if (it->first == streamFileTracker_.at(i)) {
561  fileIsBeingProcessed = true;
562  break;
563  }
564  }
// Erase only entries that no stream refers to and whose lumisection is not flagged by the
// monitoring service as having an exception on data; erase() returns the next iterator
565  if (!fileIsBeingProcessed && !(fms_ && fms_->isExceptionOnData(it->second->lumi_))) {
566  it = filesToDelete_.erase(it);
567  } else
568  it++;
569  }
570  }
// When the data block is complete and a chunk was marked free, hand the chunk preceding
// the current one back to the free-chunk queue and clear the mark
571  if (dataMode_->dataBlockCompleted() && chunkIsFree_) {
572  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
573  chunkIsFree_ = false;
574  }
576  return;
577 }
578 
580 
582 
584  bool stop = false;
585  unsigned int currentLumiSection = 0;
586 
587  {
588  std::unique_lock<std::mutex> lk(startupLock_);
589  startupCv_.notify_one();
590  }
591 
592  uint32_t ls = 0;
593  uint32_t monLS = 1;
594  uint32_t lockCount = 0;
595  uint64_t sumLockWaitTimeUs = 0.;
596 
597  bool requireHeader = dataMode_->requireHeader();
598 
599  while (!stop) {
600  //wait for at least one free thread and chunk
601  int counter = 0;
602 
603  while (workerPool_.empty() || freeChunks_.empty() || readingFilesCount_ >= maxBufferedFiles_) {
604  //report state to monitoring
605  if (fms_) {
606  bool copy_active = false;
607  for (auto j : tid_active_)
608  if (j)
609  copy_active = true;
612  else if (freeChunks_.empty()) {
613  if (copy_active)
615  else
617  } else {
618  if (copy_active)
620  else
622  }
623  }
624  std::unique_lock<std::mutex> lkw(mWakeup_);
625  //sleep until woken up by condition or a timeout
626  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
627  counter++;
628  if (!(counter % 6000)) {
629  edm::LogWarning("FedRawDataInputSource")
630  << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
631  << ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
632  << " / " << maxBufferedFiles_;
633  }
634  LogDebug("DAQSource") << "No free chunks or threads...";
635  } else {
636  assert(!workerPool_.empty() || freeChunks_.empty());
637  }
638  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
639  stop = true;
640  break;
641  }
642  }
643  //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
644 
645  if (stop)
646  break;
647 
648  //look for a new file
649  std::string nextFile;
650  int64_t fileSizeFromMetadata;
651 
652  if (fms_) {
655  }
656  bool fitToBuffer = dataMode_->fitToBuffer();
657 
659  uint16_t rawHeaderSize = 0;
660  uint32_t lsFromRaw = 0;
661  int32_t serverEventsInNewFile = -1;
662  int rawFd = -1;
663 
664  int backoff_exp = 0;
665 
666  //entering loop which tries to grab new file from ramdisk
668  //check if hltd has signalled to throttle input
669  counter = 0;
670  while (daqDirector_->inputThrottled()) {
671  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
672  break;
673 
674  unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
675  unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
676  unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
677  bool hasDiscardedLumi = false;
678  for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
680  edm::LogWarning("DAQSource") << "Source detected that the lumisection is discarded -: " << i;
681  hasDiscardedLumi = true;
682  break;
683  }
684  }
685  if (hasDiscardedLumi)
686  break;
687 
689  if (!(counter % 50))
690  edm::LogWarning("DAQSource") << "Input throttled detected, reading files is paused...";
691  usleep(100000);
692  counter++;
693  }
694 
695  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
696  stop = true;
697  break;
698  }
699 
700  assert(rawFd == -1);
701  uint64_t thisLockWaitTimeUs = 0.;
703  if (fileListMode_) {
704  //return LS if LS not set, otherwise return file
705  status = getFile(ls, nextFile, thisLockWaitTimeUs);
707  uint16_t rawDataType;
709  rawFd,
710  rawHeaderSize,
711  rawDataType,
712  lsFromRaw,
713  serverEventsInNewFile,
714  fileSizeFromMetadata,
715  requireHeader,
716  false,
717  false) != 0) {
718  //error
719  setExceptionState_ = true;
720  stop = true;
721  break;
722  }
723  }
724  } else {
726  ls,
727  nextFile,
728  rawFd,
729  rawHeaderSize, //which format?
730  serverEventsInNewFile,
731  fileSizeFromMetadata,
732  thisLockWaitTimeUs,
733  requireHeader);
734  }
735 
737 
738  //cycle through all remaining LS even if no files get assigned
741 
742  //monitoring of lock wait time
743  if (thisLockWaitTimeUs > 0.)
744  sumLockWaitTimeUs += thisLockWaitTimeUs;
745  lockCount++;
746  if (ls > monLS) {
747  monLS = ls;
748  if (lockCount)
749  if (fms_)
750  fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
751  lockCount = 0;
752  sumLockWaitTimeUs = 0;
753  }
754 
756  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded));
757  stop = true;
758  break;
759  }
760 
761  //error from filelocking function
763  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
764  stop = true;
765  break;
766  }
767  //queue new lumisection
768  if (ls > currentLumiSection) {
769  //new file service
772  //start transitions from LS specified by env, continue if not reached
773  if (ls < daqDirector_->getStartLumisectionFromEnv()) {
774  //skip file if from earlier LS than specified by env
775  if (rawFd != -1) {
776  close(rawFd);
777  rawFd = -1;
778  }
780  continue;
781  } else {
782  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
783  }
784  } else if (ls < 100) {
785  //look at last LS file on disk to start from that lumisection (only within first 100 LS)
786  unsigned int lsToStart = daqDirector_->getLumisectionToStart();
787 
788  for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
789  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
790  }
791  } else {
792  //start from current LS
793  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
794  }
795  } else {
796  //queue all lumisections after last one seen to avoid gaps
797  for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
798  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
799  }
800  }
802  }
803  //else
805  edm::LogError("DAQSource") << "Got old LS (" << ls
806  << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
807  << ". Aborting execution." << std::endl;
808  if (rawFd != -1)
809  close(rawFd);
810  rawFd = -1;
811  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
812  stop = true;
813  break;
814  }
815 
816  int dbgcount = 0;
819  dbgcount++;
820  if (!(dbgcount % 20))
821  LogDebug("DAQSource") << "No file for me... sleep and try again...";
822 
823  backoff_exp = std::min(4, backoff_exp); // max 1.6 seconds
824  //backoff_exp=0; // disabled!
825  int sleeptime = (int)(100000. * pow(2, backoff_exp));
826  usleep(sleeptime);
827  backoff_exp++;
828  } else
829  backoff_exp = 0;
830  }
831  //end of file grab loop, parse result
834  LogDebug("DAQSource") << "The director says to grab -: " << nextFile;
835 
836  std::string rawFile;
837  //file service will report raw extension
838  rawFile = nextFile;
839 
840  struct stat st;
841  int stat_res = stat(rawFile.c_str(), &st);
842  if (stat_res == -1) {
843  edm::LogError("DAQSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
844  setExceptionState_ = true;
845  break;
846  }
847  uint64_t fileSize = st.st_size;
848 
849  if (fms_) {
853  }
854  int eventsInNewFile;
855  if (fileListMode_) {
856  if (fileSize == 0)
857  eventsInNewFile = 0;
858  else
859  eventsInNewFile = -1;
860  } else {
861  eventsInNewFile = serverEventsInNewFile;
862  assert(eventsInNewFile >= 0);
863  assert((eventsInNewFile > 0) ==
864  (fileSize > rawHeaderSize)); //file without events must be empty or contain only header
865  }
866 
867  std::pair<bool, std::vector<std::string>> additionalFiles =
868  dataMode_->defineAdditionalFiles(rawFile, fileListMode_);
869  if (!additionalFiles.first) {
870  //skip secondary files from file broker
871  if (rawFd > -1)
872  close(rawFd);
873  continue;
874  }
875 
876  std::unique_ptr<RawInputFile> newInputFile(new RawInputFile(evf::EvFDaqDirector::FileStatus::newFile,
877  ls,
878  rawFile,
879  !fileListMode_,
880  rawFd,
881  fileSize,
882  rawHeaderSize, //for which format
883  0,
884  eventsInNewFile,
885  this));
886 
887  uint64_t neededSize = fileSize;
888  for (const auto& addFile : additionalFiles.second) {
889  struct stat buf;
890  //wait for secondary files to appear
891  unsigned int fcnt = 0;
892  while (stat(addFile.c_str(), &buf) != 0) {
893  if (fileListMode_) {
894  edm::LogError("DAQSource") << "additional file is missing -: " << addFile;
895  stop = true;
896  setExceptionState_ = true;
897  break;
898  }
899  usleep(10000);
900  fcnt++;
901  //report and EoR check every 30 seconds
902  if ((fcnt && fcnt % 3000 == 0) || quit_threads_.load(std::memory_order_relaxed)) {
903  edm::LogWarning("DAQSource") << "Additional file is still missing after 30 seconds -: " << addFile;
904  struct stat bufEoR;
905  auto secondaryPath = std::filesystem::path(addFile).parent_path();
907  std::string mainEoR = (std::filesystem::path(daqDirector_->buBaseRunDir()) / eorName).generic_string();
908  std::string secondaryEoR = (secondaryPath / eorName).generic_string();
909  bool prematureEoR = false;
910  if (stat(secondaryEoR.c_str(), &bufEoR) == 0) {
911  if (stat(addFile.c_str(), &bufEoR) != 0) {
912  edm::LogError("DAQSource")
913  << "EoR file appeared in -: " << secondaryPath << " while waiting for index file " << addFile;
914  prematureEoR = true;
915  }
916  } else if (stat(mainEoR.c_str(), &bufEoR) == 0) {
917  //wait another 10 seconds
918  usleep(10000000);
919  if (stat(addFile.c_str(), &bufEoR) != 0) {
920  edm::LogError("DAQSource")
921  << "Main EoR file appeared -: " << mainEoR << " while waiting for index file " << addFile;
922  prematureEoR = true;
923  }
924  }
925  if (prematureEoR) {
926  //queue EoR since this is not FU error
927  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded, 0));
928  stop = true;
929  break;
930  }
931  }
932 
933  if (quit_threads_) {
934  edm::LogError("DAQSource") << "Quitting while waiting for file -: " << addFile;
935  stop = true;
936  setExceptionState_ = true;
937  break;
938  }
939  }
940  LogDebug("DAQSource") << " APPEND NAME " << addFile;
941  if (stop)
942  break;
943 
944  newInputFile->appendFile(addFile, buf.st_size);
945  neededSize += buf.st_size;
946  }
947  if (stop)
948  break;
949 
950  //calculate number of needed chunks and size if resizing will be applied
951  uint16_t neededChunks;
952  uint64_t chunkSize;
953 
954  if (fitToBuffer) {
955  chunkSize = std::min(maxChunkSize_, std::max(eventChunkSize_, neededSize));
956  neededChunks = 1;
957  } else {
958  chunkSize = eventChunkSize_;
959  neededChunks = neededSize / eventChunkSize_ + uint16_t((neededSize % eventChunkSize_) > 0);
960  }
961  newInputFile->setChunks(neededChunks);
962 
963  newInputFile->randomizeOrder(rng_);
964 
966  auto newInputFilePtr = newInputFile.get();
967  fileQueue_.push(std::move(newInputFile));
968 
969  for (size_t i = 0; i < neededChunks; i++) {
970  if (fms_) {
971  bool copy_active = false;
972  for (auto j : tid_active_)
973  if (j)
974  copy_active = true;
975  if (copy_active)
977  else
979  }
980  //get thread
981  unsigned int newTid = 0xffffffff;
982  while (!workerPool_.try_pop(newTid)) {
983  usleep(100000);
984  if (quit_threads_.load(std::memory_order_relaxed)) {
985  stop = true;
986  break;
987  }
988  }
989 
990  if (fms_) {
991  bool copy_active = false;
992  for (auto j : tid_active_)
993  if (j)
994  copy_active = true;
995  if (copy_active)
997  else
999  }
1000  InputChunk* newChunk = nullptr;
1001  while (!freeChunks_.try_pop(newChunk)) {
1002  usleep(100000);
1003  if (quit_threads_.load(std::memory_order_relaxed)) {
1004  stop = true;
1005  break;
1006  }
1007  }
1008 
1009  if (newChunk == nullptr) {
1010  //return unused tid if we received shutdown (nullptr chunk)
1011  if (newTid != 0xffffffff)
1012  workerPool_.push(newTid);
1013  stop = true;
1014  break;
1015  }
1016  if (stop)
1017  break;
1019 
1020  std::unique_lock<std::mutex> lk(mReader_);
1021 
1022  uint64_t toRead = chunkSize;
1023  if (i == (uint64_t)neededChunks - 1 && neededSize % chunkSize)
1024  toRead = neededSize % chunkSize;
1025  newChunk->reset(i * chunkSize, toRead, i);
1026 
1027  workerJob_[newTid].first = newInputFilePtr;
1028  workerJob_[newTid].second = newChunk;
1029 
1030  //wake up the worker thread
1031  cvReader_[newTid]->notify_one();
1032  }
1033  }
1034  }
1036  //make sure threads finish reading
1037  unsigned int numFinishedThreads = 0;
1038  while (numFinishedThreads < workerThreads_.size()) {
1039  unsigned int tid = 0;
1040  while (!workerPool_.try_pop(tid)) {
1041  usleep(10000);
1042  }
1043  std::unique_lock<std::mutex> lk(mReader_);
1044  thread_quit_signal[tid] = true;
1045  cvReader_[tid]->notify_one();
1046  numFinishedThreads++;
1047  }
1048  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1049  workerThreads_[i]->join();
1050  delete workerThreads_[i];
1051  }
1052 }
1053 
1054 void DAQSource::readWorker(unsigned int tid) {
1055  bool init = true;
1056  threadInit_.exchange(true, std::memory_order_acquire);
1057 
1058  while (true) {
1059  tid_active_[tid] = false;
1060  std::unique_lock<std::mutex> lk(mReader_);
1061  workerJob_[tid].first = nullptr;
1062  workerJob_[tid].first = nullptr;
1063 
1064  assert(!thread_quit_signal[tid]); //should never get it here
1065  workerPool_.push(tid);
1066 
1067  if (init) {
1068  std::unique_lock<std::mutex> lk(startupLock_);
1069  init = false;
1070  startupCv_.notify_one();
1071  }
1072  cvReader_[tid]->wait(lk);
1073 
1074  if (thread_quit_signal[tid])
1075  return;
1076  tid_active_[tid] = true;
1077 
1078  RawInputFile* file;
1079  InputChunk* chunk;
1080 
1081  assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1082 
1083  file = workerJob_[tid].first;
1084  chunk = workerJob_[tid].second;
1085 
1086  bool fitToBuffer = dataMode_->fitToBuffer();
1087 
1088  //resize if multi-chunked reading is not possible
1089  if (fitToBuffer) {
1090  uint64_t accum = 0;
1091  for (auto s : file->diskFileSizes_)
1092  accum += s;
1093  if (accum > eventChunkSize_) {
1094  if (!chunk->resize(accum, maxChunkSize_)) {
1095  edm::LogError("DAQSource")
1096  << "maxChunkSize can not accomodate the file set. Try increasing chunk size and/or chunk maximum size.";
1097  if (file->rawFd_ != -1 && (numConcurrentReads_ == 1 || chunk->offset_ == 0))
1098  close(file->rawFd_);
1099  setExceptionState_ = true;
1100  continue;
1101  } else {
1102  edm::LogInfo("DAQSource") << "chunk size was increased to " << (chunk->size_ >> 20) << " MB";
1103  }
1104  }
1105  }
1106 
1107  //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1108  unsigned int bufferLeftInitial = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1109  const uint16_t readBlocks = chunk->size_ / eventChunkBlock_ + uint16_t(chunk->size_ % eventChunkBlock_ > 0);
1110 
1111  auto readPrimary = [&](uint64_t bufferLeft) {
1112  //BEGIN reading primary file - check if file descriptor is already open
1113  //in multi-threaded chunked mode, only first thread will use already open fd for reading the first file
1114  //fd will not be closed in other case (used by other threads)
1115  int fileDescriptor = -1;
1116  bool fileOpenedHere = false;
1117 
1118  if (numConcurrentReads_ == 1) {
1119  fileDescriptor = file->rawFd_;
1120  file->rawFd_ = -1;
1121  if (fileDescriptor == -1) {
1122  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1123  fileOpenedHere = true;
1124  }
1125  } else {
1126  if (chunk->offset_ == 0) {
1127  fileDescriptor = file->rawFd_;
1128  file->rawFd_ = -1;
1129  if (fileDescriptor == -1) {
1130  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1131  fileOpenedHere = true;
1132  }
1133  } else {
1134  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1135  fileOpenedHere = true;
1136  }
1137  }
1138 
1139  if (fileDescriptor == -1) {
1140  edm::LogError("DAQSource") << "readWorker failed to open file -: " << file->fileName_
1141  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1142  setExceptionState_ = true;
1143  return;
1144  }
1145 
1146  if (fileOpenedHere) { //fast forward to this chunk position
1147  off_t pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1148  if (pos == -1) {
1149  edm::LogError("DAQSource") << "readWorker failed to seek file -: " << file->fileName_
1150  << " fd:" << fileDescriptor << " to offset " << chunk->offset_
1151  << " error: " << strerror(errno);
1152  setExceptionState_ = true;
1153  return;
1154  }
1155  }
1156 
1157  LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1158  << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1159 
1160  size_t skipped = bufferLeft;
1162  for (unsigned int i = 0; i < readBlocks; i++) {
1163  ssize_t last;
1164  edm::LogInfo("DAQSource") << "readWorker read -: " << (int64_t)(chunk->usedSize_ - bufferLeft) << " or "
1165  << (int64_t)eventChunkBlock_;
1166 
1167  //protect against reading into next block
1168  last = ::read(fileDescriptor,
1169  (void*)(chunk->buf_ + bufferLeft),
1170  std::min((int64_t)(chunk->usedSize_ - bufferLeft), (int64_t)eventChunkBlock_));
1171 
1172  if (last < 0) {
1173  edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1174  << " fd:" << fileDescriptor << " last: " << last << " error: " << strerror(errno);
1175  setExceptionState_ = true;
1176  break;
1177  }
1178  if (last > 0) {
1179  bufferLeft += last;
1180  }
1181  if ((uint64_t)last < eventChunkBlock_) { //last read
1182  edm::LogInfo("DAQSource") << "chunkUsedSize" << chunk->usedSize_ << " u-s:" << (chunk->usedSize_ - skipped)
1183  << " ix:" << i * eventChunkBlock_ << " " << (size_t)last;
1184  //check if this is last block if single file, then total read size must match file size
1185  if (file->numFiles_ == 1 && !(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (size_t)last)) {
1186  edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1187  << " fd:" << fileDescriptor << " last:" << last
1188  << " expectedChunkSize:" << chunk->usedSize_
1189  << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last)
1190  << " skipped:" << skipped << " block:" << (i + 1) << "/" << readBlocks
1191  << " error: " << strerror(errno);
1192  setExceptionState_ = true;
1193  }
1194  break;
1195  }
1196  }
1197  if (setExceptionState_)
1198  return;
1199 
1200  file->fileSizes_[0] = bufferLeft;
1201 
1202  if (chunk->offset_ + bufferLeft == file->diskFileSizes_[0] || bufferLeft == chunk->size_) {
1203  //file reading finished using this fd
1204  //or the whole buffer is filled (single sequential file spread over more chunks)
1205  close(fileDescriptor);
1206  fileDescriptor = -1;
1207  } else
1208  assert(fileDescriptor == -1);
1209 
1210  if (fitToBuffer && bufferLeft != file->diskFileSizes_[0]) {
1211  edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[0]
1212  << " read:" << bufferLeft << " expected:" << file->diskFileSizes_[0];
1213  setExceptionState_ = true;
1214  return;
1215  }
1216 
1218  auto diff = end - start;
1219  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1220  LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1221  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1222  << " GB/s)";
1223  };
1224  //END primary function
1225 
1226  //SECONDARY files function
1227  auto readSecondary = [&](uint64_t bufferLeft, unsigned int j) {
1228  size_t fileLen = 0;
1229 
1230  std::string const& addFile = file->fileNames_[j];
1231  int fileDescriptor = open(addFile.c_str(), O_RDONLY);
1232 
1233  if (fileDescriptor < 0) {
1234  edm::LogError("DAQSource") << "readWorker failed to open file -: " << addFile << " fd:" << fileDescriptor
1235  << " error: " << strerror(errno);
1236  setExceptionState_ = true;
1237  return;
1238  }
1239 
1240  LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << addFile << " at offset "
1241  << lseek(fileDescriptor, 0, SEEK_CUR);
1242 
1243  //size_t skipped = 0;//file is newly opened, read with header
1245  for (unsigned int i = 0; i < readBlocks; i++) {
1246  ssize_t last;
1247 
1248  //protect against reading into next block
1249  //use bufferLeft for the write offset
1250  last = ::read(fileDescriptor,
1251  (void*)(chunk->buf_ + bufferLeft),
1252  std::min((uint64_t)file->diskFileSizes_[j], (uint64_t)eventChunkBlock_));
1253 
1254  if (last < 0) {
1255  edm::LogError("DAQSource") << "readWorker failed to read file -: " << addFile << " fd:" << fileDescriptor
1256  << " error: " << strerror(errno);
1257  setExceptionState_ = true;
1258  close(fileDescriptor);
1259  break;
1260  }
1261  if (last > 0) {
1262  bufferLeft += last;
1263  fileLen += last;
1264  file->fileSize_ += last;
1265  }
1266  };
1267 
1268  close(fileDescriptor);
1269  file->fileSizes_[j] = fileLen;
1270  assert(fileLen > 0);
1271 
1272  if (fitToBuffer && fileLen != file->diskFileSizes_[j]) {
1273  edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[j]
1274  << " read:" << fileLen << " expected:" << file->diskFileSizes_[j];
1275  setExceptionState_ = true;
1276  return;
1277  }
1278 
1280  auto diff = end - start;
1281  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1282  LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1283  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1284  << " GB/s)";
1285  };
1286 
1287  //randomized order multi-file loop
1288  for (unsigned int j : file->fileOrder_) {
1289  if (j == 0) {
1290  readPrimary(bufferLeftInitial);
1291  } else
1292  readSecondary(file->bufferOffsets_[j], j);
1293 
1294  if (setExceptionState_)
1295  break;
1296  }
1297 
1298  if (setExceptionState_)
1299  continue;
1300 
1301  //detect FRD event version. Skip file Header if it exists
1302  if (dataMode_->dataVersion() == 0 && chunk->offset_ == 0) {
1303  dataMode_->detectVersion(chunk->buf_, file->rawHeaderSize_);
1304  }
1305  assert(dataMode_->versionCheck());
1306 
1307  chunk->readComplete_ =
1308  true; //this is atomic to secure the sequential buffer fill before becoming available for processing)
1309  file->chunks_[chunk->fileIndex_] = chunk; //put the completed chunk in the file chunk vector at predetermined index
1310  }
1311 }
1312 
// Body of DAQSource::threadError() (its signature line is absent from this listing).
// Raises the quit flag polled by the reader loops, then reports the failure
// to the caller by throwing a cms::Exception.
1314  quit_threads_ = true;
1315  throw cms::Exception("DAQSource:threadError") << " file reader thread error ";
1316 }
1317 
// Body of DAQSource::setMonState(state) (its signature line is absent from this listing).
// Forwards the input state to the monitoring service; does nothing when fms_ is null.
1319  if (fms_)
1320  fms_->setInState(state);
1321 }
1322 
// Body of DAQSource::setMonStateSup(state); the signature line and the statement
// guarded by the null check on fms_ are absent from this listing.
1324  if (fms_)
1326 }
1327 
// Points dataPosition at the next `size` bytes of file data and advances the
// read cursor (chunkPosition_ / bufferPosition_) past them.
//
// dataPosition: output, set to the start of the requested bytes inside the
//               buffer of the current chunk
// size:         number of bytes the caller is about to consume
// returns:      true when the bytes straddled two chunks and were stitched
//               together at the start of the current chunk buffer (currentChunk_
//               is then incremented), false when they were already contiguous
1328 bool RawInputFile::advance(unsigned char*& dataPosition, const size_t size) {
1329  //wait for chunk
1330 
// poll in 100 ms steps until waitForChunk() reports the current chunk
// (some lines of this loop are absent from this listing)
1331  while (!waitForChunk(currentChunk_)) {
1333  usleep(100000);
1337  }
1338 
1339  dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
// bytes still unread in the current chunk
1340  size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1341 
1342  if (currentLeft < size) {
1343  //we need next chunk
1344  assert(chunks_.size() > currentChunk_ + 1);
// same polling wait, now for the following chunk (lines absent here as well)
1345  while (!waitForChunk(currentChunk_ + 1)) {
1347  usleep(100000);
1351  }
1352  //copy everything to beginning of the first chunk
1353  dataPosition -= chunkPosition_;
1354  assert(dataPosition == chunks_[currentChunk_]->buf_);
// memmove because source and destination lie in the same buffer and may overlap
1355  memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
// append the missing (size - currentLeft) bytes from the start of the next chunk
// NOTE(review): assumes the current chunk buffer can hold `size` bytes - confirm
1356  memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1357  //set pointers at the end of the old data position
1358  bufferPosition_ += size;
// the cursor now sits inside the next chunk, just after the bytes copied from it
1359  chunkPosition_ = size - currentLeft;
1360  currentChunk_++;
1361  return true;
1362  } else {
1363  chunkPosition_ += size;
1364  bufferPosition_ += size;
1365  return false;
1366  }
1367 }
1368 
1369 void DAQSource::reportEventsThisLumiInSource(unsigned int lumi, unsigned int events) {
1370  std::lock_guard<std::mutex> lock(monlock_);
1371  auto itr = sourceEventsReport_.find(lumi);
1372  if (itr != sourceEventsReport_.end())
1373  itr->second += events;
1374  else
1376 }
1377 
1378 std::pair<bool, unsigned int> DAQSource::getEventReport(unsigned int lumi, bool erase) {
1379  std::lock_guard<std::mutex> lock(monlock_);
1380  auto itr = sourceEventsReport_.find(lumi);
1381  if (itr != sourceEventsReport_.end()) {
1382  std::pair<bool, unsigned int> ret(true, itr->second);
1383  if (erase)
1384  sourceEventsReport_.erase(itr);
1385  return ret;
1386  } else
1387  return std::pair<bool, unsigned int>(false, 0);
1388 }
1389 
// Visible part of DAQSource::initFileList(); the signature line, the line opening
// the sort comparator and the declaration of `fileName` are absent from this listing.
// Returns the run number parsed from a file stem of the form "run<digits>[_...]",
// or -1 when the list is empty or no run number can be parsed.
//
// Comparator body: each name is cut down to its part from the last '/' onward
// (when it has one) and the results are compared as strings, ascending.
1392  if (a.rfind('/') != std::string::npos)
1393  a = a.substr(a.rfind('/'));
1394  if (b.rfind('/') != std::string::npos)
1395  b = b.substr(b.rfind('/'));
1396  return b > a;
1397  });
1398 
1399  if (!listFileNames_.empty()) {
1400  //get run number from first file in the vector
1402  std::string fileStem = fileName.stem().string();
// drop a leading URL-style scheme prefix if the stem starts with one
1403  if (fileStem.find("file://") == 0)
1404  fileStem = fileStem.substr(7);
1405  else if (fileStem.find("file:") == 0)
1406  fileStem = fileStem.substr(5);
// position of the first '_' (npos when absent, in which case the substr below runs to the end)
1407  auto end = fileStem.find('_');
1408 
1409  if (fileStem.find("run") == 0) {
// characters between the "run" prefix and the first '_'
1410  std::string runStr = fileStem.substr(3, end - 3);
1411  try {
1412  //get long to support test run numbers < 2^32
1413  long rval = std::stol(runStr);
1414  edm::LogInfo("DAQSource") << "Autodetected run number in fileListMode -: " << rval;
1415  return rval;
1416  } catch (const std::exception&) {
// std::stol failed: only warn, the function then falls through to return -1
1417  edm::LogWarning("DAQSource") << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1418  }
1419  }
1420  }
1421  return -1;
1422 }
1423 
// Visible part of DAQSource::getFile(ls, nextFile, lockWaitTime); the signature line
// and several statements (listing lines 1446, 1449 and 1452) are absent from this listing.
// Hands out the next entry of listFileNames_ through nextFile and derives the
// lumisection number `ls` for it.
1425  if (fileListIndex_ < listFileNames_.size()) {
1426  nextFile = listFileNames_[fileListIndex_];
// strip a leading "file://" or "file:" prefix from the name
1427  if (nextFile.find("file://") == 0)
1428  nextFile = nextFile.substr(7);
1429  else if (nextFile.find("file:") == 0)
1430  nextFile = nextFile.substr(5);
1431  std::filesystem::path fileName = nextFile;
1432  std::string fileStem = fileName.stem().string();
// keep what follows the "ls" marker, then cut at the next '_'
// NOTE(review): find() results are used as booleans here: the condition is false only
// for a match at index 0 and true when nothing is found (npos) - confirm that
// `!= std::string::npos` was not intended
1433  if (fileStem.find("ls"))
1434  fileStem = fileStem.substr(fileStem.find("ls") + 2);
1435  if (fileStem.find('_'))
1436  fileStem = fileStem.substr(0, fileStem.find('_'));
1437 
1438  if (!fileListLoopMode_)
1439  ls = std::stoul(fileStem);
1440  else //always starting from LS 1 in loop mode
1441  ls = 1 + loopModeIterationInc_;
1442 
1443  //fsize = 0;
1444  //lockWaitTime = 0;
1445  fileListIndex_++;
1447  } else {
// the list is exhausted
1448  if (!fileListLoopMode_)
1450  else {
1451  //loop through files until interrupted
// restart from the first list entry and serve it through a recursive call
1453  fileListIndex_ = 0;
1454  return getFile(ls, nextFile, lockWaitTime);
1455  }
1456  }
1457 }
std::string & buBaseRunDir()
size
Write out results.
static const char runNumber_[]
long initFileList()
Definition: DAQSource.cc:1390
Definition: start.py:1
Definition: fillJson.h:27
std::unique_ptr< RawInputFile > currentFile_
Definition: DAQSource.h:134
std::string const & runString() const
std::pair< RawInputFile *, InputChunk * > ReaderInfo
Definition: DAQSource.h:132
int currentFileIndex_
Definition: DAQSource.h:158
edm::RunNumber_t runNumber_
Definition: DAQSource.h:114
void threadError()
Definition: DAQSource.cc:1313
DAQSource * sourceParent_
Definition: DAQSource.h:200
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:232
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:330
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
unsigned int checkEvery_
Definition: DAQSource.h:162
std::unique_ptr< std::thread > readSupervisorThread_
Definition: DAQSource.h:138
bool crc32c_hw_test()
Definition: crc32c.cc:354
uint64_t maxChunkSize_
Definition: DAQSource.h:92
std::mutex startupLock_
Definition: DAQSource.h:155
std::default_random_engine rng_
Definition: DAQSource.h:124
std::map< unsigned int, unsigned int > sourceEventsReport_
Definition: DAQSource.h:173
bool chunkIsFree_
Definition: DAQSource.h:135
std::condition_variable startupCv_
Definition: DAQSource.h:156
ret
prodAgent to be discontinued
std::atomic< bool > threadInit_
Definition: DAQSource.h:171
unsigned int loopModeIterationInc_
Definition: DAQSource.h:112
Next checkNext() override
Definition: DAQSource.cc:251
void read(edm::EventPrincipal &eventPrincipal) override
Definition: DAQSource.cc:538
volatile std::atomic< bool > shutdown_flag
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:457
constexpr int pow(int x)
Definition: conifer.h:24
bool lumisectionDiscarded(unsigned int ls)
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
Definition: DAQSource.cc:1378
void maybeOpenNewLumiSection(const uint32_t lumiSection)
Definition: DAQSource.cc:324
std::mutex fileDeleteLock_
Definition: DAQSource.h:160
const std::string dataModeConfig_
Definition: DAQSource.h:90
std::vector< std::thread * > workerThreads_
Definition: DAQSource.h:140
Log< level::Error, false > LogError
std::mutex mReader_
Definition: DAQSource.h:148
unsigned int numConcurrentLumis() const
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
Definition: DAQSource.cc:1369
StreamID streamID() const
int timeout
Definition: mps_check.py:53
assert(be >=bs)
bool isExceptionOnData(unsigned int ls)
edm::ProcessHistoryID processHistoryID_
Definition: DAQSource.h:117
std::mutex mWakeup_
Definition: DAQSource.h:165
bool waitForChunk(unsigned int chunkid)
unsigned int currentLumiSection_
Definition: DAQSource.h:119
void overrideRunNumber(unsigned int run)
std::atomic< unsigned int > readingFilesCount_
Definition: DAQSource.h:98
evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock()
Definition: DAQSource.cc:344
std::string getEoRFilePathOnFU() const
uint64_t eventChunkSize_
Definition: DAQSource.h:91
U second(std::pair< T, U > const &p)
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
static Timestamp beginOfTime()
Definition: Timestamp.h:77
bool advance(unsigned char *&dataPosition, const size_t size)
Definition: DAQSource.cc:1328
DAQSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
Definition: DAQSource.cc:32
uint64_t eventChunkBlock_
Definition: DAQSource.h:93
evf::EvFDaqDirector::FileStatus getNextDataBlock()
Definition: DAQSource.cc:370
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
int currentLumiSection() const
Definition: DAQSource.h:52
unsigned int fileListIndex_
Definition: DAQSource.h:110
unsigned int eventsThisLumi_
Definition: DAQSource.h:122
const bool fileListLoopMode_
Definition: DAQSource.h:111
std::condition_variable cvWakeup_
Definition: DAQSource.h:166
friend struct InputChunk
Definition: DAQSource.h:43
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:337
void reset(uint64_t newOffset, uint64_t toRead, unsigned int fileIndex)
unsigned int maxBufferedFiles_
Definition: DAQSource.h:96
unsigned char * buf_
evf::FastMonitoringService * fms_
Definition: DAQSource.h:87
void readWorker(unsigned int tid)
Definition: DAQSource.cc:1054
unsigned int numBuffers_
Definition: DAQSource.h:95
std::atomic< bool > quit_threads_
Definition: DAQSource.h:152
const bool alwaysStartFromFirstLS_
Definition: DAQSource.h:101
std::shared_ptr< DataMode > dataMode_
Definition: DAQSource.h:176
unsigned int fileIndex_
tbb::concurrent_queue< std::unique_ptr< RawInputFile > > fileQueue_
Definition: DAQSource.h:146
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:362
const bool fileListMode_
Definition: DAQSource.h:109
void createProcessingNotificationMaybe() const
void setInputSource(FedRawDataInputSource *inputSource)
Log< level::Info, false > LogInfo
std::atomic< bool > readComplete_
Definition: init.py:1
def ls(path, rec=False)
Definition: eostools.py:349
const std::vector< unsigned int > testTCDSFEDRange_
Definition: DAQSource.h:104
tbb::concurrent_vector< InputChunk * > chunks_
unsigned int getStartLumisectionFromEnv() const
def getRunNumber(filename)
uint32_t eventRunNumber_
Definition: DAQSource.h:120
void setInStateSup(FastMonState::InputState inputState)
unsigned long long uint64_t
Definition: Time.h:13
void dataArranger()
Definition: DAQSource.cc:581
std::vector< std::string > listFileNames_
Definition: DAQSource.h:105
~DAQSource() override
Definition: DAQSource.cc:182
def load(fileName)
Definition: svgfig.py:547
uint32_t chunkPosition_
std::vector< int > streamFileTracker_
Definition: DAQSource.h:161
double b
Definition: hdecay.h:120
void rewind_() override
Definition: DAQSource.cc:579
std::string getEoRFileName() const
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:345
std::vector< unsigned int > tid_active_
Definition: DAQSource.h:150
void stoppedLookingForFile(unsigned int lumi)
std::vector< unsigned int > thread_quit_signal
Definition: DAQSource.h:153
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:331
ItemType state() const
Definition: InputSource.h:332
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:462
unsigned int readBlocks_
Definition: DAQSource.h:94
HLT enums.
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:333
double a
Definition: hdecay.h:121
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:235
void setMonStateSup(evf::FastMonState::InputState state)
Definition: DAQSource.cc:1323
friend class RawInputFile
Definition: DAQSource.h:42
bool setExceptionState_
Definition: DAQSource.h:154
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Definition: DAQSource.cc:220
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
Definition: DAQSource.h:149
unsigned int getLumisectionToStart() const
bool startedSupervisorThread_
Definition: DAQSource.h:137
evf::EvFDaqDirector * daqDirector_
Definition: DAQSource.h:88
tbb::concurrent_queue< InputChunk * > freeChunks_
Definition: DAQSource.h:145
unsigned int numConcurrentReads_
Definition: DAQSource.h:97
int addFile(MEStore &micromes, int fd)
Definition: fastHadd.cc:352
unsigned int RunNumber_t
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
tbb::concurrent_queue< unsigned int > workerPool_
Definition: DAQSource.h:142
bool resize(uint64_t wantedSize, uint64_t maxSize)
Log< level::Warning, false > LogWarning
unsigned int currentChunk_
std::vector< ReaderInfo > workerJob_
Definition: DAQSource.h:143
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint64_t &lockWaitTime)
Definition: DAQSource.cc:1424
void setMonState(evf::FastMonState::InputState state)
Definition: DAQSource.cc:1318
void setFMS(evf::FastMonitoringService *fms)
bool exceptionState()
Definition: DAQSource.h:78
const bool verifyChecksum_
Definition: DAQSource.h:102
void readSupervisor()
Definition: DAQSource.cc:583
int events
std::vector< std::string > const & getBUBaseDirs() const
def move(src, dest)
Definition: eostools.py:511
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
Definition: DAQSource.h:159
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
#define LogDebug(id)
void setInState(FastMonState::InputState inputState)
std::mutex monlock_
Definition: DAQSource.h:174
uint32_t bufferPosition_