CMS 3D CMS Logo

FedRawDataInputSource.cc
Go to the documentation of this file.
1 #include <fcntl.h>
2 #include <iomanip>
3 #include <iostream>
4 #include <memory>
5 #include <sstream>
6 #include <sys/types.h>
7 #include <sys/file.h>
8 #include <sys/time.h>
9 #include <unistd.h>
10 #include <vector>
11 #include <fstream>
12 #include <zlib.h>
13 #include <cstdio>
14 #include <chrono>
15 
16 #include <boost/algorithm/string.hpp>
17 
21 
23 
30 
32 
34 
38 
40 
45 
46 //JSON file reader
48 
49 #include <boost/lexical_cast.hpp>
50 
51 using namespace evf::FastMonState;
52 
54  : edm::RawInputSource(pset, desc),
55  defPath_(pset.getUntrackedParameter<std::string>("buDefPath", "")),
56  eventChunkSize_(pset.getUntrackedParameter<unsigned int>("eventChunkSize", 32) * 1048576),
57  eventChunkBlock_(pset.getUntrackedParameter<unsigned int>("eventChunkBlock", 32) * 1048576),
58  numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers", 2)),
59  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles", 2)),
60  getLSFromFilename_(pset.getUntrackedParameter<bool>("getLSFromFilename", true)),
61  alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
62  verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum", true)),
63  useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID", false)),
64  testTCDSFEDRange_(
65  pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())),
66  fileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>())),
67  fileListMode_(pset.getUntrackedParameter<bool>("fileListMode", false)),
68  fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
69  runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
70  daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
71  eventID_(),
72  processHistoryID_(),
73  currentLumiSection_(0),
74  tcds_pointer_(nullptr),
75  eventsThisLumi_(0) {
76  char thishost[256];
77  gethostname(thishost, 255);
78  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: " << std::endl
79  << (eventChunkSize_ / 1048576) << " MB on host " << thishost;
80 
81  if (!testTCDSFEDRange_.empty()) {
82  if (testTCDSFEDRange_.size() != 2) {
83  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
84  << "Invalid TCDS Test FED range parameter";
85  }
88  }
89 
90  long autoRunNumber = -1;
91  if (fileListMode_) {
92  autoRunNumber = initFileList();
93  if (!fileListLoopMode_) {
94  if (autoRunNumber < 0)
95  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
96  //override run number
97  runNumber_ = (edm::RunNumber_t)autoRunNumber;
98  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
99  }
100  }
101 
103  setNewRun();
104  //todo:autodetect from file name (assert if names differ)
106 
107  //make sure that chunk size is N * block size
112 
113  if (!numBuffers_)
114  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource")
115  << "no reading enabled with numBuffers parameter 0";
116 
119  readingFilesCount_ = 0;
120 
121  if (!crc32c_hw_test())
122  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
123 
124  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
125  if (fileListMode_) {
126  try {
127  fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::MicroStateService>().operator->());
128  } catch (cms::Exception const&) {
129  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
130  }
131  } else {
132  fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::MicroStateService>().operator->());
133  if (!fms_) {
134  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
135  }
136  }
137 
139  if (!daqDirector_)
140  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
141 
143  if (useFileBroker_)
144  edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
145  //set DaqDirector to delete files in preGlobalEndLumi callback
147  if (fms_) {
149  fms_->setInputSource(this);
152  }
153  //should delete chunks when run stops
154  for (unsigned int i = 0; i < numBuffers_; i++) {
156  }
157 
158  quit_threads_ = false;
159 
160  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
161  std::unique_lock<std::mutex> lk(startupLock_);
162  //issue a memory fence here and in threads (constructor was segfaulting without this)
163  thread_quit_signal.push_back(false);
164  workerJob_.push_back(ReaderInfo(nullptr, nullptr));
165  cvReader_.push_back(new std::condition_variable);
166  tid_active_.push_back(0);
167  threadInit_.store(false, std::memory_order_release);
168  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
169  startupCv_.wait(lk);
170  }
171 
172  runAuxiliary()->setProcessHistoryID(processHistoryID_);
173 }
174 
// Body of the teardown routine (its signature line and the condition opening the first
// branch below are missing from this listing). Requests thread termination, releases files
// still pending deletion, joins the threads and frees the per-thread condition variables.
176  quit_threads_ = true;
177 
178  //delete any remaining open files
179  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
180  it->second.reset();
181 
    // first branch: wait for the supervisor thread to finish
183  readSupervisorThread_->join();
184  } else {
185  //join aux threads in case the supervisor thread was not started
186  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
    // set the quit flag and notify under mReader_, release the lock, then join the worker
187  std::unique_lock<std::mutex> lk(mReader_);
188  thread_quit_signal[i] = true;
189  cvReader_[i]->notify_one();
190  lk.unlock();
191  workerThreads_[i]->join();
192  delete workerThreads_[i];
193  }
194  }
    // the condition variables were allocated with new in the constructor
195  for (unsigned int i = 0; i < numConcurrentReads_; i++)
196  delete cvReader_[i];
    // disabled code: draining and deleting the chunks left in freeChunks_
197  /*
198  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
199  InputChunk *ch;
200  while (!freeChunks_.try_pop(ch)) {}
201  delete ch;
202  }
203  */
204 }
205 
// Body of the parameter-description routine (its signature and the declaration of desc are
// missing from this listing). Declares the untracked parameters read by the constructor,
// with matching names, types and defaults, and registers the description as "source".
208  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
209  desc.addUntracked<unsigned int>("eventChunkSize", 32)->setComment("Input buffer (chunk) size");
210  desc.addUntracked<unsigned int>("eventChunkBlock", 32)
211  ->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
212  desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
213  desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
214  ->setComment("Maximum number of simultaneously buffered raw files");
    // name and type must match the constructor, which reads bool "alwaysStartFromFirstLS"
215  desc.addUntracked<bool>("alwaysStartFromFirstLS", false)
216  ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
217  desc.addUntracked<bool>("verifyChecksum", true)
218  ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
219  desc.addUntracked<bool>("useL1EventID", false)
220  ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
221  desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
222  ->setComment("[min, max] range to search for TCDS FED ID in test setup");
223  desc.addUntracked<bool>("fileListMode", false)
224  ->setComment("Use fileNames parameter to directly specify raw files to open");
225  desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
226  ->setComment("file list used when fileListMode is enabled");
    // parameters without a description (e.g. those read by the constructor but not
    // listed above) remain accepted
227  desc.setAllowAnything();
228  descriptions.add("source", desc);
229 }
230 
// Body of the routine that tells the framework what comes next (its signature, the case
// labels of the switch and several statements are missing from this listing). It starts
// the read-supervisor thread, calls nextEvent() and returns Next::kStop or Next::kEvent.
233  //this thread opens new files and dispatches reading to worker readers
234  //threadInit_.store(false,std::memory_order_release);
    // create the supervisor thread while holding startupLock_, then block on startupCv_
235  std::unique_lock<std::mutex> lk(startupLock_);
236  readSupervisorThread_ = std::make_unique<std::thread>(&FedRawDataInputSource::readSupervisor, this);
238  startupCv_.wait(lk);
239  }
240  //signal hltd to start event accounting
241  if (!currentLumiSection_)
244  switch (nextEvent()) {
246  //maybe create EoL file in working directory before ending run
247  struct stat buf;
    // without the file broker: if the BU has an EoLS file for the current lumisection and the
    // FU one (path fuEoLS, declared on a line not shown) does not exist, create it empty
248  if (!useFileBroker_ && currentLumiSection_ > 0) {
249  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
250  if (eolFound) {
252  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
253  if (!found) {
    // NOTE(review): the descriptor returned by open() is closed without checking for -1
255  int eol_fd =
256  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
257  close(eol_fd);
259  }
260  }
261  }
262  //also create EoR file in FU data directory
263  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
264  if (!eorFound) {
265  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
266  O_RDWR | O_CREAT,
267  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
268  close(eor_fd);
269  }
271  eventsThisLumi_ = 0;
273  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
274  return Next::kStop;
275  }
277  //this is not reachable
278  return Next::kEvent;
279  }
281  //std::cout << "--------------NEW LUMI---------------" << std::endl;
282  return Next::kEvent;
283  }
    // every other status: an event is available
284  default: {
285  if (!getLSFromFilename_) {
286  //get new lumi from file header
287  if (event_->lumi() > currentLumiSection_) {
289  eventsThisLumi_ = 0;
291  }
292  }
    // the condition paired with this else is on a line not shown
295  else
296  eventRunNumber_ = event_->run();
297  L1EventID_ = event_->event();
298 
299  setEventCached();
300 
301  return Next::kEvent;
302  }
303  }
304 }
305 
// Opens a new luminosity section if lumiSection differs from the one of the current
// luminosity block auxiliary (or if there is none yet); otherwise does nothing.
// Several statements of this function are missing from this listing, among them the
// declarations of fuEoLS and lumiBlockAuxiliary.
// @param lumiSection number of the luminosity section to switch to
306 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
307  if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
    // marker files are only handled here when the file broker is not in use
308  if (!useFileBroker_) {
309  if (currentLumiSection_ > 0) {
311  struct stat buf;
312  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
    // if the file at fuEoLS does not exist yet: create it empty, then create the BoLS
    // file for the new lumisection
313  if (!found) {
    // NOTE(review): the descriptor returned by open() is closed without checking for -1
315  int eol_fd =
316  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
317  close(eol_fd);
318  daqDirector_->createBoLSFile(lumiSection, false);
320  }
321  } else
322  daqDirector_->createBoLSFile(lumiSection, true); //needed for initial lumisection
323  }
324 
325  currentLumiSection_ = lumiSection;
326 
328 
    // opening time of the lumisection: wall-clock time as microseconds since the epoch
    // NOTE(review): fillFEDRawDataCollection packs its timestamp as (seconds << 32) + microseconds;
    // confirm which encoding edm::Timestamp expects here
329  timeval tv;
330  gettimeofday(&tv, nullptr);
331  const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
332 
    // arguments of the auxiliary built for this lumisection: the end time is left invalid
334  runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
335 
336  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
337  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
338 
339  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: " << lumiSection;
340  }
341 }
342 
// Tail of a routine whose signature and loop header are missing from this listing:
// the enclosing loop is left as soon as the framework shutdown flag is set, and the
// last status value is returned to the caller.
346  if (edm::shutdown_flag.load(std::memory_order_relaxed))
347  break;
348  }
349  return status;
350 }
351 
// Body of the routine that advances to the next event of the input (its signature and many
// statements are missing from this listing). It fetches a file from fileQueue_ when none is
// current, handles the run-end / abort / new-lumisection notifications, empty and finished
// files and the raw file header, then positions event_ on the next event in the chunk
// buffers (single-buffer or multi-buffer mode) and optionally verifies its checksum.
// Throws cms::Exception on truncated input, unknown FRD version, oversized events,
// event count mismatch and checksum mismatch.
353  if (setExceptionState_)
354  threadError();
    // no file in progress: take the next entry from the queue
355  if (!currentFile_.get()) {
358  if (!fileQueue_.try_pop(currentFile_)) {
359  //sleep until wakeup (only in single-buffer mode) or timeout
360  std::unique_lock<std::mutex> lkw(mWakeup_);
361  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
363  }
    // queue entries carry a status; entries other than newFile are consumed right here
364  status = currentFile_->status_;
367  currentFile_.reset();
368  return status;
369  } else if (status == evf::EvFDaqDirector::runAbort) {
370  throw cms::Exception("FedRawDataInputSource::getNextEvent")
371  << "Run has been aborted by the input source reader thread";
372  } else if (status == evf::EvFDaqDirector::newLumi) {
374  if (getLSFromFilename_) {
375  if (currentFile_->lumi_ > currentLumiSection_) {
377  eventsThisLumi_ = 0;
379  }
380  } else { //let this be picked up from next event
382  }
383  currentFile_.reset();
384  return status;
385  } else if (status == evf::EvFDaqDirector::newFile) {
387  } else
388  assert(false);
389  }
391 
392  //file is empty
393  if (!currentFile_->fileSize_) {
395  //try to open new lumi
396  assert(currentFile_->nChunks_ == 0);
397  if (getLSFromFilename_)
398  if (currentFile_->lumi_ > currentLumiSection_) {
400  eventsThisLumi_ = 0;
402  }
403  //immediately delete empty file
404  currentFile_.reset();
406  }
407 
408  //file is finished
409  if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
411  //release last chunk (it is never released elsewhere)
412  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
    // a negative nEvents_ disables the comparison with the number of processed events
413  if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
414  throw cms::Exception("FedRawDataInputSource::getNextEvent")
415  << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
416  << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
417  }
418  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
419  if (singleBufferMode_) {
420  std::unique_lock<std::mutex> lkw(mWakeup_);
421  cvWakeup_.notify_one();
422  }
423  bufferInputRead_ = 0;
425  //put the file in pending delete list;
    // ownership of the file object moves into filesToDelete_, keyed by currentFileIndex_
426  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
427  filesToDelete_.push_back(std::pair<int, std::unique_ptr<InputFile>>(currentFileIndex_, std::move(currentFile_)));
428  } else {
429  //in single-thread and stream jobs, events are already processed
430  currentFile_.reset();
431  }
433  }
434 
435  //handle RAW file header
436  if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
    // a file smaller than its header is an error; a file of exactly header size only warns
437  if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
438  if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
439  throw cms::Exception("FedRawDataInputSource::getNextEvent")
440  << "Premature end of input file while reading file header";
441 
442  edm::LogWarning("FedRawDataInputSource")
443  << "File with only raw header and no events received in LS " << currentFile_->lumi_;
444  if (getLSFromFilename_)
445  if (currentFile_->lumi_ > currentLumiSection_) {
447  eventsThisLumi_ = 0;
449  }
450  }
451 
452  //advance buffer position to skip file header (chunk will be acquired later)
453  currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
454  currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
455  }
456 
457  //file is too short
458  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
459  throw cms::Exception("FedRawDataInputSource::getNextEvent")
460  << "Premature end of input file while reading event header";
461  }
    // single-buffer mode: all data is addressed through chunks_[0]
462  if (singleBufferMode_) {
463  //should already be there
    // poll every 10 ms until the chunk is available, propagating reader-thread errors
465  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
466  usleep(10000);
467  if (currentFile_->parent_->exceptionState() || setExceptionState_)
468  currentFile_->parent_->threadError();
469  }
471 
472  unsigned char* dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
473 
474  //conditions when read amount is not sufficient for the header to fit
478 
    // the FRD version is taken from the first 16 bits at the data position, once
479  if (detectedFRDversion_ == 0) {
480  detectedFRDversion_ = *((uint16_t*)dataPosition);
482  throw cms::Exception("FedRawDataInputSource::getNextEvent")
483  << "Unknown FRD version -: " << detectedFRDversion_;
485  }
486 
487  //recalculate chunk position
488  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
490  throw cms::Exception("FedRawDataInputSource::getNextEvent")
491  << "Premature end of input file while reading event header";
492  }
493  }
494 
    // an event larger than a whole chunk cannot be handled
495  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
496  if (event_->size() > eventChunkSize_) {
497  throw cms::Exception("FedRawDataInputSource::getNextEvent")
498  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
499  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
500  << " bytes";
501  }
502 
    // msgSize: event size without the FRD header
503  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
504 
505  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
506  throw cms::Exception("FedRawDataInputSource::getNextEvent")
507  << "Premature end of input file while reading event data";
508  }
509  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
511  //recalculate chunk position
512  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
513  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
514  }
515  currentFile_->bufferPosition_ += event_->size();
516  currentFile_->chunkPosition_ += event_->size();
517  //last chunk is released when this function is invoked next time
518 
519  }
520  //multibuffer mode:
521  else {
522  //wait for the current chunk to become added to the vector
524  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
525  usleep(10000);
526  if (setExceptionState_)
527  threadError();
528  }
530 
531  //check if header is at the boundary of two chunks
532  chunkIsFree_ = false;
533  unsigned char* dataPosition;
534 
535  //read header, copy it to a single chunk if necessary
536  bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
537 
538  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
539  if (event_->size() > eventChunkSize_) {
540  throw cms::Exception("FedRawDataInputSource::getNextEvent")
541  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
542  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
543  << " bytes";
544  }
545 
546  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
547 
548  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
549  throw cms::Exception("FedRawDataInputSource::getNextEvent")
550  << "Premature end of input file while reading event data";
551  }
552 
    // chunkIsFree_ records that a chunk boundary was crossed; read() uses it to return the
    // previous chunk to freeChunks_
553  if (chunkEnd) {
554  //header was at the chunk boundary, we will have to move payload as well
555  currentFile_->moveToPreviousChunk(msgSize, FRDHeaderVersionSize[detectedFRDversion_]);
556  chunkIsFree_ = true;
557  } else {
558  //header was contiguous, but check if payload fits the chunk
559  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
560  //rewind to header start position
562  //copy event to a chunk start and move pointers
563 
565 
566  chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_] + msgSize);
567 
569 
570  assert(chunkEnd);
571  chunkIsFree_ = true;
572  //header is moved
573  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
574  } else {
575  //everything is in a single chunk, only move pointers forward
576  chunkEnd = currentFile_->advance(dataPosition, msgSize);
577  assert(!chunkEnd);
578  chunkIsFree_ = false;
579  }
580  }
581  } //end multibuffer mode
583 
    // checksum verification over the event payload: crc32c for event version >= 5,
    // Adler32 for versions 3 and 4; a mismatch throws
584  if (verifyChecksum_ && event_->version() >= 5) {
585  uint32_t crc = 0;
586  crc = crc32c(crc, (const unsigned char*)event_->payload(), event_->eventSize());
587  if (crc != event_->crc32c()) {
588  if (fms_)
590  throw cms::Exception("FedRawDataInputSource::getNextEvent")
591  << "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() << " but calculated 0x"
592  << crc;
593  }
594  } else if (verifyChecksum_ && event_->version() >= 3) {
595  uint32_t adler = adler32(0L, Z_NULL, 0);
596  adler = adler32(adler, (Bytef*)event_->payload(), event_->eventSize());
597 
598  if (adler != event_->adler32()) {
599  if (fms_)
601  throw cms::Exception("FedRawDataInputSource::getNextEvent")
602  << "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() << " but calculated 0x"
603  << adler;
604  }
605  }
607 
608  currentFile_->nProcessed_++;
609 
611 }
612 
// Body of the routine that fills an event principal from the current raw event (its
// signature and the statements constructing the event auxiliary "aux" are missing from
// this listing). It unpacks the FED fragments into a FEDRawDataCollection, creates the
// event, records which file each stream is working on, periodically drops fully processed
// files and returns a chunk to freeChunks_ when one was released.
615  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
616  bool tcdsInRange;
617  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData, tcdsInRange);
618 
    // three ways to build the event: useL1EventID_ set, no TCDS fragment found
    // (a non-zero GTPEventID_ is then required), or a TCDS fragment is available
619  if (useL1EventID_) {
622  aux.setProcessHistoryID(processHistoryID_);
623  makeEvent(eventPrincipal, aux);
624  } else if (tcds_pointer_ == nullptr) {
625  if (!GTPEventID_) {
626  throw cms::Exception("FedRawDataInputSource::read")
627  << "No TCDS or GTP FED in event with FEDHeader EID -: " << L1EventID_;
628  }
631  aux.setProcessHistoryID(processHistoryID_);
632  makeEvent(eventPrincipal, aux);
633  } else {
    // the TCDS record starts right behind the FED header of the selected fragment
634  const FEDHeader fedHeader(tcds_pointer_);
635  tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
640  event_->isRealData(),
641  static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
642  processGUID(),
644  !tcdsInRange);
645  aux.setProcessHistoryID(processHistoryID_);
646  makeEvent(eventPrincipal, aux);
647  }
648 
649  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
650 
652 
653  eventsThisLumi_++;
655 
656  //resize vector if needed
657  while (streamFileTracker_.size() <= eventPrincipal.streamID())
658  streamFileTracker_.push_back(-1);
659 
    // remember the file index this stream is currently working on
660  streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
661 
662  //this old file check runs no more often than every 10 events
    // the interval actually used is checkEvery_ (its value is not visible here)
663  if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
664  //delete files that are not in processing
665  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
666  auto it = filesToDelete_.begin();
667  while (it != filesToDelete_.end()) {
    // a pending file is kept while any stream still refers to its index
668  bool fileIsBeingProcessed = false;
669  for (unsigned int i = 0; i < nStreams_; i++) {
670  if (it->first == streamFileTracker_.at(i)) {
671  fileIsBeingProcessed = true;
672  break;
673  }
674  }
675  if (!fileIsBeingProcessed) {
676  std::string fileToDelete = it->second->fileName_;
677  it = filesToDelete_.erase(it);
678  } else
679  it++;
680  }
681  }
    // chunkIsFree_ is set by nextEvent() when a chunk boundary was crossed: the chunk
    // before the current one goes back to the free list
682  if (chunkIsFree_)
683  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
684  chunkIsFree_ = false;
686  return;
687 }
688 
// Body of the routine that splits the payload of the current event into FED fragments and
// copies each into rawData (its signature, the declaration of "time" and several
// conditions are missing from this listing). Also resets and fills GTPEventID_,
// tcds_pointer_ and tcdsInRange, and returns the event timestamp.
691  timeval stv;
692  gettimeofday(&stv, nullptr);
    // default timestamp from the wall clock: seconds in the upper 32 bits, microseconds below
693  time = stv.tv_sec;
694  time = (time << 32) + stv.tv_usec;
695  edm::Timestamp tstamp(time);
696 
697  uint32_t eventSize = event_->eventSize();
698  unsigned char* event = (unsigned char*)event_->payload();
699  GTPEventID_ = 0;
700  tcds_pointer_ = nullptr;
701  tcdsInRange = false;
702  uint16_t selectedTCDSFed = 0;
    // the payload is walked backwards: eventSize is the offset of the not yet consumed end;
    // each iteration reads the trailer there and steps back to the fragment's header
703  while (eventSize > 0) {
704  assert(eventSize >= FEDTrailer::length);
705  eventSize -= FEDTrailer::length;
706  const FEDTrailer fedTrailer(event + eventSize);
707  const uint32_t fedSize = fedTrailer.fragmentLength() << 3; //trailer length counts in 8 bytes
    // NOTE(review): fedSize - FEDHeader::length is unsigned and would wrap around if a
    // trailer reported a fragment shorter than the header
708  assert(eventSize >= fedSize - FEDHeader::length);
709  eventSize -= (fedSize - FEDHeader::length);
710  const FEDHeader fedHeader(event + eventSize);
711  const uint16_t fedId = fedHeader.sourceID();
713  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
714  } else if (fedId >= MINTCDSuTCAFEDID_ && fedId <= MAXTCDSuTCAFEDID_) {
    // only one fragment with an ID in the TCDS range is accepted per event
715  if (!selectedTCDSFed) {
716  selectedTCDSFed = fedId;
717  tcds_pointer_ = event + eventSize;
719  tcdsInRange = true;
720  }
721  } else
722  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
723  << "Second TCDS FED ID " << fedId << " found. First ID: " << selectedTCDSFed;
724  }
    // event ID and GPS time from this fragment replace the wall-clock timestamp
726  if (evf::evtn::evm_board_sense(event + eventSize, fedSize))
727  GTPEventID_ = evf::evtn::get(event + eventSize, true);
728  else
729  GTPEventID_ = evf::evtn::get(event + eventSize, false);
730  //evf::evtn::evm_board_setformat(fedSize);
731  const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
732  const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
733  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
734  }
735  //take event ID from GTPE FED
737  if (evf::evtn::gtpe_board_sense(event + eventSize)) {
738  GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
739  }
740  }
    // copy the whole fragment (header through trailer) into the collection slot of this FED
741  FEDRawData& fedData = rawData.FEDData(fedId);
742  fedData.resize(fedSize);
743  memcpy(fedData.data(), event + eventSize, fedSize);
744  }
    // all bytes of the payload must have been consumed
745  assert(eventSize == 0);
746 
747  return tstamp;
748 }
749 
751 
753  bool stop = false;
754  unsigned int currentLumiSection = 0;
755  //threadInit_.exchange(true,std::memory_order_acquire);
756 
757  {
758  std::unique_lock<std::mutex> lk(startupLock_);
759  startupCv_.notify_one();
760  }
761 
762  uint32_t ls = 0;
763  uint32_t monLS = 1;
764  uint32_t lockCount = 0;
765  uint64_t sumLockWaitTimeUs = 0.;
766 
767  while (!stop) {
768  //wait for at least one free thread and chunk
769  int counter = 0;
770 
771  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty() ||
773  //report state to monitoring
774  if (fms_) {
775  bool copy_active = false;
776  for (auto j : tid_active_)
777  if (j)
778  copy_active = true;
781  else if (freeChunks_.empty()) {
782  if (copy_active)
784  else
786  } else {
787  if (copy_active)
789  else
791  }
792  }
793  std::unique_lock<std::mutex> lkw(mWakeup_);
794  //sleep until woken up by condition or a timeout
795  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
796  counter++;
797  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
798  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
799  } else {
800  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
801  }
802  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
803  stop = true;
804  break;
805  }
806  }
807  //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
808 
809  if (stop)
810  break;
811 
812  //look for a new file
813  std::string nextFile;
814  uint32_t fileSizeIndex;
815  int64_t fileSizeFromMetadata;
816 
817  if (fms_) {
820  }
821 
823  uint16_t rawHeaderSize = 0;
824  uint32_t lsFromRaw = 0;
825  int32_t serverEventsInNewFile = -1;
826  int rawFd = -1;
827 
828  int backoff_exp = 0;
829 
830  //entering loop which tries to grab new file from ramdisk
832  //check if hltd has signalled to throttle input
833  counter = 0;
834  while (daqDirector_->inputThrottled()) {
835  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
836  break;
838  if (!(counter % 50))
839  edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused...";
840  usleep(100000);
841  counter++;
842  }
843 
844  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
845  stop = true;
846  break;
847  }
848 
849  assert(rawFd == -1);
850  uint64_t thisLockWaitTimeUs = 0.;
852  if (fileListMode_) {
853  //return LS if LS not set, otherwise return file
854  status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
857  rawFd,
858  rawHeaderSize,
859  lsFromRaw,
860  serverEventsInNewFile,
861  fileSizeFromMetadata,
862  false,
863  false,
864  false) != 0) {
865  //error
866  setExceptionState_ = true;
867  stop = true;
868  break;
869  }
870  if (!getLSFromFilename_)
871  ls = lsFromRaw;
872  }
873  } else if (!useFileBroker_)
875  ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
876  else {
877  status = daqDirector_->getNextFromFileBroker(currentLumiSection,
878  ls,
879  nextFile,
880  rawFd,
881  rawHeaderSize,
882  serverEventsInNewFile,
883  fileSizeFromMetadata,
884  thisLockWaitTimeUs);
885  }
886 
888 
889  //cycle through all remaining LS even if no files get assigned
890  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
892 
893  //monitoring of lock wait time
894  if (thisLockWaitTimeUs > 0.)
895  sumLockWaitTimeUs += thisLockWaitTimeUs;
896  lockCount++;
897  if (ls > monLS) {
898  monLS = ls;
899  if (lockCount)
900  if (fms_)
901  fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
902  lockCount = 0;
903  sumLockWaitTimeUs = 0;
904  }
905 
906  //check again for any remaining index/EoLS files after EoR file is seen
909  usleep(100000);
910  //now all files should have appeared in ramdisk, check again if any raw files were left behind
912  ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
913  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
915  }
916 
918  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runEnded));
919  fileQueue_.push(std::move(inf));
920  stop = true;
921  break;
922  }
923 
924  //error from filelocking function
926  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
927  fileQueue_.push(std::move(inf));
928  stop = true;
929  break;
930  }
931  //queue new lumisection
932  if (getLSFromFilename_) {
933  if (ls > currentLumiSection) {
934  if (!useFileBroker_) {
935  //file locking
936  //setMonStateSup(inSupNewLumi);
937  currentLumiSection = ls;
938  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
939  fileQueue_.push(std::move(inf));
940  } else {
941  //new file service
942  if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
943  if (ls < 100) {
944  //look at last LS file on disk to start from that lumisection (only within first 100 LS)
945  unsigned int lsToStart = daqDirector_->getLumisectionToStart();
946 
947  for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
948  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
949  fileQueue_.push(std::move(inf));
950  }
951  } else {
952  //start from current LS
953  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
954  fileQueue_.push(std::move(inf));
955  }
956  } else {
957  //queue all lumisections after last one seen to avoid gaps
958  for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
959  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
960  fileQueue_.push(std::move(inf));
961  }
962  }
963  currentLumiSection = ls;
964  }
965  }
966  //else
967  if (currentLumiSection > 0 && ls < currentLumiSection) {
968  edm::LogError("FedRawDataInputSource")
969  << "Got old LS (" << ls << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
970  << ". Aborting execution." << std::endl;
971  if (rawFd != -1)
972  close(rawFd);
973  rawFd = -1;
974  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
975  fileQueue_.push(std::move(inf));
976  stop = true;
977  break;
978  }
979  }
980 
981  int dbgcount = 0;
984  dbgcount++;
985  if (!(dbgcount % 20))
986  LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
987  if (!useFileBroker_)
988  usleep(100000);
989  else {
990  backoff_exp = std::min(4, backoff_exp); // max 1.6 seconds
991  //backoff_exp=0; // disabled!
992  int sleeptime = (int)(100000. * pow(2, backoff_exp));
993  usleep(sleeptime);
994  backoff_exp++;
995  }
996  } else
997  backoff_exp = 0;
998  }
999  //end of file grab loop, parse result
1002  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1003 
1004  std::string rawFile;
1005  //file service will report raw extension
1006  if (useFileBroker_ || rawHeaderSize)
1007  rawFile = nextFile;
1008  else {
1009  std::filesystem::path rawFilePath(nextFile);
1010  rawFile = rawFilePath.replace_extension(".raw").string();
1011  }
1012 
1013  struct stat st;
1014  int stat_res = stat(rawFile.c_str(), &st);
1015  if (stat_res == -1) {
1016  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
1017  setExceptionState_ = true;
1018  break;
1019  }
1020  uint64_t fileSize = st.st_size;
1021 
1022  if (fms_) {
1026  }
1027  int eventsInNewFile;
1028  if (fileListMode_) {
1029  if (fileSize == 0)
1030  eventsInNewFile = 0;
1031  else
1032  eventsInNewFile = -1;
1033  } else {
1035  if (!useFileBroker_) {
1036  if (rawHeaderSize) {
1037  int rawFdEmpty = -1;
1038  uint16_t rawHeaderCheck;
1039  bool fileFound;
1040  eventsInNewFile = daqDirector_->grabNextJsonFromRaw(
1041  nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true);
1042  assert(fileFound && rawHeaderCheck == rawHeaderSize);
1044  } else
1045  eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1046  } else
1047  eventsInNewFile = serverEventsInNewFile;
1048  assert(eventsInNewFile >= 0);
1049  assert((eventsInNewFile > 0) ==
1050  (fileSize > rawHeaderSize)); //file without events must be empty or contain only header
1051  }
1052 
1053  if (!singleBufferMode_) {
1054  //calculate number of needed chunks
1055  unsigned int neededChunks = fileSize / eventChunkSize_;
1056  if (fileSize % eventChunkSize_)
1057  neededChunks++;
1058 
1059  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1060  ls,
1061  rawFile,
1062  !fileListMode_,
1063  rawFd,
1064  fileSize,
1065  rawHeaderSize,
1066  neededChunks,
1067  eventsInNewFile,
1068  this));
1070  auto newInputFilePtr = newInputFile.get();
1071  fileQueue_.push(std::move(newInputFile));
1072 
1073  for (unsigned int i = 0; i < neededChunks; i++) {
1074  if (fms_) {
1075  bool copy_active = false;
1076  for (auto j : tid_active_)
1077  if (j)
1078  copy_active = true;
1079  if (copy_active)
1081  else
1083  }
1084  //get thread
1085  unsigned int newTid = 0xffffffff;
1086  while (!workerPool_.try_pop(newTid)) {
1087  usleep(100000);
1088  if (quit_threads_.load(std::memory_order_relaxed)) {
1089  stop = true;
1090  break;
1091  }
1092  }
1093 
1094  if (fms_) {
1095  bool copy_active = false;
1096  for (auto j : tid_active_)
1097  if (j)
1098  copy_active = true;
1099  if (copy_active)
1101  else
1103  }
1104  InputChunk* newChunk = nullptr;
1105  while (!freeChunks_.try_pop(newChunk)) {
1106  usleep(100000);
1107  if (quit_threads_.load(std::memory_order_relaxed)) {
1108  stop = true;
1109  break;
1110  }
1111  }
1112 
1113  if (newChunk == nullptr) {
1114  //return unused tid if we received shutdown (nullptr chunk)
1115  if (newTid != 0xffffffff)
1116  workerPool_.push(newTid);
1117  stop = true;
1118  break;
1119  }
1120  if (stop)
1121  break;
1123 
1124  std::unique_lock<std::mutex> lk(mReader_);
1125 
1126  unsigned int toRead = eventChunkSize_;
1127  if (i == neededChunks - 1 && fileSize % eventChunkSize_)
1128  toRead = fileSize % eventChunkSize_;
1129  newChunk->reset(i * eventChunkSize_, toRead, i);
1130 
1131  workerJob_[newTid].first = newInputFilePtr;
1132  workerJob_[newTid].second = newChunk;
1133 
1134  //wake up the worker thread
1135  cvReader_[newTid]->notify_one();
1136  }
1137  } else {
1138  if (!eventsInNewFile) {
1139  if (rawFd) {
1140  close(rawFd);
1141  rawFd = -1;
1142  }
1143  //still queue file for lumi update
1144  std::unique_lock<std::mutex> lkw(mWakeup_);
1145  //TODO: also file with only file header fits in this edge case. Check if read correctly in single buffer mode
1146  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1147  ls,
1148  rawFile,
1149  !fileListMode_,
1150  rawFd,
1151  fileSize,
1152  rawHeaderSize,
1153  (rawHeaderSize > 0),
1154  0,
1155  this));
1157  fileQueue_.push(std::move(newInputFile));
1158  cvWakeup_.notify_one();
1159  break;
1160  }
1161  //in single-buffer mode put single chunk in the file and let the main thread read the file
1162  InputChunk* newChunk;
1163  //should be available immediately
1164  while (!freeChunks_.try_pop(newChunk)) {
1165  usleep(100000);
1166  if (quit_threads_.load(std::memory_order_relaxed))
1167  break;
1168  }
1169 
1170  std::unique_lock<std::mutex> lkw(mWakeup_);
1171 
1172  unsigned int toRead = eventChunkSize_;
1173  if (fileSize % eventChunkSize_)
1174  toRead = fileSize % eventChunkSize_;
1175  newChunk->reset(0, toRead, 0);
1176  newChunk->readComplete_ = true;
1177 
1178  //push file and wakeup main thread
1179  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1180  ls,
1181  rawFile,
1182  !fileListMode_,
1183  rawFd,
1184  fileSize,
1185  rawHeaderSize,
1186  1,
1187  eventsInNewFile,
1188  this));
1189  newInputFile->chunks_[0] = newChunk;
1191  fileQueue_.push(std::move(newInputFile));
1192  cvWakeup_.notify_one();
1193  }
1194  }
1195  }
1197  //make sure threads finish reading
1198  unsigned numFinishedThreads = 0;
1199  while (numFinishedThreads < workerThreads_.size()) {
1200  unsigned tid = 0;
1201  while (!workerPool_.try_pop(tid)) {
1202  usleep(10000);
1203  }
1204  std::unique_lock<std::mutex> lk(mReader_);
1205  thread_quit_signal[tid] = true;
1206  cvReader_[tid]->notify_one();
1207  numFinishedThreads++;
1208  }
1209  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1210  workerThreads_[i]->join();
1211  delete workerThreads_[i];
1212  }
1213 }
1214 
// Main loop of one file-reader worker thread, identified by index `tid`.
//
// Each iteration the worker clears its job slot, advertises itself by pushing
// `tid` into workerPool_, and sleeps on cvReader_[tid]. After being woken it
// either returns (thread_quit_signal[tid] set) or takes the (file, chunk) pair
// found in workerJob_[tid], reads chunk->usedSize_ bytes from the file into
// chunk->buf_ in pieces of at most eventChunkBlock_, and finally publishes the
// chunk via chunk->readComplete_ and file->chunks_[chunk->fileIndex_].
// On open/seek/read failure it sets setExceptionState_ and goes back to the
// top of the loop without publishing the chunk.
//
// NOTE(review): listing lines 1298, 1330 and 1350 are absent from this extract;
// `start` and `end` used below are not declared in the visible lines
// (presumably timestamps taken around the read loop — confirm against the file).
1215 void FedRawDataInputSource::readWorker(unsigned int tid) {
1216  bool init = true;
1217  threadInit_.exchange(true, std::memory_order_acquire);
1218 
1219  while (true) {
1220  tid_active_[tid] = false;
   // mReader_ is held for the rest of the iteration except while waiting on the
   // condition variable below.
1221  std::unique_lock<std::mutex> lk(mReader_);
1222  workerJob_[tid].first = nullptr;
   // NOTE(review): this repeats the assignment to `.first`; `.second` is never
   // reset here. Likely intended `workerJob_[tid].second = nullptr` — confirm.
1223  workerJob_[tid].first = nullptr;
1224 
1225  assert(!thread_quit_signal[tid]); //should never get it here
   // make this worker available for a new job
1226  workerPool_.push(tid);
1227 
1228  if (init) {
   // first pass only: signal startupCv_ once. This inner `lk` shadows the
   // outer lock and guards startupLock_, not mReader_.
1229  std::unique_lock<std::mutex> lk(startupLock_);
1230  init = false;
1231  startupCv_.notify_one();
1232  }
   // wait() has no predicate; a wake-up without a job assigned would reach the
   // assert on workerJob_[tid] below.
1233  cvReader_[tid]->wait(lk);
1234 
1235  if (thread_quit_signal[tid])
1236  return;
1237  tid_active_[tid] = true;
1238 
1239  InputFile* file;
1240  InputChunk* chunk;
1241 
1242  assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1243 
1244  file = workerJob_[tid].first;
1245  chunk = workerJob_[tid].second;
1246 
1247  //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
   // bufferLeft is the write offset into chunk->buf_ (bytes already accounted
   // for), despite its name.
1248  unsigned int bufferLeft = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1249 
1250  //if only one worker thread exists, use single fd for all operations
1251  //if more worker threads exist, use rawFd_ for only the first read operation and then close file
1252  int fileDescriptor;
1253  bool fileOpenedHere = false;
1254 
1255  if (numConcurrentReads_ == 1) {
1256  fileDescriptor = file->rawFd_;
1257  if (fileDescriptor == -1) {
1258  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1259  fileOpenedHere = true;
   // remember the descriptor in the file object so later chunks reuse it
1260  file->rawFd_ = fileDescriptor;
1261  }
1262  } else {
1263  if (chunk->offset_ == 0) {
   // first chunk: take over the inherited descriptor and detach it from the file object
1264  fileDescriptor = file->rawFd_;
1265  file->rawFd_ = -1;
1266  if (fileDescriptor == -1) {
1267  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1268  fileOpenedHere = true;
1269  }
1270  } else {
   // later chunks always get their own descriptor
1271  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1272  fileOpenedHere = true;
1273  }
1274  }
1275 
1276  if (fileDescriptor < 0) {
1277  edm::LogError("FedRawDataInputSource") << "readWorker failed to open file -: " << file->fileName_
1278  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1279  setExceptionState_ = true;
1280  continue;
1281  }
1282  if (fileOpenedHere) { //fast forward to this chunk position
1283  off_t pos = 0;
1284  pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1285  if (pos == -1) {
1286  edm::LogError("FedRawDataInputSource")
1287  << "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor << " to offset "
1288  << chunk->offset_ << " error: " << strerror(errno);
1289  setExceptionState_ = true;
   // NOTE(review): the descriptor opened above is not closed on this path.
1290  continue;
1291  }
1292  }
1293 
1294  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1295  << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1296 
   // bytes skipped at the start of the buffer (header of an inherited descriptor)
1297  unsigned int skipped = bufferLeft;
1299  for (unsigned int i = 0; i < readBlocks_; i++) {
1300  ssize_t last;
1301 
1302  //protect against reading into next block
1303  last = ::read(
1304  fileDescriptor, (void*)(chunk->buf_ + bufferLeft), std::min(chunk->usedSize_ - bufferLeft, eventChunkBlock_));
1305 
1306  if (last < 0) {
1307  edm::LogError("FedRawDataInputSource") << "readWorker failed to read file -: " << file->fileName_
1308  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1309  setExceptionState_ = true;
1310  break;
1311  }
1312  if (last > 0)
1313  bufferLeft += last;
   // a short read is treated as the final block of this chunk
1314  if (last < eventChunkBlock_) { //last read
1315  //check if this is last block, then total read size must match file size
1316  if (!(chunk->usedSize_ - skipped == i * eventChunkBlock_ + last)) {
1317  edm::LogError("FedRawDataInputSource")
1318  << "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last
1319  << " expectedChunkSize:" << chunk->usedSize_
1320  << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last) << " skipped:" << skipped
1321  << " block:" << (i + 1) << "/" << readBlocks_ << " error: " << strerror(errno);
1322  setExceptionState_ = true;
1323  }
1324  break;
1325  }
1326  }
   // NOTE(review): on this error path the descriptor is left open, and
   // setExceptionState_ is a shared member, so once it is set every later
   // iteration of this worker also takes this branch.
1327  if (setExceptionState_)
1328  continue;
1329 
1331  auto diff = end - start;
1332  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
   // (bufferLeft >> 20) is MB; MB per ms is reported as GB/s
1333  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1334  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1335  << " GB/s)";
1336 
1337  if (chunk->offset_ + bufferLeft == file->fileSize_) { //file reading finished using same fd
1338  close(fileDescriptor);
1339  fileDescriptor = -1;
1340  if (numConcurrentReads_ == 1)
1341  file->rawFd_ = -1;
1342  }
   // with several concurrent readers each chunk used its own descriptor: close it now
1343  if (numConcurrentReads_ > 1 && fileDescriptor != -1)
1344  close(fileDescriptor);
1345 
1346  //detect FRD event version. Skip file Header if it exists
   // the version is read as a uint16_t located right after the raw file header
   // in the first chunk, and only while no version has been detected yet
1347  if (detectedFRDversion_ == 0 && chunk->offset_ == 0) {
1348  detectedFRDversion_ = *((uint16_t*)(chunk->buf_ + file->rawHeaderSize_));
1349  }
1351  chunk->readComplete_ =
1352  true; //this is atomic to secure the sequential buffer fill before becoming available for processing)
1353  file->chunks_[chunk->fileIndex_] = chunk; //put the completed chunk in the file chunk vector at predetermined index
1354  }
1355 }
1356 
// Body of FedRawDataInputSource::threadError() (the signature line, listing
// line 1357, is missing from this extract): sets quit_threads_ and then
// always throws a cms::Exception, so the call never returns normally.
1358  quit_threads_ = true;
1359  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1360 }
1361 
// Body of FedRawDataInputSource::setMonState(state) (the signature line,
// listing line 1362, is missing from this extract): forwards `state` to the
// monitoring service, and does nothing when fms_ is null.
1363  if (fms_)
1364  fms_->setInState(state);
1365 }
1366 
1368  if (fms_)
1370 }
1371 
// Hands out the next `size` bytes of the file and moves the read cursor past them.
//
// dataPosition: set to the address where the `size` bytes start.
// size:         number of bytes requested.
// Returns false when the bytes were available inside the current chunk, and
// true when they straddled a chunk boundary. In the latter case the unread
// tail of the current chunk is moved to the start of its buffer, the missing
// bytes are appended from the start of the next chunk, dataPosition points to
// that buffer start, and the next chunk becomes the current one.
//
// While a needed chunk is not ready the call polls every 100 ms and calls
// parent_->threadError() (which throws) once parent_->exceptionState() is true.
// NOTE(review): listing lines 1376/1378/1389/1391 are absent from this extract.
1372 inline bool InputFile::advance(unsigned char*& dataPosition, const size_t size) {
1373  //wait for chunk
1374 
1375  while (!waitForChunk(currentChunk_)) {
1377  usleep(100000);
1379  if (parent_->exceptionState())
1380  parent_->threadError();
1381  }
1382 
1383  dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
   // bytes still unread in the current chunk
1384  size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1385 
1386  if (currentLeft < size) {
1387  //we need next chunk
1388  while (!waitForChunk(currentChunk_ + 1)) {
1390  usleep(100000);
1392  if (parent_->exceptionState())
1393  parent_->threadError();
1394  }
1395  //copy everything to beginning of the first chunk
1396  dataPosition -= chunkPosition_;
1397  assert(dataPosition == chunks_[currentChunk_]->buf_);
   // memmove because source and destination lie in the same buffer and may overlap
1398  memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1399  memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1400  //set pointers at the end of the old data position
1401  bufferPosition_ += size;
   // position inside the next chunk, just past the bytes borrowed from it
1402  chunkPosition_ = size - currentLeft;
1403  currentChunk_++;
1404  return true;
1405  } else {
1406  chunkPosition_ += size;
1407  bufferPosition_ += size;
1408  return false;
1409  }
1410 }
1411 
// Copies `size` bytes from the current read position of the current chunk
// into the previous chunk's buffer at byte `offset`, then advances both
// chunkPosition_ and bufferPosition_ by `size`.
// NOTE(review): listing line 1414 is absent from this extract. In the visible
// assert `size - offset` is unsigned arithmetic, so it wraps around if
// offset > size — confirm callers never do that.
1412 inline void InputFile::moveToPreviousChunk(const size_t size, const size_t offset) {
1413  //this will fail in case of events that are too large
1415  assert(size - offset < chunks_[currentChunk_]->size_);
1416  memcpy(chunks_[currentChunk_ - 1]->buf_ + offset, chunks_[currentChunk_]->buf_ + chunkPosition_, size);
1417  chunkPosition_ += size;
1418  bufferPosition_ += size;
1419 }
1420 
1421 inline void InputFile::rewindChunk(const size_t size) {
1422  chunkPosition_ -= size;
1423  bufferPosition_ -= size;
1424 }
1425 
// Body of InputFile::~InputFile() (the signature line, listing line 1426, is
// missing from this extract). Closes rawFd_ if it is still open and, when
// deleteFile_ is set and fileName_ is non-empty, goes on to the deletion step.
// NOTE(review): listing lines 1431, 1435 and 1444 are absent, so the actual
// removal call and whatever wraps the try block are not visible here; the
// "Trying again." messages suggest a retry around it — confirm against the file.
1427  if (rawFd_ != -1)
1428  close(rawFd_);
1429 
1430  if (deleteFile_ && !fileName_.empty()) {
1432  try {
1433  //sometimes this fails but file gets deleted
1434  LogDebug("FedRawDataInputSource:InputFile") << "Deleting input file -:" << fileName_;
   // leave the destructor as soon as the statements in the try block succeed
1436  return;
   // the message below says BOOST although the caught type is std::filesystem's error
1437  } catch (const std::filesystem::filesystem_error& ex) {
1438  edm::LogError("FedRawDataInputSource:InputFile")
1439  << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what() << ". Trying again.";
1440  } catch (std::exception& ex) {
1441  edm::LogError("FedRawDataInputSource:InputFile")
1442  << " - deleteFile std::exception CAUGHT -: " << ex.what() << ". Trying again.";
1443  }
1445  }
1446 }
1447 
1448 //single-buffer mode file reading
// Body of FedRawDataInputSource::readNextChunkIntoBuffer(InputFile* file) (the
// signature line, listing line 1449, is missing from this extract).
// Fills file->chunks_[0]->buf_ with the next portion of the file:
//  - first call for a file (fileDescriptor_ < 0): opens the file or reuses
//    file->rawFd_, leaves room for rawHeaderSize_ bytes at the buffer start,
//    then reads up to readBlocks_ blocks of eventChunkBlock_ bytes;
//  - later calls: either refills the whole buffer (chunkPosition_ == 0) or
//    moves the unread tail to the buffer start and reads only as many bytes
//    as were freed.
// When bufferInputRead_ equals file->fileSize_ the descriptor is closed.
// NOTE(review): listing lines 1477, 1486, 1501 and 1506 are absent; presumably
// they add `last` to bufferInputRead_, which is otherwise only changed for the
// header here — confirm. The visible code never checks ::read() for -1.
1450  uint32_t existingSize = 0;
1451 
1452  if (fileDescriptor_ < 0) {
1453  bufferInputRead_ = 0;
1454  if (file->rawFd_ == -1) {
1455  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
   // NOTE(review): the seek is issued before the open result is checked
   // further below, and its return value is ignored.
1456  if (file->rawHeaderSize_)
1457  lseek(fileDescriptor_, file->rawHeaderSize_, SEEK_SET);
1458  } else
1459  fileDescriptor_ = file->rawFd_;
1460 
1461  //skip header size in destination buffer (chunk position was already adjusted)
1462  bufferInputRead_ += file->rawHeaderSize_;
1463  existingSize += file->rawHeaderSize_;
1464 
1465  if (fileDescriptor_ >= 0)
1466  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1467  else {
1468  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer")
1469  << "failed to open file " << std::endl
1470  << file->fileName_ << " fd:" << fileDescriptor_;
1471  }
1472  //fill chunk (skipping file header if present)
   // the final block is shortened by the header bytes already reserved at the buffer start
1473  for (unsigned int i = 0; i < readBlocks_; i++) {
1474  const ssize_t last = ::read(fileDescriptor_,
1475  (void*)(file->chunks_[0]->buf_ + existingSize),
1476  eventChunkBlock_ - (i == readBlocks_ - 1 ? existingSize : 0));
1478  existingSize += last;
1479  }
1480 
1481  } else {
1482  //continue reading
1483  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1484  for (unsigned int i = 0; i < readBlocks_; i++) {
1485  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1487  existingSize += last;
1488  }
1489  } else {
1490  //event didn't fit in last chunk, so leftover must be moved to the beginning and completed
1491  uint32_t existingSizeLeft = eventChunkSize_ - file->chunkPosition_;
1492  memmove((void*)file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSizeLeft);
1493 
1494  //calculate amount of data that can be added
   // chunkPosition_ bytes were freed at the buffer end: read them as
   // `blockcount` full blocks plus one remainder of `leftsize` bytes
1495  const uint32_t blockcount = file->chunkPosition_ / eventChunkBlock_;
1496  const uint32_t leftsize = file->chunkPosition_ % eventChunkBlock_;
1497 
1498  for (uint32_t i = 0; i < blockcount; i++) {
1499  const ssize_t last =
1500  ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), eventChunkBlock_);
1502  existingSizeLeft += last;
1503  }
1504  if (leftsize) {
   // `last` is not used in the visible lines (listing line 1506 is absent)
1505  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), leftsize);
1507  }
1508  file->chunkPosition_ = 0; //data was moved to beginning of the chunk
1509  }
1510  }
1511  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1512  if (fileDescriptor_ != -1) {
1513  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1514  close(fileDescriptor_);
   // reset both handles so the closed descriptor is not closed again through file->rawFd_
1515  file->rawFd_ = fileDescriptor_ = -1;
1516  }
1517  }
1518 }
1519 
// Body of FedRawDataInputSource::reportEventsThisLumiInSource(lumi, events)
// (the signature line, listing line 1520, is missing from this extract).
// Holding monlock_, adds `events` to the count already stored for `lumi` in
// sourceEventsReport_.
// NOTE(review): the statement of the else branch (listing line 1526) is
// absent; presumably it creates the entry for a lumi seen for the first time — confirm.
1521  std::lock_guard<std::mutex> lock(monlock_);
1522  auto itr = sourceEventsReport_.find(lumi);
1523  if (itr != sourceEventsReport_.end())
1524  itr->second += events;
1525  else
1527 }
1528 
1529 std::pair<bool, unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase) {
1530  std::lock_guard<std::mutex> lock(monlock_);
1531  auto itr = sourceEventsReport_.find(lumi);
1532  if (itr != sourceEventsReport_.end()) {
1533  std::pair<bool, unsigned int> ret(true, itr->second);
1534  if (erase)
1535  sourceEventsReport_.erase(itr);
1536  return ret;
1537  } else
1538  return std::pair<bool, unsigned int>(false, 0);
1539 }
1540 
// Body of the file-list initialisation routine (presumably
// FedRawDataInputSource::initFileList — the signature line, listing line 1541,
// is missing from this extract). Sorts fileNames_ and tries to derive a run
// number from the first entry; returns that number, or -1 when fileNames_ is
// empty, the stem does not start with "run", or the digits cannot be parsed.
// NOTE(review): listing line 1552, which must declare `fileName`, is absent.
// Sort by the part of each name from its last '/' onward, so the order does not
// depend on the directory. The lambda takes its arguments by value because it
// modifies them.
1542  std::sort(fileNames_.begin(), fileNames_.end(), [](std::string a, std::string b) {
1543  if (a.rfind('/') != std::string::npos)
1544  a = a.substr(a.rfind('/'));
1545  if (b.rfind('/') != std::string::npos)
1546  b = b.substr(b.rfind('/'));
1547  return b > a;
1548  });
1549 
1550  if (!fileNames_.empty()) {
1551  //get run number from first file in the vector
1553  std::string fileStem = fileName.stem().string();
   // strip a URL-style prefix if the stem still carries one
1554  if (fileStem.find("file://") == 0)
1555  fileStem = fileStem.substr(7);
1556  else if (fileStem.find("file:") == 0)
1557  fileStem = fileStem.substr(5);
1558  auto end = fileStem.find('_');
1559 
1560  if (fileStem.find("run") == 0) {
   // digits between "run" and the first '_' (or to the end of the stem when
   // there is no '_': `end` is then npos and substr takes the remainder)
1561  std::string runStr = fileStem.substr(3, end - 3);
1562  try {
1563  //get long to support test run numbers < 2^32
1564  long rval = boost::lexical_cast<long>(runStr);
1565  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: " << rval;
1566  return rval;
1567  } catch (boost::bad_lexical_cast const&) {
1568  edm::LogWarning("FedRawDataInputSource")
1569  << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1570  }
1571  }
1572  }
1573  return -1;
1574 }
1575 
// Remaining parameters and body of getFile(ls, nextFile, fsize, lockWaitTime)
// (the name is taken from the recursive call below; the first signature line,
// listing line 1576, is missing from this extract). Serves the next entry of
// fileNames_ in file-list mode: strips a "file://" or "file:" prefix into
// `nextFile`, derives the lumisection `ls` from the file stem (or uses
// 1 + loopModeIterationInc_ in loop mode) and advances fileListIndex_. When the
// list is exhausted, loop mode restarts from index 0.
// NOTE(review): listing lines 1601, 1604 and 1607 — the return statements and
// the statement before the index reset — are absent. `fsize` and
// `lockWaitTime` are not written in the visible lines.
1577  std::string& nextFile,
1578  uint32_t& fsize,
1579  uint64_t& lockWaitTime) {
1580  if (fileListIndex_ < fileNames_.size()) {
1581  nextFile = fileNames_[fileListIndex_];
1582  if (nextFile.find("file://") == 0)
1583  nextFile = nextFile.substr(7);
1584  else if (nextFile.find("file:") == 0)
1585  nextFile = nextFile.substr(5);
1586  std::filesystem::path fileName = nextFile;
1587  std::string fileStem = fileName.stem().string();
   // NOTE(review): std::string::find returns npos (non-zero, so true) when
   // nothing is found and 0 (false) for a match at the start, so these two
   // conditions do not test "found". Probably `!= std::string::npos` was
   // intended — confirm. For a stem without "ls", npos + 2 wraps to 1 and the
   // first character is dropped.
1588  if (fileStem.find("ls"))
1589  fileStem = fileStem.substr(fileStem.find("ls") + 2);
1590  if (fileStem.find('_'))
1591  fileStem = fileStem.substr(0, fileStem.find('_'));
1592 
   // the lexical_cast is not wrapped in a try block in the visible lines
1593  if (!fileListLoopMode_)
1594  ls = boost::lexical_cast<unsigned int>(fileStem);
1595  else //always starting from LS 1 in loop mode
1596  ls = 1 + loopModeIterationInc_;
1597 
1598  //fsize = 0;
1599  //lockWaitTime = 0;
1600  fileListIndex_++;
1602  } else {
1603  if (!fileListLoopMode_)
1605  else {
1606  //loop through files until interrupted
1608  fileListIndex_ = 0;
   // restart from the first file; the recursion ends on the first branch
   // as long as fileNames_ is not empty
1609  return getFile(ls, nextFile, fsize, lockWaitTime);
1610  }
1611  }
1612 }
runTheMatrix.ret
ret
prodAgent to be discontinued
Definition: runTheMatrix.py:543
FedRawDataInputSource::fileDeleteLock_
std::mutex fileDeleteLock_
Definition: FedRawDataInputSource.h:167
change_name.diff
diff
Definition: change_name.py:13
evf::EvFDaqDirector::unlockFULocal2
void unlockFULocal2()
Definition: EvFDaqDirector.cc:929
evf::FastMonState::inSupWaitFreeThreadCopying
Definition: FastMonitoringService.h:107
eostools.ls
def ls(path, rec=False)
Definition: eostools.py:349
edm::RunNumber_t
unsigned int RunNumber_t
Definition: RunLumiEventNumber.h:14
counter
Definition: counter.py:1
FedRawDataInputSource::chunkIsFree_
bool chunkIsFree_
Definition: FedRawDataInputSource.h:142
dttmaxenums::L
Definition: DTTMax.h:29
evf::EvFDaqDirector::createBoLSFile
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
Definition: EvFDaqDirector.cc:934
FedRawDataInputSource::threadInit_
std::atomic< bool > threadInit_
Definition: FedRawDataInputSource.h:181
electrons_cff.bool
bool
Definition: electrons_cff.py:366
mps_fire.i
i
Definition: mps_fire.py:428
FedRawDataInputSource::rewind_
void rewind_() override
Definition: FedRawDataInputSource.cc:750
start
Definition: start.py:1
evf::evtn::makeEventAuxiliary
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, bool isRealData, const edm::EventAuxiliary::ExperimentType &, const std::string &processGUID, bool verifyLumiSection, bool suppressWarning)
Definition: AuxiliaryMakers.cc:9
evf::EvFDaqDirector::runEnded
Definition: EvFDaqDirector.h:64
MessageLogger.h
evf::FastMonState::inSupNewFileWaitChunkCopying
Definition: FastMonitoringService.h:115
InputFile::parent_
FedRawDataInputSource * parent_
Definition: FedRawDataInputSource.h:212
evf::EvFDaqDirector::getEoLSFilePathOnBU
std::string getEoLSFilePathOnBU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:477
funct::false
false
Definition: Factorize.h:29
FEDNumbering::MINTriggerEGTPFEDID
Definition: FEDNumbering.h:63
InputFile::fileName_
std::string fileName_
Definition: FedRawDataInputSource.h:215
FedRawDataInputSource::maybeOpenNewLumiSection
void maybeOpenNewLumiSection(const uint32_t lumiSection)
Definition: FedRawDataInputSource.cc:306
FedRawDataInputSource::quit_threads_
std::atomic< bool > quit_threads_
Definition: FedRawDataInputSource.h:158
FedRawDataInputSource::mWakeup_
std::mutex mWakeup_
Definition: FedRawDataInputSource.h:173
FedRawDataInputSource::fileNames_
std::vector< std::string > fileNames_
Definition: FedRawDataInputSource.h:102
evf::FastMonState::inWaitChunk
Definition: FastMonitoringService.h:92
FedRawDataInputSource::workerThreads_
std::vector< std::thread * > workerThreads_
Definition: FedRawDataInputSource.h:146
FedRawDataInputSource::setMonState
void setMonState(evf::FastMonState::InputState state)
Definition: FedRawDataInputSource.cc:1362
FedRawDataInputSource::fileListIndex_
unsigned int fileListIndex_
Definition: FedRawDataInputSource.h:107
FedRawDataInputSource::MINTCDSuTCAFEDID_
uint16_t MINTCDSuTCAFEDID_
Definition: FedRawDataInputSource.h:129
evf::FastMonState::inChunkReceived
Definition: FastMonitoringService.h:93
FedRawDataInputSource::threadError
void threadError()
Definition: FedRawDataInputSource.cc:1357
FedRawDataInputSource::readingFilesCount_
std::atomic< unsigned int > readingFilesCount_
Definition: FedRawDataInputSource.h:94
submitPVValidationJobs.now
now
Definition: submitPVValidationJobs.py:639
mps_update.status
status
Definition: mps_update.py:68
evf::FastMonState::inSupNewFileWaitThread
Definition: FastMonitoringService.h:114
FedRawDataInputSource::useL1EventID_
const bool useL1EventID_
Definition: FedRawDataInputSource.h:100
min
T min(T a, T b)
Definition: MathUtil.h:58
FEDRawDataCollection
Definition: FEDRawDataCollection.h:18
evf::EvFDaqDirector::getLumisectionToStart
unsigned int getLumisectionToStart() const
Definition: EvFDaqDirector.cc:1891
edm
HLT enums.
Definition: AlignableModifier.h:19
FedRawDataInputSource::numConcurrentReads_
unsigned int numConcurrentReads_
Definition: FedRawDataInputSource.h:93
evf::FastMonState::inNoRequest
Definition: FastMonitoringService.h:98
evf::FastMonState::inSupNewFileWaitChunk
Definition: FastMonitoringService.h:116
evf::evtn::gtpe_board_sense
bool gtpe_board_sense(const unsigned char *p)
Definition: GlobalEventNumber.cc:11
FedRawDataInputSource::nStreams_
unsigned int nStreams_
Definition: FedRawDataInputSource.h:169
pos
Definition: PixelAliasList.h:18
FedRawDataInputSource::getNextEvent
evf::EvFDaqDirector::FileStatus getNextEvent()
Definition: FedRawDataInputSource.cc:352
FedRawDataInputSource::runNumber_
edm::RunNumber_t runNumber_
Definition: FedRawDataInputSource.h:111
FedRawDataInputSource::currentLumiSection_
unsigned int currentLumiSection_
Definition: FedRawDataInputSource.h:121
evf::FastMonState::inInit
Definition: FastMonitoringService.h:85
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
l1tstage2_dqm_sourceclient-live_cfg.rawData
rawData
Definition: l1tstage2_dqm_sourceclient-live_cfg.py:163
FedRawDataInputSource::workerPool_
tbb::concurrent_queue< unsigned int > workerPool_
Definition: FedRawDataInputSource.h:148
FedRawDataInputSource::sourceEventsReport_
std::map< unsigned int, unsigned int > sourceEventsReport_
Definition: FedRawDataInputSource.h:183
DeadROCCounter.getRunNumber
def getRunNumber(filename)
Definition: DeadROCCounter.py:7
if
if(0==first)
Definition: CAHitNtupletGeneratorKernelsImpl.h:58
FedRawDataInputSource::readNextChunkIntoBuffer
void readNextChunkIntoBuffer(InputFile *file)
Definition: FedRawDataInputSource.cc:1449
edm::InputSourceDescription
Definition: InputSourceDescription.h:21
cms::cuda::assert
assert(be >=bs)
protons_cff.time
time
Definition: protons_cff.py:35
FedRawDataInputSource::fms_
evf::FastMonitoringService * fms_
Definition: FedRawDataInputSource.h:83
edm::second
U second(std::pair< T, U > const &p)
Definition: ParameterSet.cc:222
edm::Timestamp::beginOfTime
static Timestamp beginOfTime()
Definition: Timestamp.h:84
InputFile::moveToPreviousChunk
void moveToPreviousChunk(const size_t size, const size_t offset)
Definition: FedRawDataInputSource.cc:1412
FedRawDataInputSource::cvReader_
std::vector< std::condition_variable * > cvReader_
Definition: FedRawDataInputSource.h:155
FedRawDataInputSource::eventChunkSize_
unsigned int eventChunkSize_
Definition: FedRawDataInputSource.h:88
evf::evtn::get
unsigned int get(const unsigned char *, bool)
Definition: GlobalEventNumber.cc:77
FEDNumbering::MAXFEDID
Definition: FEDNumbering.h:26
FedRawDataInputSource::cvWakeup_
std::condition_variable cvWakeup_
Definition: FedRawDataInputSource.h:174
evf::EvFDaqDirector::sameFile
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::runAbort
Definition: EvFDaqDirector.h:64
edm::LogInfo
Log< level::Info, false > LogInfo
Definition: MessageLogger.h:125
FedRawDataInputSource::freeChunks_
tbb::concurrent_queue< InputChunk * > freeChunks_
Definition: FedRawDataInputSource.h:151
MillePedeFileConverter_cfg.fileName
fileName
Definition: MillePedeFileConverter_cfg.py:32
evf::evtn::getgpslow
unsigned int getgpslow(const unsigned char *)
Definition: GlobalEventNumber.cc:92
patZpeak.events
events
Definition: patZpeak.py:20
newFWLiteAna.found
found
Definition: newFWLiteAna.py:118
edm::LogWarning
Log< level::Warning, false > LogWarning
Definition: MessageLogger.h:122
FRDHeaderVersionSize
constexpr std::array< uint32, FRDHeaderMaxVersion+1 > FRDHeaderVersionSize
Definition: FRDEventMessage.h:129
evf::FastMonState::inSupWaitFreeThread
Definition: FastMonitoringService.h:106
evf::FastMonitoringService::setInputSource
void setInputSource(FedRawDataInputSource *inputSource)
Definition: FastMonitoringService.h:219
FedRawDataInputSource::bufferInputRead_
uint32_t bufferInputRead_
Definition: FedRawDataInputSource.h:179
edm::InputSource::processHistoryRegistryForUpdate
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:331
edm::InputSource::state
ItemType state() const
Definition: InputSource.h:332
FEDRawData::data
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:24
FedRawDataInputSource::getEventReport
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
Definition: FedRawDataInputSource.cc:1529
InputFile::bufferPosition_
uint32_t bufferPosition_
Definition: FedRawDataInputSource.h:226
InputFile
Definition: FedRawDataInputSource.h:211
InputChunk::offset_
unsigned int offset_
Definition: FedRawDataInputSource.h:193
edm::Wrapper
Definition: Product.h:10
FedRawDataInputSource::workerJob_
std::vector< ReaderInfo > workerJob_
Definition: FedRawDataInputSource.h:149
FEDRawData
Definition: FEDRawData.h:19
InputFile::chunkPosition_
uint32_t chunkPosition_
Definition: FedRawDataInputSource.h:227
Utilities.operator
operator
Definition: Utilities.py:24
FedRawDataInputSource::readBlocks_
unsigned int readBlocks_
Definition: FedRawDataInputSource.h:90
AuxiliaryMakers.h
edm::LuminosityBlockAuxiliary
Definition: LuminosityBlockAuxiliary.h:15
evf::FastMonState::inThrottled
Definition: FastMonitoringService.h:150
FEDNumbering::MINTriggerGTPFEDID
Definition: FEDNumbering.h:61
evf::EvFDaqDirector::updateFuLock
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
Definition: EvFDaqDirector.cc:504
FedRawDataInputSource::streamFileTracker_
std::vector< int > streamFileTracker_
Definition: FedRawDataInputSource.h:168
InputChunk::usedSize_
uint32_t usedSize_
Definition: FedRawDataInputSource.h:191
InputChunk::buf_
unsigned char * buf_
Definition: FedRawDataInputSource.h:188
InputChunk::reset
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
Definition: FedRawDataInputSource.h:201
FedRawDataInputSource::eventRunNumber_
uint32_t eventRunNumber_
Definition: FedRawDataInputSource.h:122
evf::EvFDaqDirector::getEoLSFilePathOnFU
std::string getEoLSFilePathOnFU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:481
evf::FastMonState::inReadCleanup
Definition: FastMonitoringService.h:97
FedRawDataInputSource::testTCDSFEDRange_
const std::vector< unsigned int > testTCDSFEDRange_
Definition: FedRawDataInputSource.h:101
dqmdumpme.last
last
Definition: dqmdumpme.py:56
FedRawDataInputSource::InputFile
friend struct InputFile
Definition: FedRawDataInputSource.h:41
edm::EventPrincipal
Definition: EventPrincipal.h:48
edm::ConfigurationDescriptions::add
void add(std::string const &label, ParameterSetDescription const &psetDescription)
Definition: ConfigurationDescriptions.cc:57
evf::EvFDaqDirector::isSingleStreamThread
bool isSingleStreamThread()
Definition: EvFDaqDirector.h:122
FEDNumbering::MAXTCDSuTCAFEDID
Definition: FEDNumbering.h:102
evf::FastMonState::inWaitInput
Definition: FastMonitoringService.h:86
evf::FastMonState::inNewLumi
Definition: FastMonitoringService.h:87
evf::EvFDaqDirector::inputThrottled
bool inputThrottled()
Definition: EvFDaqDirector.cc:2060
FRDHeaderMaxVersion
constexpr size_t FRDHeaderMaxVersion
Definition: FRDEventMessage.h:128
evf::EvFDaqDirector::parseFRDFileHeader
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
Definition: EvFDaqDirector.cc:966
TCDSRaw.h
FedRawDataInputSource::filesToDelete_
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
Definition: FedRawDataInputSource.h:165
EventID.h
FedRawDataInputSource::getLSFromFilename_
const bool getLSFromFilename_
Definition: FedRawDataInputSource.h:97
FedRawDataInputSource::ReaderInfo
std::pair< InputFile *, InputChunk * > ReaderInfo
Definition: FedRawDataInputSource.h:138
InputChunk
Definition: FedRawDataInputSource.h:187
FedRawDataInputSource::FedRawDataInputSource
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
Definition: FedRawDataInputSource.cc:53
mps_fire.end
end
Definition: mps_fire.py:242
FedRawDataInputSource::fileListLoopMode_
const bool fileListLoopMode_
Definition: FedRawDataInputSource.h:108
edm::InputSource::luminosityBlockAuxiliary
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:233
InputFile::currentChunk_
unsigned int currentChunk_
Definition: FedRawDataInputSource.h:228
FedRawDataInputSource::daqDirector_
evf::EvFDaqDirector * daqDirector_
Definition: FedRawDataInputSource.h:84
evf::FastMonState::inSupFileLimit
Definition: FastMonitoringService.h:103
tcds::Raw_v1
Definition: TCDSRaw.h:106
svgfig.load
def load(fileName)
Definition: svgfig.py:547
dt4ml_dqm_sourceclient-live_cfg.filePath
filePath
CUSTOMIZE FOR ML.
Definition: dt4ml_dqm_sourceclient-live_cfg.py:45
evf::FastMonState::inChecksumEvent
Definition: FastMonitoringService.h:94
evf::EvFDaqDirector::noFile
Definition: EvFDaqDirector.h:64
FEDNumbering::MINTCDSuTCAFEDID
Definition: FEDNumbering.h:101
FedRawDataInputSource::fileQueue_
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
Definition: FedRawDataInputSource.h:152
evf::EvFDaqDirector::useFileBroker
bool useFileBroker() const
Definition: EvFDaqDirector.h:79
InputChunk::readComplete_
std::atomic< bool > readComplete_
Definition: FedRawDataInputSource.h:195
b
double b
Definition: hdecay.h:118
edm::EventPrincipal::put
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
Definition: EventPrincipal.cc:185
FedRawDataInputSource::reportEventsThisLumiInSource
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
Definition: FedRawDataInputSource.cc:1520
evf::FastMonState::inSupBusy
Definition: FastMonitoringService.h:108
FedRawDataInputSource::processHistoryID_
edm::ProcessHistoryID processHistoryID_
Definition: FedRawDataInputSource.h:119
InputSourceDescription.h
first
auto first
Definition: CAHitNtupletGeneratorKernelsImpl.h:125
InputFile::~InputFile
~InputFile()
Definition: FedRawDataInputSource.cc:1426
edm::EventAuxiliary
Definition: EventAuxiliary.h:14
edm::ConfigurationDescriptions
Definition: ConfigurationDescriptions.h:28
FedRawDataInputSource::startedSupervisorThread_
bool startedSupervisorThread_
Definition: FedRawDataInputSource.h:144
FedRawDataInputSource::exceptionState
bool exceptionState()
Definition: FedRawDataInputSource.h:68
FedRawDataInputSource::fillDescriptions
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Definition: FedRawDataInputSource.cc:206
FedRawDataInputSource::daqProvenanceHelper_
const edm::DaqProvenanceHelper daqProvenanceHelper_
Definition: FedRawDataInputSource.h:114
geometryDiff.file
file
Definition: geometryDiff.py:13
crc32c
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
edm::InputSource::runAuxiliary
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:230
FedRawDataInputSource::initFileList
long initFileList()
Definition: FedRawDataInputSource.cc:1541
FedRawDataInputSource::L1EventID_
uint32_t L1EventID_
Definition: FedRawDataInputSource.h:124
UnixSignalHandlers.h
cppFunctionSkipper.exception
exception
Definition: cppFunctionSkipper.py:10
funct::true
true
Definition: Factorize.h:173
evf::FastMonState::inSupNewFile
Definition: FastMonitoringService.h:112
InputChunk::fileIndex_
unsigned int fileIndex_
Definition: FedRawDataInputSource.h:194
reader.h
LogDebug
#define LogDebug(id)
Definition: MessageLogger.h:233
edm::InputSource::setRunAuxiliary
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:333
edm::ParameterSet
Definition: ParameterSet.h:47
FedRawDataInputSource::currentFileIndex_
int currentFileIndex_
Definition: FedRawDataInputSource.h:164
FEDTrailer
Definition: FEDTrailer.h:14
a
double a
Definition: hdecay.h:119
evf::FastMonState::inSupLockPolling
Definition: FastMonitoringService.h:109
Timestamp.h
FedRawDataInputSource::alwaysStartFromFirstLS_
const bool alwaysStartFromFirstLS_
Definition: FedRawDataInputSource.h:98
FFFNamingSchema.h
Event.h
edm::InputSource::productRegistryUpdate
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:330
FedRawDataInputSource::eventChunkBlock_
unsigned int eventChunkBlock_
Definition: FedRawDataInputSource.h:89
FEDHeader::triggerType
uint8_t triggerType() const
Event Trigger type identifier.
Definition: FEDHeader.cc:13
CommonMethods.lock
def lock()
Definition: CommonMethods.py:81
edm::DaqProvenanceHelper::daqInit
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
Definition: DaqProvenanceHelper.cc:83
evf::EvFDaqDirector::getNextFromFileBroker
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
Definition: EvFDaqDirector.cc:1718
FedRawDataInputSource::verifyChecksum_
const bool verifyChecksum_
Definition: FedRawDataInputSource.h:99
mps_check.timeout
int timeout
Definition: mps_check.py:53
evf::FastMonState::inReadEvent
Definition: FastMonitoringService.h:96
SiStripCommissioningSource_FromEDM_cfg.EvFDaqDirector
EvFDaqDirector
Definition: SiStripCommissioningSource_FromEDM_cfg.py:57
evf::EvFDaqDirector::unlockFULocal
void unlockFULocal()
Definition: EvFDaqDirector.cc:919
jetUpdater_cfi.sort
sort
Definition: jetUpdater_cfi.py:29
edm::shutdown_flag
volatile std::atomic< bool > shutdown_flag
Definition: UnixSignalHandlers.cc:22
FedRawDataInputSource::checkNext
Next checkNext() override
Definition: FedRawDataInputSource.cc:231
FedRawDataInputSource::fillFEDRawDataCollection
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &rawData, bool &tcdsInRange)
Definition: FedRawDataInputSource.cc:689
printConversionInfo.aux
aux
Definition: printConversionInfo.py:19
edm::Service
Definition: Service.h:30
createfilelist.int
int
Definition: createfilelist.py:10
FedRawDataInputSource::currentFile_
std::unique_ptr< InputFile > currentFile_
Definition: FedRawDataInputSource.h:141
FedRawDataInputSource::setExceptionState_
bool setExceptionState_
Definition: FedRawDataInputSource.h:160
FastMonitoringService.h
edm::InputSource::luminosityBlock
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:462
FedRawDataInputSource::eventID_
edm::EventID eventID_
Definition: FedRawDataInputSource.h:118
edm::RawInputSource::Next::kStop
FEDRawDataCollection.h
evf::FastMonState::inSupWaitFreeChunk
Definition: FastMonitoringService.h:104
FedRawDataInputSource::mReader_
std::mutex mReader_
Definition: FedRawDataInputSource.h:154
trackerHitRTTI::vector
Definition: trackerHitRTTI.h:21
FedRawDataInputSource::eventsThisLumi_
unsigned int eventsThisLumi_
Definition: FedRawDataInputSource.h:126
crc32c.h
FedRawDataInputSource::~FedRawDataInputSource
~FedRawDataInputSource() override
Definition: FedRawDataInputSource.cc:175
FEDHeader::length
static const uint32_t length
Definition: FEDHeader.h:54
evf::EvFDaqDirector::getEoRFilePathOnFU
std::string getEoRFilePathOnFU() const
Definition: EvFDaqDirector.cc:491
edm::EventPrincipal::streamID
StreamID streamID() const
Definition: EventPrincipal.h:110
l1tstage2_dqm_sourceclient-live_cfg.fedId
fedId
Definition: l1tstage2_dqm_sourceclient-live_cfg.py:89
evf::EvFDaqDirector::newLumi
Definition: EvFDaqDirector.h:64
edm::LogError
Log< level::Error, false > LogError
Definition: MessageLogger.h:123
FedRawDataInputSource::getFile
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
Definition: FedRawDataInputSource.cc:1576
InputFile::rawFd_
int rawFd_
Definition: FedRawDataInputSource.h:217
FedRawDataInputSource::startupLock_
std::mutex startupLock_
Definition: FedRawDataInputSource.h:161
visDQMUpload.buf
buf
Definition: visDQMUpload.py:160
AlCaHLTBitMon_QueryRunRegistry.string
string string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
tcds
Definition: TCDSRaw.h:16
FedRawDataInputSource.h
evf::FastMonitoringService::reportLockWait
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
Definition: FastMonitoringService.cc:720
evf::evtn::evm_board_sense
bool evm_board_sense(const unsigned char *p, size_t size)
Definition: GlobalEventNumber.cc:15
FedRawDataInputSource::thread_quit_signal
std::vector< unsigned int > thread_quit_signal
Definition: FedRawDataInputSource.h:159
InputSourceMacros.h
InputFile::advance
bool advance(unsigned char *&dataPosition, const size_t size)
Definition: FedRawDataInputSource.cc:1372
evf::FastMonState::inSupNoFile
Definition: FastMonitoringService.h:111
FedRawDataInputSource::useFileBroker_
bool useFileBroker_
Definition: FedRawDataInputSource.h:103
edm::InputSource::setLuminosityBlockAuxiliary
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:337
evf::FastMonitoringService::setInState
void setInState(FastMonState::InputState inputState)
Definition: FastMonitoringService.h:220
DataPointDefinition.h
evf::FastMonState
Definition: FastMonitoringService.h:51
edm::RawInputSource::Next
Next
Definition: RawInputSource.h:24
edm::Timestamp::invalidTimestamp
static Timestamp invalidTimestamp()
Definition: Timestamp.h:82
edm::EventAuxiliary::PhysicsTrigger
Definition: EventAuxiliary.h:20
FedRawDataInputSource::readSupervisor
void readSupervisor()
Definition: FedRawDataInputSource.cc:752
submitPVResolutionJobs.desc
string desc
Definition: submitPVResolutionJobs.py:251
eostools.move
def move(src, dest)
Definition: eostools.py:511
std
Definition: JetResolutionObject.h:76
evf::EvFDaqDirector::createProcessingNotificationMaybe
void createProcessingNotificationMaybe() const
Definition: EvFDaqDirector.cc:2046
crc32c_hw_test
bool crc32c_hw_test()
Definition: crc32c.cc:354
dqmiodatasetharvest.inf
inf
Definition: dqmiodatasetharvest.py:38
init
Definition: init.py:1
RunInfoPI::state
state
Definition: RunInfoPayloadInspectoHelper.h:16
InputFile::chunks_
tbb::concurrent_vector< InputChunk * > chunks_
Definition: FedRawDataInputSource.h:224
edm::RawInputSource::makeEvent
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
Definition: RawInputSource.cc:51
edm::DaqProvenanceHelper::branchDescription
BranchDescription const & branchDescription() const
Definition: DaqProvenanceHelper.h:46
FedRawDataInputSource::event_
std::unique_ptr< FRDEventMsgView > event_
Definition: FedRawDataInputSource.h:116
edm::InputSource::setNewRun
void setNewRun()
Definition: InputSource.h:355
FEDTrailer::length
static const uint32_t length
Definition: FEDTrailer.h:57
FedRawDataInputSource::nextEvent
evf::EvFDaqDirector::FileStatus nextEvent()
Definition: FedRawDataInputSource.cc:343
FedRawDataInputSource::detectedFRDversion_
uint16_t detectedFRDversion_
Definition: FedRawDataInputSource.h:140
relativeConstraints.empty
bool empty
Definition: relativeConstraints.py:46
GlobalEventNumber.h
EventAuxiliary.h
FedRawDataInputSource::tid_active_
std::vector< unsigned int > tid_active_
Definition: FedRawDataInputSource.h:156
Exception
Definition: hltDiff.cc:245
FedRawDataInputSource::checkEvery_
unsigned int checkEvery_
Definition: FedRawDataInputSource.h:170
evf::FastMonState::inProcessingFile
Definition: FastMonitoringService.h:91
FedRawDataInputSource::setMonStateSup
void setMonStateSup(evf::FastMonState::InputState state)
Definition: FedRawDataInputSource.cc:1367
FEDRawData::resize
void resize(size_t newsize)
Definition: FEDRawData.cc:28
evf::EvFDaqDirector::setFMS
void setFMS(evf::FastMonitoringService *fms)
Definition: EvFDaqDirector.h:121
MatrixUtil.remove
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:219
FedRawDataInputSource::fileDescriptor_
int fileDescriptor_
Definition: FedRawDataInputSource.h:178
evf
Definition: fillJson.h:27
edm::InputSource::setEventCached
void setEventCached()
Marks the current event as cached (sets the eventCached_ flag).
Definition: InputSource.h:362
evf::FastMonitoringService::startedLookingForFile
void startedLookingForFile()
Definition: FastMonitoringService.cc:684
evf::FastMonState::inCachedEvent
Definition: FastMonitoringService.h:95
evf::FastMonState::inSupNewFileWaitThreadCopying
Definition: FastMonitoringService.h:113
evf::FastMonitoringService::setExceptionDetected
void setExceptionDetected(unsigned int ls)
Definition: FastMonitoringService.cc:419
evf::EvFDaqDirector::grabNextJsonFileAndUnlock
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
Definition: EvFDaqDirector.cc:1363
edm::InputSource::resetLuminosityBlockAuxiliary
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:345
evf::FastMonState::inRunEnd
Definition: FastMonitoringService.h:90
FEDHeader::sourceID
uint16_t sourceID() const
Identifier of the FED.
Definition: FEDHeader.cc:19
FedRawDataInputSource::startupCv_
std::condition_variable startupCv_
Definition: FedRawDataInputSource.h:162
FedRawDataInputSource::tcds_pointer_
unsigned char * tcds_pointer_
Definition: FedRawDataInputSource.h:125
cond::uint64_t
unsigned long long uint64_t
Definition: Time.h:13
funct::pow
Power< A, B >::type pow(const A &a, const B &b)
Definition: Power.h:29
FedRawDataInputSource::fileListMode_
const bool fileListMode_
Definition: FedRawDataInputSource.h:106
cms::Exception
Definition: Exception.h:70
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
FedRawDataInputSource::InputChunk
friend struct InputChunk
Definition: FedRawDataInputSource.h:42
evf::FastMonState::inSupWaitFreeChunkCopying
Definition: FastMonitoringService.h:105
FedRawDataInputSource::readSupervisorThread_
std::unique_ptr< std::thread > readSupervisorThread_
Definition: FedRawDataInputSource.h:145
command_line.start
start
Definition: command_line.py:167
ParameterSet.h
edm::InputSource::processGUID
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:195
dqmiolumiharvest.j
j
Definition: dqmiolumiharvest.py:66
runEdmFileComparison.skipped
skipped
Definition: runEdmFileComparison.py:224
edm_modernize_messagelogger.stat
stat
Definition: edm_modernize_messagelogger.py:27
edm::RawInputSource::Next::kEvent
evf::evtn::gtpe_get
unsigned int gtpe_get(const unsigned char *)
Definition: GlobalEventNumber.cc:83
event
Definition: event.py:1
edm::EventID
Definition: EventID.h:31
evf::FastMonitoringService::stoppedLookingForFile
void stoppedLookingForFile(unsigned int lumi)
Definition: FastMonitoringService.cc:692
hltrates_dqm_sourceclient-live_cfg.offset
offset
Definition: hltrates_dqm_sourceclient-live_cfg.py:82
FEDHeader
Definition: FEDHeader.h:14
InputFile::waitForChunk
bool waitForChunk(unsigned int chunkid)
Definition: FedRawDataInputSource.h:258
FedRawDataInputSource::MAXTCDSuTCAFEDID_
uint16_t MAXTCDSuTCAFEDID_
Definition: FedRawDataInputSource.h:130
FEDHeader.h
lumi
Definition: LumiSectionData.h:20
InputFile::rewindChunk
void rewindChunk(const size_t size)
Definition: FedRawDataInputSource.cc:1421
edm::Log
Definition: MessageLogger.h:70
evf::EvFDaqDirector::FileStatus
FileStatus
Definition: EvFDaqDirector.h:64
evf::FastMonitoringService::setInStateSup
void setInStateSup(FastMonState::InputState inputState)
Definition: FastMonitoringService.h:221
evf::EvFDaqDirector::lockFULocal2
void lockFULocal2()
Definition: EvFDaqDirector.cc:924
FedRawDataInputSource::monlock_
std::mutex monlock_
Definition: FedRawDataInputSource.h:184
InputFile::deleteFile_
bool deleteFile_
Definition: FedRawDataInputSource.h:216
FedRawDataInputSource::read
void read(edm::EventPrincipal &eventPrincipal) override
Definition: FedRawDataInputSource.cc:613
sistrip::runNumber_
static const char runNumber_[]
Definition: ConstantsForDqm.h:33
FedRawDataInputSource::numBuffers_
unsigned int numBuffers_
Definition: FedRawDataInputSource.h:91
evf::evtn::getgpshigh
unsigned int getgpshigh(const unsigned char *)
Definition: GlobalEventNumber.cc:95
FedRawDataInputSource::maxBufferedFiles_
unsigned int maxBufferedFiles_
Definition: FedRawDataInputSource.h:92
edm::TimeValue_t
unsigned long long TimeValue_t
Definition: Timestamp.h:28
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
FedRawDataInputSource::singleBufferMode_
bool singleBufferMode_
Definition: FedRawDataInputSource.h:177
FedRawDataInputSource::readWorker
void readWorker(unsigned int tid)
Definition: FedRawDataInputSource.cc:1215
FedRawDataInputSource::loopModeIterationInc_
unsigned int loopModeIterationInc_
Definition: FedRawDataInputSource.h:109
evf::EvFDaqDirector::newFile
Definition: EvFDaqDirector.h:64
edm::RunAuxiliary
Definition: RunAuxiliary.h:15
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
edm::InputSource::run
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:457
evf::EvFDaqDirector::grabNextJsonFromRaw
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
Definition: EvFDaqDirector.cc:1101
FEDTrailer::fragmentLength
uint32_t fragmentLength() const
The length of the event fragment counted in 64-bit words including header and trailer.
Definition: FEDTrailer.cc:13
FedRawDataInputSource::GTPEventID_
uint32_t GTPEventID_
Definition: FedRawDataInputSource.h:123
evf::EvFDaqDirector::setDeleteTracking
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
Definition: EvFDaqDirector.h:176
edm::DaqProvenanceHelper::dummyProvenance
ProductProvenance const & dummyProvenance() const
Definition: DaqProvenanceHelper.h:48
FEDTrailer.h
evf::FastMonState::InputState
InputState
Definition: FastMonitoringService.h:83
edm::Timestamp
Definition: Timestamp.h:30