CMS 3D CMS Logo

FedRawDataInputSource.cc
Go to the documentation of this file.
1 #include <fcntl.h>
2 #include <iomanip>
3 #include <iostream>
4 #include <memory>
5 #include <sstream>
6 #include <sys/types.h>
7 #include <sys/file.h>
8 #include <sys/time.h>
9 #include <unistd.h>
10 #include <vector>
11 #include <fstream>
12 #include <zlib.h>
13 #include <cstdio>
14 #include <chrono>
15 
16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
18 
23 
25 
32 
34 
36 
40 
42 
47 
48 //JSON file reader
50 
51 #include <boost/lexical_cast.hpp>
52 
54  : edm::RawInputSource(pset, desc),
55  defPath_(pset.getUntrackedParameter<std::string>("buDefPath", "")),
56  eventChunkSize_(pset.getUntrackedParameter<unsigned int>("eventChunkSize", 32) * 1048576),
57  eventChunkBlock_(pset.getUntrackedParameter<unsigned int>("eventChunkBlock", 32) * 1048576),
58  numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers", 2)),
59  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles", 2)),
60  getLSFromFilename_(pset.getUntrackedParameter<bool>("getLSFromFilename", true)),
61  alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
62  verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum", true)),
63  useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID", false)),
64  fileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>())),
65  fileListMode_(pset.getUntrackedParameter<bool>("fileListMode", false)),
66  fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
68  daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
69  eventID_(),
70  processHistoryID_(),
71  currentLumiSection_(0),
72  tcds_pointer_(nullptr),
73  eventsThisLumi_(0) {
74  char thishost[256];
75  gethostname(thishost, 255);
76  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: " << std::endl
77  << (eventChunkSize_ / 1048576) << " MB on host " << thishost;
78 
79  long autoRunNumber = -1;
80  if (fileListMode_) {
81  autoRunNumber = initFileList();
82  if (!fileListLoopMode_) {
83  if (autoRunNumber < 0)
84  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
85  //override run number
86  runNumber_ = (edm::RunNumber_t)autoRunNumber;
87  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
88  }
89  }
90 
92  setNewRun();
93  //todo:autodetect from file name (assert if names differ)
95 
96  //make sure that chunk size is N * block size
101 
102  if (!numBuffers_)
103  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource")
104  << "no reading enabled with numBuffers parameter 0";
105 
108  readingFilesCount_ = 0;
109 
110  if (!crc32c_hw_test())
111  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
112 
113  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
114  if (fileListMode_) {
115  try {
116  fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::MicroStateService>().operator->());
117  } catch (cms::Exception const&) {
118  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
119  }
120  } else {
121  fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::MicroStateService>().operator->());
122  if (!fms_) {
123  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
124  }
125  }
126 
128  if (!daqDirector_)
129  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
130 
132  if (useFileBroker_)
133  edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
134  //set DaqDirector to delete files in preGlobalEndLumi callback
136  if (fms_) {
138  fms_->setInputSource(this);
141  }
142  //should delete chunks when run stops
143  for (unsigned int i = 0; i < numBuffers_; i++) {
145  }
146 
147  quit_threads_ = false;
148 
149  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
150  std::unique_lock<std::mutex> lk(startupLock_);
151  //issue a memory fence here and in threads (constructor was segfaulting without this)
152  thread_quit_signal.push_back(false);
153  workerJob_.push_back(ReaderInfo(nullptr, nullptr));
154  cvReader_.push_back(new std::condition_variable);
155  tid_active_.push_back(0);
156  threadInit_.store(false, std::memory_order_release);
157  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
158  startupCv_.wait(lk);
159  }
160 
161  runAuxiliary()->setProcessHistoryID(processHistoryID_);
162 }
163 
165  quit_threads_ = true;
166 
167  //delete any remaining open files
168  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
169  std::string fileToDelete = it->second->fileName_;
170  it->second.release();
171  }
173  readSupervisorThread_->join();
174  } else {
175  //join aux threads in case the supervisor thread was not started
176  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
177  std::unique_lock<std::mutex> lk(mReader_);
178  thread_quit_signal[i] = true;
179  cvReader_[i]->notify_one();
180  lk.unlock();
181  workerThreads_[i]->join();
182  delete workerThreads_[i];
183  }
184  }
185  for (unsigned int i = 0; i < numConcurrentReads_; i++)
186  delete cvReader_[i];
187  /*
188  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
189  InputChunk *ch;
190  while (!freeChunks_.try_pop(ch)) {}
191  delete ch;
192  }
193  */
194 }
195 
198  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
199  desc.addUntracked<unsigned int>("eventChunkSize", 32)->setComment("Input buffer (chunk) size");
200  desc.addUntracked<unsigned int>("eventChunkBlock", 32)
201  ->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
202  desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
203  desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
204  ->setComment("Maximum number of simultaneously buffered raw files");
205  desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
206  ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
207  desc.addUntracked<bool>("verifyChecksum", true)
208  ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
209  desc.addUntracked<bool>("useL1EventID", false)
210  ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
211  desc.addUntracked<bool>("fileListMode", false)
212  ->setComment("Use fileNames parameter to directly specify raw files to open");
213  desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
214  ->setComment("file list used when fileListMode is enabled");
215  desc.setAllowAnything();
216  descriptions.add("source", desc);
217 }
218 
221  //this thread opens new files and dispatches reading to worker readers
222  //threadInit_.store(false,std::memory_order_release);
223  std::unique_lock<std::mutex> lk(startupLock_);
224  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor, this));
226  startupCv_.wait(lk);
227  }
228  //signal hltd to start event accounting
229  if (!currentLumiSection_)
231  if (fms_)
233  switch (nextEvent()) {
235  //maybe create EoL file in working directory before ending run
236  struct stat buf;
237  if (!useFileBroker_ && currentLumiSection_ > 0) {
238  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
239  if (eolFound) {
241  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
242  if (!found) {
244  int eol_fd =
245  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
246  close(eol_fd);
248  }
249  }
250  }
251  //also create EoR file in FU data directory
252  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
253  if (!eorFound) {
254  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
255  O_RDWR | O_CREAT,
256  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
257  close(eor_fd);
258  }
260  eventsThisLumi_ = 0;
262  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
263  return Next::kStop;
264  }
266  //this is not reachable
267  return Next::kEvent;
268  }
270  //std::cout << "--------------NEW LUMI---------------" << std::endl;
271  return Next::kEvent;
272  }
273  default: {
274  if (!getLSFromFilename_) {
275  //get new lumi from file header
276  if (event_->lumi() > currentLumiSection_) {
278  eventsThisLumi_ = 0;
280  }
281  }
284  else
285  eventRunNumber_ = event_->run();
286  L1EventID_ = event_->event();
287 
288  setEventCached();
289 
290  return Next::kEvent;
291  }
292  }
293 }
294 
295 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
296  if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
297  if (!useFileBroker_) {
298  if (currentLumiSection_ > 0) {
300  struct stat buf;
301  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
302  if (!found) {
304  int eol_fd =
305  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
306  close(eol_fd);
307  daqDirector_->createBoLSFile(lumiSection, false);
309  }
310  } else
311  daqDirector_->createBoLSFile(lumiSection, true); //needed for initial lumisection
312  }
313 
314  currentLumiSection_ = lumiSection;
315 
317 
318  timeval tv;
319  gettimeofday(&tv, nullptr);
320  const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
321 
323  runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
324 
325  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
326  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
327 
328  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: " << lumiSection;
329  }
330 }
331 
335  if (edm::shutdown_flag.load(std::memory_order_relaxed))
336  break;
337  }
338  return status;
339 }
340 
342  if (setExceptionState_)
343  threadError();
344  if (!currentFile_.get()) {
346  if (fms_)
348  if (!fileQueue_.try_pop(currentFile_)) {
349  //sleep until wakeup (only in single-buffer mode) or timeout
350  std::unique_lock<std::mutex> lkw(mWakeup_);
351  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
353  }
354  status = currentFile_->status_;
356  if (fms_)
358  currentFile_.release();
359  return status;
360  } else if (status == evf::EvFDaqDirector::runAbort) {
361  throw cms::Exception("FedRawDataInputSource::getNextEvent")
362  << "Run has been aborted by the input source reader thread";
363  } else if (status == evf::EvFDaqDirector::newLumi) {
364  if (fms_)
366  if (getLSFromFilename_) {
367  if (currentFile_->lumi_ > currentLumiSection_) {
369  eventsThisLumi_ = 0;
371  }
372  } else { //let this be picked up from next event
374  }
375 
376  currentFile_.release();
377  return status;
378  } else if (status == evf::EvFDaqDirector::newFile) {
380  } else
381  assert(false);
382  }
383  if (fms_)
385 
386  //file is empty
387  if (!currentFile_->fileSize_) {
389  //try to open new lumi
390  assert(currentFile_->nChunks_ == 0);
391  if (getLSFromFilename_)
392  if (currentFile_->lumi_ > currentLumiSection_) {
394  eventsThisLumi_ = 0;
396  }
397  //immediately delete empty file
398  std::string currentName = currentFile_->fileName_;
399  currentFile_.release();
401  }
402 
403  //file is finished
404  if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
406  //release last chunk (it is never released elsewhere)
407  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
408  if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
409  throw cms::Exception("FedRawDataInputSource::getNextEvent")
410  << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
411  << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
412  }
413  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
414  if (singleBufferMode_) {
415  std::unique_lock<std::mutex> lkw(mWakeup_);
416  cvWakeup_.notify_one();
417  }
418  bufferInputRead_ = 0;
420  //put the file in pending delete list;
421  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
422  filesToDelete_.push_back(std::pair<int, std::unique_ptr<InputFile>>(currentFileIndex_, std::move(currentFile_)));
423  } else {
424  //in single-thread and stream jobs, events are already processed
425  std::string currentName = currentFile_->fileName_;
426  currentFile_.release();
427  }
429  }
430 
431  //handle RAW file header
432  if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
433  if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
434  if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
435  throw cms::Exception("FedRawDataInputSource::getNextEvent")
436  << "Premature end of input file while reading file header";
437 
438  edm::LogWarning("FedRawDataInputSource")
439  << "File with only raw header and no events received in LS " << currentFile_->lumi_;
440  if (getLSFromFilename_)
441  if (currentFile_->lumi_ > currentLumiSection_) {
443  eventsThisLumi_ = 0;
445  }
446  }
447 
448  //advance buffer position to skip file header (chunk will be acquired later)
449  currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
450  currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
451  }
452 
453  //file is too short
454  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
455  throw cms::Exception("FedRawDataInputSource::getNextEvent")
456  << "Premature end of input file while reading event header";
457  }
458  if (singleBufferMode_) {
459  //should already be there
460  if (fms_)
462  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
463  usleep(10000);
464  if (currentFile_->parent_->exceptionState() || setExceptionState_)
465  currentFile_->parent_->threadError();
466  }
467  if (fms_)
469 
470  unsigned char* dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
471 
472  //conditions when read amount is not sufficient for the header to fit
476 
477  if (detectedFRDversion_ == 0) {
478  detectedFRDversion_ = *((uint32*)dataPosition);
479  if (detectedFRDversion_ > 5)
480  throw cms::Exception("FedRawDataInputSource::getNextEvent")
481  << "Unknown FRD version -: " << detectedFRDversion_;
483  }
484 
485  //recalculate chunk position
486  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
488  throw cms::Exception("FedRawDataInputSource::getNextEvent")
489  << "Premature end of input file while reading event header";
490  }
491  }
492 
493  event_.reset(new FRDEventMsgView(dataPosition));
494  if (event_->size() > eventChunkSize_) {
495  throw cms::Exception("FedRawDataInputSource::getNextEvent")
496  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
497  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
498  << " bytes";
499  }
500 
501  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
502 
503  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
504  throw cms::Exception("FedRawDataInputSource::getNextEvent")
505  << "Premature end of input file while reading event data";
506  }
507  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
509  //recalculate chunk position
510  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
511  event_.reset(new FRDEventMsgView(dataPosition));
512  }
513  currentFile_->bufferPosition_ += event_->size();
514  currentFile_->chunkPosition_ += event_->size();
515  //last chunk is released when this function is invoked next time
516 
517  }
518  //multibuffer mode:
519  else {
520  //wait for the current chunk to become added to the vector
521  if (fms_)
523  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
524  usleep(10000);
525  if (setExceptionState_)
526  threadError();
527  }
528  if (fms_)
530 
531  //check if header is at the boundary of two chunks
532  chunkIsFree_ = false;
533  unsigned char* dataPosition;
534 
535  //read header, copy it to a single chunk if necessary
536  bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
537 
538  event_.reset(new FRDEventMsgView(dataPosition));
539  if (event_->size() > eventChunkSize_) {
540  throw cms::Exception("FedRawDataInputSource::getNextEvent")
541  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
542  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
543  << " bytes";
544  }
545 
546  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
547 
548  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
549  throw cms::Exception("FedRawDataInputSource::getNextEvent")
550  << "Premature end of input file while reading event data";
551  }
552 
553  if (chunkEnd) {
554  //header was at the chunk boundary, we will have to move payload as well
555  currentFile_->moveToPreviousChunk(msgSize, FRDHeaderVersionSize[detectedFRDversion_]);
556  chunkIsFree_ = true;
557  } else {
558  //header was contiguous, but check if payload fits the chunk
559  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
560  //rewind to header start position
562  //copy event to a chunk start and move pointers
563  chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_] + msgSize);
564  assert(chunkEnd);
565  chunkIsFree_ = true;
566  //header is moved
567  event_.reset(new FRDEventMsgView(dataPosition));
568  } else {
569  //everything is in a single chunk, only move pointers forward
570  chunkEnd = currentFile_->advance(dataPosition, msgSize);
571  assert(!chunkEnd);
572  chunkIsFree_ = false;
573  }
574  }
575  } //end multibuffer mode
576  if (fms_)
578 
579  if (verifyChecksum_ && event_->version() >= 5) {
580  uint32_t crc = 0;
581  crc = crc32c(crc, (const unsigned char*)event_->payload(), event_->eventSize());
582  if (crc != event_->crc32c()) {
583  if (fms_)
585  throw cms::Exception("FedRawDataInputSource::getNextEvent")
586  << "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() << " but calculated 0x"
587  << crc;
588  }
589  } else if (verifyChecksum_ && event_->version() >= 3) {
590  uint32_t adler = adler32(0L, Z_NULL, 0);
591  adler = adler32(adler, (Bytef*)event_->payload(), event_->eventSize());
592 
593  if (adler != event_->adler32()) {
594  if (fms_)
596  throw cms::Exception("FedRawDataInputSource::getNextEvent")
597  << "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() << " but calculated 0x"
598  << adler;
599  }
600  }
601  if (fms_)
603 
604  currentFile_->nProcessed_++;
605 
607 }
608 
610  if (fms_)
612  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
614 
615  if (useL1EventID_) {
618  aux.setProcessHistoryID(processHistoryID_);
619  makeEvent(eventPrincipal, aux);
620  } else if (tcds_pointer_ == nullptr) {
624  aux.setProcessHistoryID(processHistoryID_);
625  makeEvent(eventPrincipal, aux);
626  } else {
627  const FEDHeader fedHeader(tcds_pointer_);
628  tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
633  static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
634  processGUID(),
636  aux.setProcessHistoryID(processHistoryID_);
637  makeEvent(eventPrincipal, aux);
638  }
639 
640  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
641 
643 
644  eventsThisLumi_++;
645  if (fms_)
647 
648  //resize vector if needed
649  while (streamFileTracker_.size() <= eventPrincipal.streamID())
650  streamFileTracker_.push_back(-1);
651 
652  streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
653 
654  //this old file check runs no more often than every 10 events
655  if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
656  //delete files that are not in processing
657  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
658  auto it = filesToDelete_.begin();
659  while (it != filesToDelete_.end()) {
660  bool fileIsBeingProcessed = false;
661  for (unsigned int i = 0; i < nStreams_; i++) {
662  if (it->first == streamFileTracker_.at(i)) {
663  fileIsBeingProcessed = true;
664  break;
665  }
666  }
667  if (!fileIsBeingProcessed) {
668  std::string fileToDelete = it->second->fileName_;
669  //it->second.release();
670  it = filesToDelete_.erase(it);
671  } else
672  it++;
673  }
674  }
675  if (chunkIsFree_)
676  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
677  chunkIsFree_ = false;
678  if (fms_)
680  return;
681 }
682 
685  timeval stv;
686  gettimeofday(&stv, nullptr);
687  time = stv.tv_sec;
688  time = (time << 32) + stv.tv_usec;
689  edm::Timestamp tstamp(time);
690 
691  uint32_t eventSize = event_->eventSize();
692  unsigned char* event = (unsigned char*)event_->payload();
693  GTPEventID_ = 0;
694  tcds_pointer_ = nullptr;
695  while (eventSize > 0) {
696  assert(eventSize >= FEDTrailer::length);
697  eventSize -= FEDTrailer::length;
698  const FEDTrailer fedTrailer(event + eventSize);
699  const uint32_t fedSize = fedTrailer.fragmentLength() << 3; //trailer length counts in 8 bytes
700  assert(eventSize >= fedSize - FEDHeader::length);
701  eventSize -= (fedSize - FEDHeader::length);
702  const FEDHeader fedHeader(event + eventSize);
703  const uint16_t fedId = fedHeader.sourceID();
705  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
706  }
708  tcds_pointer_ = event + eventSize;
709  }
711  if (evf::evtn::evm_board_sense(event + eventSize, fedSize))
712  GTPEventID_ = evf::evtn::get(event + eventSize, true);
713  else
714  GTPEventID_ = evf::evtn::get(event + eventSize, false);
715  //evf::evtn::evm_board_setformat(fedSize);
716  const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
717  const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
718  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
719  }
720  //take event ID from GTPE FED
722  if (evf::evtn::gtpe_board_sense(event + eventSize)) {
723  GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
724  }
725  }
726  FEDRawData& fedData = rawData.FEDData(fedId);
727  fedData.resize(fedSize);
728  memcpy(fedData.data(), event + eventSize, fedSize);
729  }
730  assert(eventSize == 0);
731 
732  return tstamp;
733 }
734 
736 
738  bool stop = false;
739  unsigned int currentLumiSection = 0;
740  //threadInit_.exchange(true,std::memory_order_acquire);
741 
742  {
743  std::unique_lock<std::mutex> lk(startupLock_);
744  startupCv_.notify_one();
745  }
746 
747  uint32_t ls = 0;
748  uint32_t monLS = 1;
749  uint32_t lockCount = 0;
750  uint64_t sumLockWaitTimeUs = 0.;
751 
752  while (!stop) {
753  //wait for at least one free thread and chunk
754  int counter = 0;
755  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty() ||
757  //report state to monitoring
758  if (fms_) {
759  bool copy_active = false;
760  for (auto j : tid_active_)
761  if (j)
762  copy_active = true;
765  else if (freeChunks_.empty()) {
766  if (copy_active)
768  else
770  } else {
771  if (copy_active)
773  else
775  }
776  }
777  std::unique_lock<std::mutex> lkw(mWakeup_);
778  //sleep until woken up by condition or a timeout
779  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
780  counter++;
781  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
782  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
783  } else {
784  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
785  }
786  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
787  stop = true;
788  break;
789  }
790  }
791  //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
792 
793  if (stop)
794  break;
795 
796  //look for a new file
797  std::string nextFile;
798  uint32_t fileSizeIndex;
799  int64_t fileSizeFromMetadata;
800 
801  if (fms_) {
805  }
806 
808  uint16_t rawHeaderSize = 0;
809  uint32_t lsFromRaw = 0;
810  int32_t serverEventsInNewFile = -1;
811  int rawFd = -1;
812 
813  int backoff_exp = 0;
814 
815  //entering loop which tries to grab new file from ramdisk
817  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
818  stop = true;
819  break;
820  }
821 
822  assert(rawFd == -1);
823  uint64_t thisLockWaitTimeUs = 0.;
824  if (fileListMode_) {
825  //return LS if LS not set, otherwise return file
826  status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
829  rawFd,
830  rawHeaderSize,
831  lsFromRaw,
832  serverEventsInNewFile,
833  fileSizeFromMetadata,
834  false,
835  false,
836  false) != 0) {
837  //error
838  setExceptionState_ = true;
839  stop = true;
840  break;
841  }
842  if (!getLSFromFilename_)
843  ls = lsFromRaw;
844  }
845  } else if (!useFileBroker_)
846  status = daqDirector_->updateFuLock(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
847  else {
848  status = daqDirector_->getNextFromFileBroker(currentLumiSection,
849  ls,
850  nextFile,
851  rawFd,
852  rawHeaderSize,
853  serverEventsInNewFile,
854  fileSizeFromMetadata,
855  thisLockWaitTimeUs);
856  }
857 
858  if (fms_)
860 
861  //cycle through all remaining LS even if no files get assigned
862  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
864 
865  //monitoring of lock wait time
866  if (thisLockWaitTimeUs > 0.)
867  sumLockWaitTimeUs += thisLockWaitTimeUs;
868  lockCount++;
869  if (ls > monLS) {
870  monLS = ls;
871  if (lockCount)
872  if (fms_)
873  fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
874  lockCount = 0;
875  sumLockWaitTimeUs = 0;
876  }
877 
878  //check again for any remaining index/EoLS files after EoR file is seen
880  if (fms_)
882  usleep(100000);
883  //now all files should have appeared in ramdisk, check again if any raw files were left behind
884  status = daqDirector_->updateFuLock(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
885  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
887  }
888 
890  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runEnded));
891  fileQueue_.push(std::move(inf));
892  stop = true;
893  break;
894  }
895 
896  //error from filelocking function
898  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
899  fileQueue_.push(std::move(inf));
900  stop = true;
901  break;
902  }
903  //queue new lumisection
904  if (getLSFromFilename_) {
905  if (ls > currentLumiSection) {
906  if (!useFileBroker_) {
907  //file locking
908  //fms_->setInStateSup(evf::FastMonitoringThread::inSupNewLumi);
909  currentLumiSection = ls;
910  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
911  fileQueue_.push(std::move(inf));
912  } else {
913  //new file service
914  if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
915  if (ls < 100) {
916  //look at last LS file on disk to start from that lumisection (only within first 100 LS)
917  unsigned int lsToStart = daqDirector_->getLumisectionToStart();
918 
919  for (unsigned int nextLS = lsToStart; nextLS <= ls; nextLS++) {
920  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
921  fileQueue_.push(std::move(inf));
922  }
923  } else {
924  //start from current LS
925  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
926  fileQueue_.push(std::move(inf));
927  }
928  } else {
929  //queue all lumisections after last one seen to avoid gaps
930  for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
931  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
932  fileQueue_.push(std::move(inf));
933  }
934  }
935  currentLumiSection = ls;
936  }
937  }
938  //else
939  if (currentLumiSection > 0 && ls < currentLumiSection) {
940  edm::LogError("FedRawDataInputSource")
941  << "Got old LS (" << ls << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
942  << ". Aborting execution." << std::endl;
943  if (rawFd != -1)
944  close(rawFd);
945  rawFd = -1;
946  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
947  fileQueue_.push(std::move(inf));
948  stop = true;
949  break;
950  }
951  }
952 
953  int dbgcount = 0;
955  if (fms_)
957  dbgcount++;
958  if (!(dbgcount % 20))
959  LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
960  if (!useFileBroker_)
961  usleep(100000);
962  else {
963  backoff_exp = std::min(4, backoff_exp); // max 1.6 seconds
964  //backoff_exp=0; // disabled!
965  int sleeptime = (int)(100000. * pow(2, backoff_exp));
966  usleep(sleeptime);
967  backoff_exp++;
968  }
969  } else
970  backoff_exp = 0;
971  }
972  //end of file grab loop, parse result
974  if (fms_)
976  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
977 
978  std::string rawFile;
979  //file service will report raw extension
980  if (useFileBroker_)
981  rawFile = nextFile;
982  else {
983  boost::filesystem::path rawFilePath(nextFile);
984  rawFile = rawFilePath.replace_extension(".raw").string();
985  }
986 
987  struct stat st;
988  int stat_res = stat(rawFile.c_str(), &st);
989  if (stat_res == -1) {
990  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
991  setExceptionState_ = true;
992  break;
993  }
994  uint64_t fileSize = st.st_size;
995 
996  if (fms_) {
1000  }
1001  int eventsInNewFile;
1002  if (fileListMode_) {
1003  if (fileSize == 0)
1004  eventsInNewFile = 0;
1005  else
1006  eventsInNewFile = -1;
1007  } else {
1009  if (!useFileBroker_)
1010  eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1011  else
1012  eventsInNewFile = serverEventsInNewFile;
1013  assert(eventsInNewFile >= 0);
1014  assert((eventsInNewFile > 0) ==
1015  (fileSize > rawHeaderSize)); //file without events must be empty or contain only header
1016  }
1017 
1018  if (!singleBufferMode_) {
1019  //calculate number of needed chunks
1020  unsigned int neededChunks = fileSize / eventChunkSize_;
1021  if (fileSize % eventChunkSize_)
1022  neededChunks++;
1023 
1024  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1025  ls,
1026  rawFile,
1027  !fileListMode_,
1028  rawFd,
1029  fileSize,
1030  rawHeaderSize,
1031  neededChunks,
1032  eventsInNewFile,
1033  this));
1035  auto newInputFilePtr = newInputFile.get();
1036  fileQueue_.push(std::move(newInputFile));
1037 
1038  for (unsigned int i = 0; i < neededChunks; i++) {
1039  if (fms_) {
1040  bool copy_active = false;
1041  for (auto j : tid_active_)
1042  if (j)
1043  copy_active = true;
1044  if (copy_active)
1046  else
1048  }
1049  //get thread
1050  unsigned int newTid = 0xffffffff;
1051  while (!workerPool_.try_pop(newTid)) {
1052  usleep(100000);
1053  if (quit_threads_.load(std::memory_order_relaxed)) {
1054  stop = true;
1055  break;
1056  }
1057  }
1058 
1059  if (fms_) {
1060  bool copy_active = false;
1061  for (auto j : tid_active_)
1062  if (j)
1063  copy_active = true;
1064  if (copy_active)
1066  else
1068  }
1069  InputChunk* newChunk = nullptr;
1070  while (!freeChunks_.try_pop(newChunk)) {
1071  usleep(100000);
1072  if (quit_threads_.load(std::memory_order_relaxed)) {
1073  stop = true;
1074  break;
1075  }
1076  }
1077 
1078  if (newChunk == nullptr) {
1079  //return unused tid if we received shutdown (nullptr chunk)
1080  if (newTid != 0xffffffff)
1081  workerPool_.push(newTid);
1082  stop = true;
1083  break;
1084  }
1085  if (stop)
1086  break;
1087  if (fms_)
1089 
1090  std::unique_lock<std::mutex> lk(mReader_);
1091 
1092  unsigned int toRead = eventChunkSize_;
1093  if (i == neededChunks - 1 && fileSize % eventChunkSize_)
1094  toRead = fileSize % eventChunkSize_;
1095  newChunk->reset(i * eventChunkSize_, toRead, i);
1096 
1097  workerJob_[newTid].first = newInputFilePtr;
1098  workerJob_[newTid].second = newChunk;
1099 
1100  //wake up the worker thread
1101  cvReader_[newTid]->notify_one();
1102  }
1103  } else {
1104  if (!eventsInNewFile) {
1105  if (rawFd) {
1106  close(rawFd);
1107  rawFd = -1;
1108  }
1109  //still queue file for lumi update
1110  std::unique_lock<std::mutex> lkw(mWakeup_);
1111  //TODO: also file with only file header fits in this edge case. Check if read correctly in single buffer mode
1112  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1113  ls,
1114  rawFile,
1115  !fileListMode_,
1116  rawFd,
1117  fileSize,
1118  rawHeaderSize,
1119  (rawHeaderSize > 0),
1120  0,
1121  this));
1123  fileQueue_.push(std::move(newInputFile));
1124  cvWakeup_.notify_one();
1125  break;
1126  }
1127  //in single-buffer mode put single chunk in the file and let the main thread read the file
1128  InputChunk* newChunk;
1129  //should be available immediately
1130  while (!freeChunks_.try_pop(newChunk)) {
1131  usleep(100000);
1132  if (quit_threads_.load(std::memory_order_relaxed))
1133  break;
1134  }
1135 
1136  std::unique_lock<std::mutex> lkw(mWakeup_);
1137 
1138  unsigned int toRead = eventChunkSize_;
1139  if (fileSize % eventChunkSize_)
1140  toRead = fileSize % eventChunkSize_;
1141  newChunk->reset(0, toRead, 0);
1142  newChunk->readComplete_ = true;
1143 
1144  //push file and wakeup main thread
1145  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1146  ls,
1147  rawFile,
1148  !fileListMode_,
1149  rawFd,
1150  fileSize,
1151  rawHeaderSize,
1152  1,
1153  eventsInNewFile,
1154  this));
1155  newInputFile->chunks_[0] = newChunk;
1157  fileQueue_.push(std::move(newInputFile));
1158  cvWakeup_.notify_one();
1159  }
1160  }
1161  }
1162  if (fms_)
1164  //make sure threads finish reading
1165  unsigned numFinishedThreads = 0;
1166  while (numFinishedThreads < workerThreads_.size()) {
1167  unsigned int tid;
1168  while (!workerPool_.try_pop(tid)) {
1169  usleep(10000);
1170  }
1171  std::unique_lock<std::mutex> lk(mReader_);
1172  thread_quit_signal[tid] = true;
1173  cvReader_[tid]->notify_one();
1174  numFinishedThreads++;
1175  }
1176  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1177  workerThreads_[i]->join();
1178  delete workerThreads_[i];
1179  }
1180 }
1181 
1182 void FedRawDataInputSource::readWorker(unsigned int tid) {
1183  bool init = true;
1184  threadInit_.exchange(true, std::memory_order_acquire);
1185 
1186  while (true) {
1187  tid_active_[tid] = false;
1188  std::unique_lock<std::mutex> lk(mReader_);
1189  workerJob_[tid].first = nullptr;
1190  workerJob_[tid].first = nullptr;
1191 
1192  assert(!thread_quit_signal[tid]); //should never get it here
1193  workerPool_.push(tid);
1194 
1195  if (init) {
1196  std::unique_lock<std::mutex> lk(startupLock_);
1197  init = false;
1198  startupCv_.notify_one();
1199  }
1200  cvReader_[tid]->wait(lk);
1201 
1202  if (thread_quit_signal[tid])
1203  return;
1204  tid_active_[tid] = true;
1205 
1206  InputFile* file;
1207  InputChunk* chunk;
1208 
1209  assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1210 
1211  file = workerJob_[tid].first;
1212  chunk = workerJob_[tid].second;
1213 
1214  //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1215  unsigned int bufferLeft = (chunk->offset_ == 0 && file->rawFd_ != 0) ? file->rawHeaderSize_ : 0;
1216 
1217  //if only one worker thread exists, use single fd for all operations
1218  //if more worker threads exist, use rawFd_ for only the first read operation and then close file
1219  int fileDescriptor;
1220  bool fileOpenedHere = false;
1221 
1222  if (numConcurrentReads_ == 1) {
1223  fileDescriptor = file->rawFd_;
1224  if (fileDescriptor == -1) {
1225  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1226  fileOpenedHere = true;
1227  file->rawFd_ = fileDescriptor;
1228  }
1229  } else {
1230  if (chunk->offset_ == 0) {
1231  fileDescriptor = file->rawFd_;
1232  file->rawFd_ = -1;
1233  if (fileDescriptor == -1) {
1234  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1235  fileOpenedHere = true;
1236  }
1237  } else {
1238  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1239  fileOpenedHere = true;
1240  }
1241  }
1242 
1243  if (fileDescriptor < 0) {
1244  edm::LogError("FedRawDataInputSource") << "readWorker failed to open file -: " << file->fileName_
1245  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1246  setExceptionState_ = true;
1247  continue;
1248  }
1249  if (fileOpenedHere) { //fast forward to this chunk position
1250  off_t pos = 0;
1251  pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1252  if (pos == -1) {
1253  edm::LogError("FedRawDataInputSource")
1254  << "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor << " to offset "
1255  << chunk->offset_ << " error: " << strerror(errno);
1256  setExceptionState_ = true;
1257  continue;
1258  }
1259  }
1260 
1261  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1262  << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1263 
1264  unsigned int skipped = bufferLeft;
1266  for (unsigned int i = 0; i < readBlocks_; i++) {
1267  ssize_t last;
1268 
1269  //protect against reading into next block
1270  last = ::read(
1271  fileDescriptor, (void*)(chunk->buf_ + bufferLeft), std::min(chunk->usedSize_ - bufferLeft, eventChunkBlock_));
1272 
1273  if (last < 0) {
1274  edm::LogError("FedRawDataInputSource") << "readWorker failed to read file -: " << file->fileName_
1275  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1276  setExceptionState_ = true;
1277  break;
1278  }
1279  if (last > 0)
1280  bufferLeft += last;
1281  if (last < eventChunkBlock_) { //last read
1282  //check if this is last block, then total read size must match file size
1283  if (!(chunk->usedSize_ - skipped == i * eventChunkBlock_ + last)) {
1284  edm::LogError("FedRawDataInputSource")
1285  << "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last
1286  << " expectedChunkSize:" << chunk->usedSize_
1287  << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last) << " skipped:" << skipped
1288  << " block:" << (i + 1) << "/" << readBlocks_ << " error: " << strerror(errno);
1289  setExceptionState_ = true;
1290  }
1291  break;
1292  }
1293  }
1294  if (setExceptionState_)
1295  continue;
1296 
1298  auto diff = end - start;
1299  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1300  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1301  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1302  << " GB/s)";
1303 
1304  if (chunk->offset_ + bufferLeft == file->fileSize_) { //file reading finished using same fd
1305  close(fileDescriptor);
1306  fileDescriptor = -1;
1307  if (numConcurrentReads_ == 1)
1308  file->rawFd_ = -1;
1309  }
1310  if (numConcurrentReads_ > 1 && fileDescriptor != -1)
1311  close(fileDescriptor);
1312 
1313  //detect FRD event version. Skip file Header if it exists
1314  if (detectedFRDversion_ == 0 && chunk->offset_ == 0) {
1315  detectedFRDversion_ = *((uint32*)(chunk->buf_ + file->rawHeaderSize_));
1316  }
1318  chunk->readComplete_ =
1319  true; //this is atomic to secure the sequential buffer fill before becoming available for processing)
1320  file->chunks_[chunk->fileIndex_] = chunk; //put the completed chunk in the file chunk vector at predetermined index
1321  }
1322 }
1323 
1325  quit_threads_ = true;
1326  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1327 }
1328 
1329 inline bool InputFile::advance(unsigned char*& dataPosition, const size_t size) {
1330  //wait for chunk
1331  while (!waitForChunk(currentChunk_)) {
1332  usleep(100000);
1333  if (parent_->exceptionState())
1334  parent_->threadError();
1335  }
1336 
1337  dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
1338  size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1339 
1340  if (currentLeft < size) {
1341  //we need next chunk
1342  while (!waitForChunk(currentChunk_ + 1)) {
1343  usleep(100000);
1344  if (parent_->exceptionState())
1345  parent_->threadError();
1346  }
1347  //copy everything to beginning of the first chunk
1348  dataPosition -= chunkPosition_;
1349  assert(dataPosition == chunks_[currentChunk_]->buf_);
1350  memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1351  memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1352  //set pointers at the end of the old data position
1353  bufferPosition_ += size;
1354  chunkPosition_ = size - currentLeft;
1355  currentChunk_++;
1356  return true;
1357  } else {
1358  chunkPosition_ += size;
1359  bufferPosition_ += size;
1360  return false;
1361  }
1362 }
1363 
1364 inline void InputFile::moveToPreviousChunk(const size_t size, const size_t offset) {
1365  //this will fail in case of events that are too large
1367  assert(size - offset < chunks_[currentChunk_]->size_);
1368  memcpy(chunks_[currentChunk_ - 1]->buf_ + offset, chunks_[currentChunk_]->buf_ + chunkPosition_, size);
1369  chunkPosition_ += size;
1370  bufferPosition_ += size;
1371 }
1372 
1373 inline void InputFile::rewindChunk(const size_t size) {
1374  chunkPosition_ -= size;
1375  bufferPosition_ -= size;
1376 }
1377 
1379  if (rawFd_ != -1)
1380  close(rawFd_);
1381 
1382  if (deleteFile_ && !fileName_.empty()) {
1384  try {
1385  //sometimes this fails but file gets deleted
1386  LogDebug("FedRawDataInputSource:InputFile") << "Deleting input file -:" << fileName_;
1388  return;
1389  } catch (const boost::filesystem::filesystem_error& ex) {
1390  edm::LogError("FedRawDataInputSource:InputFile")
1391  << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what() << ". Trying again.";
1392  } catch (std::exception& ex) {
1393  edm::LogError("FedRawDataInputSource:InputFile")
1394  << " - deleteFile std::exception CAUGHT -: " << ex.what() << ". Trying again.";
1395  }
1397  }
1398 }
1399 
1400 //single-buffer mode file reading
1402  uint32_t existingSize = 0;
1403 
1404  if (fileDescriptor_ < 0) {
1405  bufferInputRead_ = 0;
1406  if (file->rawFd_ == -1)
1407  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1408  else {
1409  fileDescriptor_ = file->rawFd_;
1410  //skip header size in destination buffer (chunk position was already adjusted)
1411  bufferInputRead_ += file->rawHeaderSize_;
1412  existingSize += file->rawHeaderSize_;
1413  }
1414  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1415  if (fileDescriptor_ >= 0)
1416  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1417  else {
1418  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer")
1419  << "failed to open file " << std::endl
1420  << file->fileName_ << " fd:" << fileDescriptor_;
1421  }
1422  }
1423 
1424  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1425  for (unsigned int i = 0; i < readBlocks_; i++) {
1426  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1428  existingSize += last;
1429  }
1430  } else {
1431  const uint32_t chunksize = file->chunkPosition_;
1432  const uint32_t blockcount = chunksize / eventChunkBlock_;
1433  const uint32_t leftsize = chunksize % eventChunkBlock_;
1434  uint32_t existingSizeLeft = eventChunkSize_ - file->chunkPosition_;
1435  memmove((void*)file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSizeLeft);
1436 
1437  for (uint32_t i = 0; i < blockcount; i++) {
1438  const ssize_t last =
1439  ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), eventChunkBlock_);
1441  existingSizeLeft += last;
1442  }
1443  if (leftsize) {
1444  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), leftsize);
1446  }
1447  file->chunkPosition_ = 0; //data was moved to beginning of the chunk
1448  }
1449  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1450  if (fileDescriptor_ != -1) {
1451  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1452  close(fileDescriptor_);
1453  file->rawFd_ = fileDescriptor_ = -1;
1454  }
1455  }
1456 }
1457 
1459  std::lock_guard<std::mutex> lock(monlock_);
1460  auto itr = sourceEventsReport_.find(lumi);
1461  if (itr != sourceEventsReport_.end())
1462  itr->second += events;
1463  else
1465 }
1466 
1467 std::pair<bool, unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase) {
1468  std::lock_guard<std::mutex> lock(monlock_);
1469  auto itr = sourceEventsReport_.find(lumi);
1470  if (itr != sourceEventsReport_.end()) {
1471  std::pair<bool, unsigned int> ret(true, itr->second);
1472  if (erase)
1473  sourceEventsReport_.erase(itr);
1474  return ret;
1475  } else
1476  return std::pair<bool, unsigned int>(false, 0);
1477 }
1478 
1480  std::sort(fileNames_.begin(), fileNames_.end(), [](std::string a, std::string b) {
1481  if (a.rfind("/") != std::string::npos)
1482  a = a.substr(a.rfind("/"));
1483  if (b.rfind("/") != std::string::npos)
1484  b = b.substr(b.rfind("/"));
1485  return b > a;
1486  });
1487 
1488  if (!fileNames_.empty()) {
1489  //get run number from first file in the vector
1491  std::string fileStem = fileName.stem().string();
1492  auto end = fileStem.find("_");
1493  if (fileStem.find("run") == 0) {
1494  std::string runStr = fileStem.substr(3, end - 3);
1495  try {
1496  //get long to support test run numbers < 2^32
1497  long rval = boost::lexical_cast<long>(runStr);
1498  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: " << rval;
1499  return rval;
1500  } catch (boost::bad_lexical_cast const&) {
1501  edm::LogWarning("FedRawDataInputSource")
1502  << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1503  }
1504  }
1505  }
1506  return -1;
1507 }
1508 
1510  std::string& nextFile,
1511  uint32_t& fsize,
1512  uint64_t& lockWaitTime) {
1513  if (fileListIndex_ < fileNames_.size()) {
1514  nextFile = fileNames_[fileListIndex_];
1515  if (nextFile.find("file://") == 0)
1516  nextFile = nextFile.substr(7);
1517  else if (nextFile.find("file:") == 0)
1518  nextFile = nextFile.substr(5);
1519  boost::filesystem::path fileName = nextFile;
1520  std::string fileStem = fileName.stem().string();
1521  if (fileStem.find("ls"))
1522  fileStem = fileStem.substr(fileStem.find("ls") + 2);
1523  if (fileStem.find("_"))
1524  fileStem = fileStem.substr(0, fileStem.find("_"));
1525 
1526  if (!fileListLoopMode_)
1527  ls = boost::lexical_cast<unsigned int>(fileStem);
1528  else //always starting from LS 1 in loop mode
1529  ls = 1 + loopModeIterationInc_;
1530 
1531  //fsize = 0;
1532  //lockWaitTime = 0;
1533  fileListIndex_++;
1535  } else {
1536  if (!fileListLoopMode_)
1538  else {
1539  //loop through files until interrupted
1541  fileListIndex_ = 0;
1542  return getFile(ls, nextFile, fsize, lockWaitTime);
1543  }
1544  }
1545 }
runTheMatrix.ret
ret
prodAgent to be discontinued
Definition: runTheMatrix.py:355
FedRawDataInputSource::fileDeleteLock_
std::mutex fileDeleteLock_
Definition: FedRawDataInputSource.h:157
FedRawDataInputSource::fillFEDRawDataCollection
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
Definition: FedRawDataInputSource.cc:683
FEDNumbering.h
change_name.diff
diff
Definition: change_name.py:13
evf::EvFDaqDirector::unlockFULocal2
void unlockFULocal2()
Definition: EvFDaqDirector.cc:893
eostools.ls
def ls(path, rec=False)
Definition: eostools.py:349
edm::RunNumber_t
unsigned int RunNumber_t
Definition: RunLumiEventNumber.h:14
counter
Definition: counter.py:1
FedRawDataInputSource::chunkIsFree_
bool chunkIsFree_
Definition: FedRawDataInputSource.h:132
dttmaxenums::L
Definition: DTTMax.h:29
evf::EvFDaqDirector::createBoLSFile
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
Definition: EvFDaqDirector.cc:898
FedRawDataInputSource::threadInit_
std::atomic< bool > threadInit_
Definition: FedRawDataInputSource.h:171
electrons_cff.bool
bool
Definition: electrons_cff.py:372
mps_fire.i
i
Definition: mps_fire.py:355
FedRawDataInputSource::rewind_
void rewind_() override
Definition: FedRawDataInputSource.cc:735
start
Definition: start.py:1
FEDNumbering::MINTriggerEGTPFEDID
Definition: FEDNumbering.h:63
evf::EvFDaqDirector::runEnded
Definition: EvFDaqDirector.h:64
MessageLogger.h
InputFile::parent_
FedRawDataInputSource * parent_
Definition: FedRawDataInputSource.h:202
evf::EvFDaqDirector::getEoLSFilePathOnBU
std::string getEoLSFilePathOnBU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:470
funct::false
false
Definition: Factorize.h:34
evf::FastMonitoringThread::inSupFileLimit
Definition: FastMonitoringThread.h:54
InputFile::fileName_
std::string fileName_
Definition: FedRawDataInputSource.h:205
FedRawDataInputSource::maybeOpenNewLumiSection
void maybeOpenNewLumiSection(const uint32_t lumiSection)
Definition: FedRawDataInputSource.cc:295
FedRawDataInputSource::quit_threads_
std::atomic< bool > quit_threads_
Definition: FedRawDataInputSource.h:148
FedRawDataInputSource::mWakeup_
std::mutex mWakeup_
Definition: FedRawDataInputSource.h:163
FedRawDataInputSource::fileNames_
std::vector< std::string > fileNames_
Definition: FedRawDataInputSource.h:95
FRDEventMsgView
Definition: FRDEventMessage.h:107
FedRawDataInputSource::workerThreads_
std::vector< std::thread * > workerThreads_
Definition: FedRawDataInputSource.h:136
FedRawDataInputSource::fileListIndex_
unsigned int fileListIndex_
Definition: FedRawDataInputSource.h:100
FedRawDataInputSource::threadError
void threadError()
Definition: FedRawDataInputSource.cc:1324
evf::FastMonitoringThread::inSupWaitFreeChunk
Definition: FastMonitoringThread.h:55
FedRawDataInputSource::readingFilesCount_
std::atomic< unsigned int > readingFilesCount_
Definition: FedRawDataInputSource.h:88
mps_update.status
status
Definition: mps_update.py:69
FedRawDataInputSource::useL1EventID_
const bool useL1EventID_
Definition: FedRawDataInputSource.h:94
min
T min(T a, T b)
Definition: MathUtil.h:58
evf::EvFDaqDirector::updateFuLock
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
Definition: EvFDaqDirector.cc:497
FEDRawDataCollection
Definition: FEDRawDataCollection.h:18
evf::EvFDaqDirector::getLumisectionToStart
unsigned int getLumisectionToStart() const
Definition: EvFDaqDirector.cc:1787
edm
HLT enums.
Definition: AlignableModifier.h:19
FedRawDataInputSource::numConcurrentReads_
unsigned int numConcurrentReads_
Definition: FedRawDataInputSource.h:87
evf::evtn::gtpe_board_sense
bool gtpe_board_sense(const unsigned char *p)
Definition: GlobalEventNumber.cc:11
FedRawDataInputSource::nStreams_
unsigned int nStreams_
Definition: FedRawDataInputSource.h:159
pos
Definition: PixelAliasList.h:18
FedRawDataInputSource::getNextEvent
evf::EvFDaqDirector::FileStatus getNextEvent()
Definition: FedRawDataInputSource.cc:341
FedRawDataInputSource::runNumber_
edm::RunNumber_t runNumber_
Definition: FedRawDataInputSource.h:104
FedRawDataInputSource::currentLumiSection_
unsigned int currentLumiSection_
Definition: FedRawDataInputSource.h:114
edm::LogInfo
Definition: MessageLogger.h:254
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
l1tstage2_dqm_sourceclient-live_cfg.rawData
rawData
Definition: l1tstage2_dqm_sourceclient-live_cfg.py:156
evf::FastMonitoringThread::inRunEnd
Definition: FastMonitoringThread.h:41
FedRawDataInputSource::workerPool_
tbb::concurrent_queue< unsigned int > workerPool_
Definition: FedRawDataInputSource.h:138
FedRawDataInputSource::sourceEventsReport_
std::map< unsigned int, unsigned int > sourceEventsReport_
Definition: FedRawDataInputSource.h:173
DeadROCCounter.getRunNumber
def getRunNumber(filename)
Definition: DeadROCCounter.py:7
FedRawDataInputSource::readNextChunkIntoBuffer
void readNextChunkIntoBuffer(InputFile *file)
Definition: FedRawDataInputSource.cc:1401
edm::InputSourceDescription
Definition: InputSourceDescription.h:20
cms::cuda::assert
assert(be >=bs)
FedRawDataInputSource::fms_
evf::FastMonitoringService * fms_
Definition: FedRawDataInputSource.h:77
edm::second
U second(std::pair< T, U > const &p)
Definition: ParameterSet.cc:215
FRDHeaderVersionSize
const uint32 FRDHeaderVersionSize[6]
Definition: FRDEventMessage.h:104
edm::Timestamp::beginOfTime
static Timestamp beginOfTime()
Definition: Timestamp.h:84
InputFile::moveToPreviousChunk
void moveToPreviousChunk(const size_t size, const size_t offset)
Definition: FedRawDataInputSource.cc:1364
FedRawDataInputSource::cvReader_
std::vector< std::condition_variable * > cvReader_
Definition: FedRawDataInputSource.h:145
FEDNumbering::MINTCDSuTCAFEDID
Definition: FEDNumbering.h:101
FedRawDataInputSource::eventChunkSize_
unsigned int eventChunkSize_
Definition: FedRawDataInputSource.h:82
evf::evtn::get
unsigned int get(const unsigned char *, bool)
Definition: GlobalEventNumber.cc:77
evf::FastMonitoringThread::inCachedEvent
Definition: FastMonitoringThread.h:46
FedRawDataInputSource::cvWakeup_
std::condition_variable cvWakeup_
Definition: FedRawDataInputSource.h:164
evf::EvFDaqDirector::sameFile
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::runAbort
Definition: EvFDaqDirector.h:64
FedRawDataInputSource::freeChunks_
tbb::concurrent_queue< InputChunk * > freeChunks_
Definition: FedRawDataInputSource.h:141
FedRawDataInputSource::detectedFRDversion_
uint32 detectedFRDversion_
Definition: FedRawDataInputSource.h:130
MillePedeFileConverter_cfg.fileName
fileName
Definition: MillePedeFileConverter_cfg.py:32
evf::evtn::getgpslow
unsigned int getgpslow(const unsigned char *)
Definition: GlobalEventNumber.cc:92
patZpeak.events
events
Definition: patZpeak.py:20
newFWLiteAna.found
found
Definition: newFWLiteAna.py:118
dqmdumpme.first
first
Definition: dqmdumpme.py:55
evf::FastMonitoringThread::inWaitChunk
Definition: FastMonitoringThread.h:43
evf::FastMonitoringService::setInputSource
void setInputSource(FedRawDataInputSource *inputSource)
Definition: FastMonitoringService.h:178
FedRawDataInputSource::bufferInputRead_
uint32_t bufferInputRead_
Definition: FedRawDataInputSource.h:169
edm::InputSource::processHistoryRegistryForUpdate
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:327
FEDRawData::data
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:24
FedRawDataInputSource::getEventReport
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
Definition: FedRawDataInputSource.cc:1467
InputFile::bufferPosition_
uint32_t bufferPosition_
Definition: FedRawDataInputSource.h:216
InputFile
Definition: FedRawDataInputSource.h:201
InputChunk::offset_
unsigned int offset_
Definition: FedRawDataInputSource.h:183
edm::Wrapper
Definition: Product.h:10
FedRawDataInputSource::workerJob_
std::vector< ReaderInfo > workerJob_
Definition: FedRawDataInputSource.h:139
end
#define end
Definition: vmac.h:39
uint32
unsigned int uint32
Definition: MsgTools.h:13
FEDRawData
Definition: FEDRawData.h:19
InputFile::chunkPosition_
uint32_t chunkPosition_
Definition: FedRawDataInputSource.h:217
Utilities.operator
operator
Definition: Utilities.py:24
evf::FastMonitoringThread::inProcessingFile
Definition: FastMonitoringThread.h:42
FedRawDataInputSource::readBlocks_
unsigned int readBlocks_
Definition: FedRawDataInputSource.h:84
evf::FastMonitoringThread::inSupWaitFreeChunkCopying
Definition: FastMonitoringThread.h:56
AuxiliaryMakers.h
edm::LuminosityBlockAuxiliary
Definition: LuminosityBlockAuxiliary.h:15
fileCollector.now
now
Definition: fileCollector.py:207
FedRawDataInputSource::streamFileTracker_
std::vector< int > streamFileTracker_
Definition: FedRawDataInputSource.h:158
InputChunk::usedSize_
uint32_t usedSize_
Definition: FedRawDataInputSource.h:181
InputChunk::buf_
unsigned char * buf_
Definition: FedRawDataInputSource.h:178
InputChunk::reset
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
Definition: FedRawDataInputSource.h:191
FedRawDataInputSource::eventRunNumber_
uint32_t eventRunNumber_
Definition: FedRawDataInputSource.h:115
evf::EvFDaqDirector::getEoLSFilePathOnFU
std::string getEoLSFilePathOnFU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:474
dqmdumpme.last
last
Definition: dqmdumpme.py:56
FedRawDataInputSource::InputFile
friend struct InputFile
Definition: FedRawDataInputSource.h:37
edm::EventPrincipal
Definition: EventPrincipal.h:46
hgcalPlots.stat
stat
Definition: hgcalPlots.py:1111
edm::ConfigurationDescriptions::add
void add(std::string const &label, ParameterSetDescription const &psetDescription)
Definition: ConfigurationDescriptions.cc:57
evf::EvFDaqDirector::isSingleStreamThread
bool isSingleStreamThread()
Definition: EvFDaqDirector.h:117
rval
unsigned long long int rval
Definition: vlib.h:21
evf::FastMonitoringThread::inSupWaitFreeThread
Definition: FastMonitoringThread.h:57
evf::FastMonitoringThread::inWaitInput
Definition: FastMonitoringThread.h:37
evf::EvFDaqDirector::parseFRDFileHeader
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
Definition: EvFDaqDirector.cc:926
TCDSRaw.h
FedRawDataInputSource::filesToDelete_
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
Definition: FedRawDataInputSource.h:155
EventID.h
FedRawDataInputSource::getLSFromFilename_
const bool getLSFromFilename_
Definition: FedRawDataInputSource.h:91
FedRawDataInputSource::ReaderInfo
std::pair< InputFile *, InputChunk * > ReaderInfo
Definition: FedRawDataInputSource.h:128
InputChunk
Definition: FedRawDataInputSource.h:177
FedRawDataInputSource::FedRawDataInputSource
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
Definition: FedRawDataInputSource.cc:53
evf::FastMonitoringThread::inReadCleanup
Definition: FastMonitoringThread.h:48
FedRawDataInputSource::fileListLoopMode_
const bool fileListLoopMode_
Definition: FedRawDataInputSource.h:101
edm::InputSource::luminosityBlockAuxiliary
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:242
InputFile::currentChunk_
unsigned int currentChunk_
Definition: FedRawDataInputSource.h:218
FedRawDataInputSource::daqDirector_
evf::EvFDaqDirector * daqDirector_
Definition: FedRawDataInputSource.h:78
Service
tcds::Raw_v1
Definition: TCDSRaw.h:106
svgfig.load
def load(fileName)
Definition: svgfig.py:547
evf::evtn::makeEventAuxiliary
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, const edm::EventAuxiliary::ExperimentType &, const std::string &processGUID, bool verifyLumiSection)
Definition: AuxiliaryMakers.cc:9
dt4ml_dqm_sourceclient-live_cfg.filePath
filePath
CUSTOMIZE FOR ML.
Definition: dt4ml_dqm_sourceclient-live_cfg.py:39
evf::EvFDaqDirector::noFile
Definition: EvFDaqDirector.h:64
FedRawDataInputSource::fileQueue_
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
Definition: FedRawDataInputSource.h:142
evf::EvFDaqDirector::useFileBroker
bool useFileBroker() const
Definition: EvFDaqDirector.h:79
evf::FastMonitoringThread::inChecksumEvent
Definition: FastMonitoringThread.h:45
InputChunk::readComplete_
std::atomic< bool > readComplete_
Definition: FedRawDataInputSource.h:185
b
double b
Definition: hdecay.h:118
edm::EventPrincipal::put
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
Definition: EventPrincipal.cc:178
FedRawDataInputSource::reportEventsThisLumiInSource
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
Definition: FedRawDataInputSource.cc:1458
evf::FastMonitoringThread::inNewLumi
Definition: FastMonitoringThread.h:38
FedRawDataInputSource::processHistoryID_
edm::ProcessHistoryID processHistoryID_
Definition: FedRawDataInputSource.h:112
InputSourceDescription.h
InputFile::~InputFile
~InputFile()
Definition: FedRawDataInputSource.cc:1378
evf::FastMonitoringThread::inSupNewFileWaitThreadCopying
Definition: FastMonitoringThread.h:64
edm::EventAuxiliary
Definition: EventAuxiliary.h:14
edm::ConfigurationDescriptions
Definition: ConfigurationDescriptions.h:28
FedRawDataInputSource::startedSupervisorThread_
bool startedSupervisorThread_
Definition: FedRawDataInputSource.h:134
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
FedRawDataInputSource::exceptionState
bool exceptionState()
Definition: FedRawDataInputSource.h:62
FedRawDataInputSource::fillDescriptions
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
Definition: FedRawDataInputSource.cc:196
edm::LogWarning
Definition: MessageLogger.h:141
FedRawDataInputSource::daqProvenanceHelper_
const edm::DaqProvenanceHelper daqProvenanceHelper_
Definition: FedRawDataInputSource.h:107
crc32c
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
edm::InputSource::runAuxiliary
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:239
FedRawDataInputSource::initFileList
long initFileList()
Definition: FedRawDataInputSource.cc:1479
FedRawDataInputSource::L1EventID_
uint32_t L1EventID_
Definition: FedRawDataInputSource.h:117
UnixSignalHandlers.h
cppFunctionSkipper.exception
exception
Definition: cppFunctionSkipper.py:10
funct::true
true
Definition: Factorize.h:173
edm::ParameterSetDescription::addUntracked
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
Definition: ParameterSetDescription.h:100
InputChunk::fileIndex_
unsigned int fileIndex_
Definition: FedRawDataInputSource.h:184
reader.h
LogDebug
#define LogDebug(id)
Definition: MessageLogger.h:670
edm::InputSource::setRunAuxiliary
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:329
edm::ParameterSet
Definition: ParameterSet.h:36
edm::ParameterSetDescription::setComment
void setComment(std::string const &value)
Definition: ParameterSetDescription.cc:33
FedRawDataInputSource::currentFileIndex_
int currentFileIndex_
Definition: FedRawDataInputSource.h:154
edm::LogError
Definition: MessageLogger.h:183
FEDTrailer
Definition: FEDTrailer.h:14
a
double a
Definition: hdecay.h:119
Timestamp.h
FedRawDataInputSource::alwaysStartFromFirstLS_
const bool alwaysStartFromFirstLS_
Definition: FedRawDataInputSource.h:92
FFFNamingSchema.h
Event.h
edm::InputSource::productRegistryUpdate
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:326
FedRawDataInputSource::eventChunkBlock_
unsigned int eventChunkBlock_
Definition: FedRawDataInputSource.h:83
FEDHeader::triggerType
uint8_t triggerType() const
Event Trigger type identifier.
Definition: FEDHeader.cc:13
CommonMethods.lock
def lock()
Definition: CommonMethods.py:82
edm::DaqProvenanceHelper::daqInit
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
Definition: DaqProvenanceHelper.cc:83
evf::EvFDaqDirector::getNextFromFileBroker
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
Definition: EvFDaqDirector.cc:1635
FedRawDataInputSource::verifyChecksum_
const bool verifyChecksum_
Definition: FedRawDataInputSource.h:93
mps_check.timeout
int timeout
Definition: mps_check.py:53
SiStripCommissioningSource_FromEDM_cfg.EvFDaqDirector
EvFDaqDirector
Definition: SiStripCommissioningSource_FromEDM_cfg.py:57
evf::EvFDaqDirector::grabNextJsonFileAndUnlock
int grabNextJsonFileAndUnlock(boost::filesystem::path const &jsonSourcePath)
Definition: EvFDaqDirector.cc:1283
edm::shutdown_flag
volatile std::atomic< bool > shutdown_flag
Definition: UnixSignalHandlers.cc:22
FedRawDataInputSource::checkNext
Next checkNext() override
Definition: FedRawDataInputSource.cc:219
evf::FastMonitoringService::setInStateSup
void setInStateSup(FastMonitoringThread::InputState inputState)
Definition: FastMonitoringService.h:180
evf::FastMonitoringThread::inChunkReceived
Definition: FastMonitoringThread.h:44
printConversionInfo.aux
aux
Definition: printConversionInfo.py:19
edm::Service
Definition: Service.h:30
createfilelist.int
int
Definition: createfilelist.py:10
FedRawDataInputSource::currentFile_
std::unique_ptr< InputFile > currentFile_
Definition: FedRawDataInputSource.h:131
evf::FastMonitoringThread::inSupNewFileWaitChunk
Definition: FastMonitoringThread.h:67
FedRawDataInputSource::setExceptionState_
bool setExceptionState_
Definition: FedRawDataInputSource.h:150
FrontierConditions_GlobalTag_cff.file
file
Definition: FrontierConditions_GlobalTag_cff.py:13
FastMonitoringService.h
edm::InputSource::luminosityBlock
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:442
FedRawDataInputSource::eventID_
edm::EventID eventID_
Definition: FedRawDataInputSource.h:111
edm::RawInputSource::Next::kStop
FEDRawDataCollection.h
evf::FastMonitoringThread::inSupNewFile
Definition: FastMonitoringThread.h:63
FedRawDataInputSource::mReader_
std::mutex mReader_
Definition: FedRawDataInputSource.h:144
FedRawDataInputSource::eventsThisLumi_
unsigned int eventsThisLumi_
Definition: FedRawDataInputSource.h:119
crc32c.h
FedRawDataInputSource::~FedRawDataInputSource
~FedRawDataInputSource() override
Definition: FedRawDataInputSource.cc:164
FEDHeader::length
static const uint32_t length
Definition: FEDHeader.h:54
evf::EvFDaqDirector::getEoRFilePathOnFU
std::string getEoRFilePathOnFU() const
Definition: EvFDaqDirector.cc:484
edm::EventPrincipal::streamID
StreamID streamID() const
Definition: EventPrincipal.h:106
edm::ParameterSetDescription::setAllowAnything
void setAllowAnything()
allow any parameter label/value pairs
Definition: ParameterSetDescription.cc:37
l1tstage2_dqm_sourceclient-live_cfg.fedId
fedId
Definition: l1tstage2_dqm_sourceclient-live_cfg.py:82
evf::EvFDaqDirector::newLumi
Definition: EvFDaqDirector.h:64
itr
std::vector< std::pair< float, float > >::iterator itr
Definition: HGCDigitizer.cc:28
evf::FastMonitoringThread::inSupNewFileWaitThread
Definition: FastMonitoringThread.h:65
FedRawDataInputSource::getFile
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
Definition: FedRawDataInputSource.cc:1509
InputFile::rawFd_
int rawFd_
Definition: FedRawDataInputSource.h:207
evf::FastMonitoringThread::inSupLockPolling
Definition: FastMonitoringThread.h:60
FedRawDataInputSource::startupLock_
std::mutex startupLock_
Definition: FedRawDataInputSource.h:151
visDQMUpload.buf
buf
Definition: visDQMUpload.py:154
tcds
Definition: TCDSRaw.h:16
FedRawDataInputSource.h
evf::FastMonitoringService::reportLockWait
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
Definition: FastMonitoringService.cc:702
evf::evtn::evm_board_sense
bool evm_board_sense(const unsigned char *p, size_t size)
Definition: GlobalEventNumber.cc:15
FedRawDataInputSource::thread_quit_signal
std::vector< unsigned int > thread_quit_signal
Definition: FedRawDataInputSource.h:149
InputSourceMacros.h
InputFile::advance
bool advance(unsigned char *&dataPosition, const size_t size)
Definition: FedRawDataInputSource.cc:1329
FedRawDataInputSource::useFileBroker_
bool useFileBroker_
Definition: FedRawDataInputSource.h:96
edm::InputSource::setLuminosityBlockAuxiliary
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:333
DataPointDefinition.h
evf::FastMonitoringThread::inInit
Definition: FastMonitoringThread.h:36
edm::RawInputSource::Next
Next
Definition: RawInputSource.h:24
edm::Timestamp::invalidTimestamp
static Timestamp invalidTimestamp()
Definition: Timestamp.h:82
edm::EventAuxiliary::PhysicsTrigger
Definition: EventAuxiliary.h:20
FedRawDataInputSource::readSupervisor
void readSupervisor()
Definition: FedRawDataInputSource.cc:737
eostools.move
def move(src, dest)
Definition: eostools.py:511
std
Definition: JetResolutionObject.h:76
evf::EvFDaqDirector::createProcessingNotificationMaybe
void createProcessingNotificationMaybe() const
Definition: EvFDaqDirector.cc:1942
crc32c_hw_test
bool crc32c_hw_test()
Definition: crc32c.cc:354
dqmiodatasetharvest.inf
inf
Definition: dqmiodatasetharvest.py:38
init
Definition: init.py:1
InputFile::chunks_
tbb::concurrent_vector< InputChunk * > chunks_
Definition: FedRawDataInputSource.h:214
evf::FastMonitoringThread::inNoRequest
Definition: FastMonitoringThread.h:49
edm::RawInputSource::makeEvent
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
Definition: RawInputSource.cc:51
edm::DaqProvenanceHelper::branchDescription
BranchDescription const & branchDescription() const
Definition: DaqProvenanceHelper.h:46
FedRawDataInputSource::event_
std::unique_ptr< FRDEventMsgView > event_
Definition: FedRawDataInputSource.h:109
edm::InputSource::setNewRun
void setNewRun()
Definition: InputSource.h:351
FEDTrailer::length
static const uint32_t length
Definition: FEDTrailer.h:57
FedRawDataInputSource::nextEvent
evf::EvFDaqDirector::FileStatus nextEvent()
Definition: FedRawDataInputSource.cc:332
relativeConstraints.empty
bool empty
Definition: relativeConstraints.py:46
GlobalEventNumber.h
EventAuxiliary.h
FedRawDataInputSource::tid_active_
std::vector< unsigned int > tid_active_
Definition: FedRawDataInputSource.h:146
Exception
Definition: hltDiff.cc:246
evf::FastMonitoringThread::inSupBusy
Definition: FastMonitoringThread.h:59
FedRawDataInputSource::checkEvery_
unsigned int checkEvery_
Definition: FedRawDataInputSource.h:160
FEDRawData::resize
void resize(size_t newsize)
Definition: FEDRawData.cc:28
evf::EvFDaqDirector::setFMS
void setFMS(evf::FastMonitoringService *fms)
Definition: EvFDaqDirector.h:116
MatrixUtil.remove
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:212
FedRawDataInputSource::fileDescriptor_
int fileDescriptor_
Definition: FedRawDataInputSource.h:168
evf
Definition: fillJson.h:27
edm::InputSource::setEventCached
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:358
evf::FastMonitoringThread::inReadEvent
Definition: FastMonitoringThread.h:47
evf::FastMonitoringService::startedLookingForFile
void startedLookingForFile()
Definition: FastMonitoringService.cc:666
evf::FastMonitoringThread::inSupNoFile
Definition: FastMonitoringThread.h:62
evf::FastMonitoringService::setExceptionDetected
void setExceptionDetected(unsigned int ls)
Definition: FastMonitoringService.cc:392
edm::InputSource::resetLuminosityBlockAuxiliary
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:341
FEDHeader::sourceID
uint16_t sourceID() const
Identifier of the FED.
Definition: FEDHeader.cc:19
FedRawDataInputSource::startupCv_
std::condition_variable startupCv_
Definition: FedRawDataInputSource.h:152
evf::FastMonitoringThread::inSupNewFileWaitChunkCopying
Definition: FastMonitoringThread.h:66
FedRawDataInputSource::tcds_pointer_
unsigned char * tcds_pointer_
Definition: FedRawDataInputSource.h:118
cond::uint64_t
unsigned long long uint64_t
Definition: Time.h:13
funct::pow
Power< A, B >::type pow(const A &a, const B &b)
Definition: Power.h:30
FedRawDataInputSource::fileListMode_
const bool fileListMode_
Definition: FedRawDataInputSource.h:99
cms::Exception
Definition: Exception.h:70
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
FedRawDataInputSource::InputChunk
friend struct InputChunk
Definition: FedRawDataInputSource.h:38
FedRawDataInputSource::readSupervisorThread_
std::unique_ptr< std::thread > readSupervisorThread_
Definition: FedRawDataInputSource.h:135
command_line.start
start
Definition: command_line.py:167
ParameterSet.h
evf::FastMonitoringService::setInState
void setInState(FastMonitoringThread::InputState inputState)
Definition: FastMonitoringService.h:179
edm::InputSource::processGUID
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:204
dqmiolumiharvest.j
j
Definition: dqmiolumiharvest.py:66
runEdmFileComparison.skipped
skipped
Definition: runEdmFileComparison.py:225
edm::RawInputSource::Next::kEvent
ntuplemaker.time
time
Definition: ntuplemaker.py:310
evf::evtn::gtpe_get
unsigned int gtpe_get(const unsigned char *)
Definition: GlobalEventNumber.cc:83
event
Definition: event.py:1
edm::EventID
Definition: EventID.h:31
evf::FastMonitoringService::stoppedLookingForFile
void stoppedLookingForFile(unsigned int lumi)
Definition: FastMonitoringService.cc:674
hltrates_dqm_sourceclient-live_cfg.offset
offset
Definition: hltrates_dqm_sourceclient-live_cfg.py:78
FEDHeader
Definition: FEDHeader.h:14
evf::FastMonitoringThread::inSupWaitFreeThreadCopying
Definition: FastMonitoringThread.h:58
InputFile::waitForChunk
bool waitForChunk(unsigned int chunkid)
Definition: FedRawDataInputSource.h:248
FEDHeader.h
lumi
Definition: LumiSectionData.h:20
InputFile::rewindChunk
void rewindChunk(const size_t size)
Definition: FedRawDataInputSource.cc:1373
evf::EvFDaqDirector::FileStatus
FileStatus
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::lockFULocal2
void lockFULocal2()
Definition: EvFDaqDirector.cc:888
FedRawDataInputSource::monlock_
std::mutex monlock_
Definition: FedRawDataInputSource.h:174
InputFile::deleteFile_
bool deleteFile_
Definition: FedRawDataInputSource.h:206
FedRawDataInputSource::read
void read(edm::EventPrincipal &eventPrincipal) override
Definition: FedRawDataInputSource.cc:609
sistrip::runNumber_
static const char runNumber_[]
Definition: ConstantsForDqm.h:33
FedRawDataInputSource::numBuffers_
unsigned int numBuffers_
Definition: FedRawDataInputSource.h:85
evf::evtn::getgpshigh
unsigned int getgpshigh(const unsigned char *)
Definition: GlobalEventNumber.cc:95
FedRawDataInputSource::maxBufferedFiles_
unsigned int maxBufferedFiles_
Definition: FedRawDataInputSource.h:86
edm::TimeValue_t
unsigned long long TimeValue_t
Definition: Timestamp.h:28
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
FedRawDataInputSource::singleBufferMode_
bool singleBufferMode_
Definition: FedRawDataInputSource.h:167
FedRawDataInputSource::readWorker
void readWorker(unsigned int tid)
Definition: FedRawDataInputSource.cc:1182
FedRawDataInputSource::loopModeIterationInc_
unsigned int loopModeIterationInc_
Definition: FedRawDataInputSource.h:102
FEDNumbering::MAXFEDID
Definition: FEDNumbering.h:26
evf::EvFDaqDirector::newFile
Definition: EvFDaqDirector.h:64
edm::RunAuxiliary
Definition: RunAuxiliary.h:15
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
edm::InputSource::run
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:437
FEDTrailer::fragmentLength
uint32_t fragmentLength() const
The length of the event fragment counted in 64-bit words including header and trailer.
Definition: FEDTrailer.cc:13
FedRawDataInputSource::GTPEventID_
uint32_t GTPEventID_
Definition: FedRawDataInputSource.h:116
evf::EvFDaqDirector::setDeleteTracking
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
Definition: EvFDaqDirector.h:168
FEDNumbering::MINTriggerGTPFEDID
Definition: FEDNumbering.h:61
edm::DaqProvenanceHelper::dummyProvenance
ProductProvenance const & dummyProvenance() const
Definition: DaqProvenanceHelper.h:48
FEDTrailer.h
edm::Timestamp
Definition: Timestamp.h:30