CMS 3D CMS Logo

FedRawDataInputSource.cc
Go to the documentation of this file.
1 #include <fcntl.h>
2 #include <iomanip>
3 #include <iostream>
4 #include <memory>
5 #include <sstream>
6 #include <sys/types.h>
7 #include <sys/file.h>
8 #include <sys/time.h>
9 #include <unistd.h>
10 #include <vector>
11 #include <fstream>
12 #include <zlib.h>
13 #include <cstdio>
14 #include <chrono>
15 
16 #include <boost/algorithm/string.hpp>
17 
21 
23 
30 
32 
34 
38 
40 
45 
46 //JSON file reader
48 
49 using namespace evf::FastMonState;
50 
52  : edm::RawInputSource(pset, desc),
53  defPath_(pset.getUntrackedParameter<std::string>("buDefPath", "")),
54  eventChunkSize_(pset.getUntrackedParameter<unsigned int>("eventChunkSize", 32) * 1048576),
55  eventChunkBlock_(pset.getUntrackedParameter<unsigned int>("eventChunkBlock", 32) * 1048576),
56  numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers", 2)),
57  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles", 2)),
58  getLSFromFilename_(pset.getUntrackedParameter<bool>("getLSFromFilename", true)),
59  alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
60  verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum", true)),
61  useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID", false)),
62  testTCDSFEDRange_(
63  pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())),
64  fileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>())),
65  fileListMode_(pset.getUntrackedParameter<bool>("fileListMode", false)),
66  fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
67  runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
68  daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
69  eventID_(),
70  processHistoryID_(),
71  currentLumiSection_(0),
72  tcds_pointer_(nullptr),
73  eventsThisLumi_(0) {
74  char thishost[256];
75  gethostname(thishost, 255);
76  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: " << std::endl
77  << (eventChunkSize_ / 1048576) << " MB on host " << thishost;
78 
79  if (!testTCDSFEDRange_.empty()) {
80  if (testTCDSFEDRange_.size() != 2) {
81  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
82  << "Invalid TCDS Test FED range parameter";
83  }
86  }
87 
88  long autoRunNumber = -1;
89  if (fileListMode_) {
90  autoRunNumber = initFileList();
91  if (!fileListLoopMode_) {
92  if (autoRunNumber < 0)
93  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
94  //override run number
95  runNumber_ = (edm::RunNumber_t)autoRunNumber;
96  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
97  }
98  }
99 
101  setNewRun();
102  //todo:autodetect from file name (assert if names differ)
104 
105  //make sure that chunk size is N * block size
110 
111  if (!numBuffers_)
112  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource")
113  << "no reading enabled with numBuffers parameter 0";
114 
117  readingFilesCount_ = 0;
118 
119  if (!crc32c_hw_test())
120  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
121 
122  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
123  if (fileListMode_) {
124  try {
126  } catch (cms::Exception const&) {
127  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
128  }
129  } else {
131  if (!fms_) {
132  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
133  }
134  }
135 
137  if (!daqDirector_)
138  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
139 
141  if (useFileBroker_)
142  edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
143  //set DaqDirector to delete files in preGlobalEndLumi callback
145  if (fms_) {
147  fms_->setInputSource(this);
150  }
151  //should delete chunks when run stops
152  for (unsigned int i = 0; i < numBuffers_; i++) {
154  }
155 
156  quit_threads_ = false;
157 
158  //prepare data shared by threads
159  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
160  thread_quit_signal.push_back(false);
161  workerJob_.push_back(ReaderInfo(nullptr, nullptr));
162  cvReader_.push_back(std::make_unique<std::condition_variable>());
163  tid_active_.push_back(0);
164  }
165 
166  //start threads
167  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
168  //wait for each thread to complete initialization
169  std::unique_lock<std::mutex> lk(startupLock_);
170  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
171  startupCv_.wait(lk);
172  }
173 
174  runAuxiliary()->setProcessHistoryID(processHistoryID_);
175 }
176 
// Destructor body (the signature line is not part of this listing).
// Sets the quit flag, releases or preserves the files still pending deletion, then joins
// either the supervisor thread or, if it was never started, the worker threads.
178  quit_threads_ = true;
179 
180  //delete any remaining open files
181  if (!fms_ || !fms_->exceptionDetected()) {
    // No monitoring service, or no exception recorded: drop every pending file object.
182  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
183  it->second.reset();
184  } else {
185  //skip deleting files with exception
186  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
187  //it->second->unsetDeleteFile();
    // Files whose lumisection is flagged by isExceptionOnData() get unsetDeleteFile();
    // all other pending file objects are released.
188  if (fms_->isExceptionOnData(it->second->lumi_))
189  it->second->unsetDeleteFile();
190  else
191  it->second.reset();
192  }
193  //disable deleting current file with exception
194  if (currentFile_.get())
195  if (fms_->isExceptionOnData(currentFile_->lumi_))
196  currentFile_->unsetDeleteFile();
197  }
198 
    // NOTE(review): listing line 199 is missing here; presumably it is the condition that
    // opens the block closed by "} else {" below — confirm against the original file.
200  readSupervisorThread_->join();
201  } else {
202  //join aux threads in case the supervisor thread was not started
203  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
    // Set the per-thread quit flag and notify under mReader_, then release the lock
    // before join().
204  std::unique_lock<std::mutex> lk(mReader_);
205  thread_quit_signal[i] = true;
206  cvReader_[i]->notify_one();
207  lk.unlock();
208  workerThreads_[i]->join();
209  delete workerThreads_[i];
210  }
211  }
212 }
213 
// Body of fillDescriptions (the signature line and the declaration of `desc` are not part
// of this listing). Declares the untracked parameters of this source, with defaults and
// comments, and registers the description under the label "source".
// setAllowAnything() keeps undeclared parameters legal, so a misdeclared parameter is not
// reported at validation time.
216  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
217  desc.addUntracked<unsigned int>("eventChunkSize", 32)->setComment("Input buffer (chunk) size");
218  desc.addUntracked<unsigned int>("eventChunkBlock", 32)
219  ->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
220  desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
221  desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
222  ->setComment("Maximum number of simultaneously buffered raw files");
    // Name and type now match what the constructor reads:
    // getUntrackedParameter<bool>("alwaysStartFromFirstLS", false).
    // The previous declaration used a different spelling ("...FromfirstLS") and type unsigned int.
223  desc.addUntracked<bool>("alwaysStartFromFirstLS", false)
224  ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
225  desc.addUntracked<bool>("verifyChecksum", true)
226  ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
227  desc.addUntracked<bool>("useL1EventID", false)
228  ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
229  desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
230  ->setComment("[min, max] range to search for TCDS FED ID in test setup");
231  desc.addUntracked<bool>("fileListMode", false)
232  ->setComment("Use fileNames parameter to directly specify raw files to open");
233  desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
234  ->setComment("file list used when fileListMode is enabled");
235  desc.setAllowAnything();
236  descriptions.add("source", desc);
237 }
238 
// Body of checkNext (the signature line and several statements, including the case labels
// of the switch, are not part of this listing).
// Starts the supervisor thread, then dispatches on the status returned by nextEvent():
// the visible branches return Next::kStop at run end and Next::kEvent otherwise.
241  //this thread opens new files and dispatches reading to worker readers
242  std::unique_lock<std::mutex> lk(startupLock_);
243  readSupervisorThread_ = std::make_unique<std::thread>(&FedRawDataInputSource::readSupervisor, this);
    // Block until the supervisor thread signals startupCv_.
245  startupCv_.wait(lk);
246  }
247  //signal hltd to start event accounting
248  if (!currentLumiSection_)
251  switch (nextEvent()) {
253  //maybe create EoL file in working directory before ending run
254  struct stat buf;
255  if (!useFileBroker_ && currentLumiSection_ > 0) {
    // stat() == 0 is used as an existence test for the end-of-lumisection file on the BU.
256  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
257  if (eolFound) {
    // NOTE(review): the declaration of fuEoLS (listing line 258) is missing from this listing.
259  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
260  if (!found) {
    // Create the file if it does not exist; only its existence matters here, it is closed at once.
    // NOTE(review): the return value of open() is not checked before close().
262  int eol_fd =
263  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
264  close(eol_fd);
266  }
267  }
268  }
269  //also create EoR file in FU data directory
270  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
271  if (!eorFound) {
272  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
273  O_RDWR | O_CREAT,
274  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
275  close(eor_fd);
276  }
278  eventsThisLumi_ = 0;
280  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
281  return Next::kStop;
282  }
284  //this is not reachable
285  return Next::kEvent;
286  }
288  //std::cout << "--------------NEW LUMI---------------" << std::endl;
289  return Next::kEvent;
290  }
291  default: {
292  if (!getLSFromFilename_) {
293  //get new lumi from file header
294  if (event_->lumi() > currentLumiSection_) {
296  eventsThisLumi_ = 0;
298  }
299  }
    // NOTE(review): the "if" branch paired with this "else" (listing lines 300-301) is missing.
302  else
303  eventRunNumber_ = event_->run();
304  L1EventID_ = event_->event();
305 
306  setEventCached();
307 
308  return Next::kEvent;
309  }
310  }
311 }
312 
// Opens a new luminosity block if none is open or if the open one differs from lumiSection.
// When the file broker is not used, it also creates the end-of-lumisection marker file for
// the previous lumisection (if absent) and the begin-of-lumisection file for the new one.
// @param lumiSection number of the lumisection to open
// NOTE(review): some statements are absent from this listing (the numbering has gaps),
// e.g. the declarations of fuEoLS and lumiBlockAuxiliary.
313 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
314  if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
315  if (!useFileBroker_) {
316  if (currentLumiSection_ > 0) {
318  struct stat buf;
    // stat() == 0 is used as an existence test for the marker file.
319  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
320  if (!found) {
    // Create the marker file and close it immediately.
    // NOTE(review): the return value of open() is not checked before close().
322  int eol_fd =
323  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
324  close(eol_fd);
325  daqDirector_->createBoLSFile(lumiSection, false);
327  }
328  } else
329  daqDirector_->createBoLSFile(lumiSection, true); //needed for initial lumisection
330  }
331 
332  currentLumiSection_ = lumiSection;
333 
335 
    // Timestamp of the lumisection opening, taken from the wall clock.
    // NOTE(review): this value is seconds * 1000000 + microseconds, whereas
    // fillFEDRawDataCollection in this file builds its timestamp as (seconds << 32) + microseconds —
    // confirm which encoding edm::Timestamp expects.
336  timeval tv;
337  gettimeofday(&tv, nullptr);
338  const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
339 
341  runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
342 
343  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
344  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
345 
346  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: " << lumiSection;
347  }
348 }
349 
// Tail of nextEvent (the signature, the declaration of `status` and the loop header are not
// part of this listing). The visible part leaves the enclosing loop as soon as the global
// shutdown flag is set and returns the last status obtained.
    // Relaxed load: only the flag value itself is needed here.
353  if (edm::shutdown_flag.load(std::memory_order_relaxed))
354  break;
355  }
356  return status;
357 }
358 
// Body of getNextEvent (the signature line and many statements are not part of this listing;
// the numbering has gaps throughout).
// Visible behaviour: fetches the next InputFile from fileQueue_ when no file is current,
// handles the non-file statuses (runAbort throws, newLumi resets the per-lumi event counter),
// handles empty and finished files, skips the raw file header, then positions event_ on the
// next event, either in single-buffer mode or in multi-buffer mode, and verifies its checksum.
// Throws cms::Exception on a premature end of file, an oversized event, an unknown FRD version,
// a mismatch between processed and expected event counts, and a checksum mismatch.
360  if (setExceptionState_)
361  threadError();
362  if (!currentFile_.get()) {
365  {
366  IdleSourceSentry ids(fms_);
367  if (!fileQueue_.try_pop(currentFile_)) {
368  //sleep until wakeup (only in single-buffer mode) or timeout
369  std::unique_lock<std::mutex> lkw(mWakeup_);
    // NOTE(review): the statement controlled by this condition (listing line 371) is missing.
370  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
372  }
373  }
374  status = currentFile_->status_;
377  currentFile_.reset();
378  return status;
379  } else if (status == evf::EvFDaqDirector::runAbort) {
380  throw cms::Exception("FedRawDataInputSource::getNextEvent")
381  << "Run has been aborted by the input source reader thread";
382  } else if (status == evf::EvFDaqDirector::newLumi) {
384  if (getLSFromFilename_) {
385  if (currentFile_->lumi_ > currentLumiSection_) {
387  eventsThisLumi_ = 0;
389  }
390  } else { //let this be picked up from next event
392  }
393  currentFile_.reset();
394  return status;
395  } else if (status == evf::EvFDaqDirector::newFile) {
397  } else
398  assert(false);
399  }
401 
402  //file is empty
403  if (!currentFile_->fileSize_) {
405  //try to open new lumi
406  assert(currentFile_->nChunks_ == 0);
407  if (getLSFromFilename_)
408  if (currentFile_->lumi_ > currentLumiSection_) {
410  eventsThisLumi_ = 0;
412  }
413  //immediately delete empty file
414  currentFile_.reset();
416  }
417 
418  //file is finished
419  if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
421  //release last chunk (it is never released elsewhere)
422  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
    // A negative nEvents_ disables this consistency check.
423  if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
424  throw cms::Exception("FedRawDataInputSource::getNextEvent")
425  << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
426  << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
427  }
428  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
429  if (singleBufferMode_) {
430  std::unique_lock<std::mutex> lkw(mWakeup_);
431  cvWakeup_.notify_one();
432  }
433  bufferInputRead_ = 0;
    // NOTE(review): the condition opening the block closed by "} else {" below
    // (listing line 434) is missing.
435  //put the file in pending delete list;
436  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
437  filesToDelete_.push_back(std::pair<int, std::unique_ptr<InputFile>>(currentFileIndex_, std::move(currentFile_)));
438  } else {
439  //in single-thread and stream jobs, events are already processed
440  currentFile_.reset();
441  }
443  }
444 
445  //handle RAW file header
446  if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
447  if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
    // Strictly smaller than the header is an error; exactly equal means a header-only file.
448  if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
449  throw cms::Exception("FedRawDataInputSource::getNextEvent")
450  << "Premature end of input file while reading file header";
451 
452  edm::LogWarning("FedRawDataInputSource")
453  << "File with only raw header and no events received in LS " << currentFile_->lumi_;
454  if (getLSFromFilename_)
455  if (currentFile_->lumi_ > currentLumiSection_) {
457  eventsThisLumi_ = 0;
459  }
460  }
461 
462  //advance buffer position to skip file header (chunk will be acquired later)
463  currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
464  currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
465  }
466 
467  //file is too short
468  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
469  throw cms::Exception("FedRawDataInputSource::getNextEvent")
470  << "Premature end of input file while reading event header";
471  }
472  if (singleBufferMode_) {
473  //should already be there
475  {
476  IdleSourceSentry ids(fms_);
    // Poll for the chunk every 10 ms, checking for an error state between attempts.
477  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
478  usleep(10000);
479  if (currentFile_->parent_->exceptionState() || setExceptionState_)
480  currentFile_->parent_->threadError();
481  }
482  }
484 
    // In single-buffer mode the data pointer is always derived from chunks_[0].
485  unsigned char* dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
486 
487  //conditions when read amount is not sufficient for the header to fit
491 
492  if (detectedFRDversion_ == 0) {
    // The first 16 bits at the data position are read as the FRD version.
493  detectedFRDversion_ = *((uint16_t*)dataPosition);
495  throw cms::Exception("FedRawDataInputSource::getNextEvent")
496  << "Unknown FRD version -: " << detectedFRDversion_;
498  }
499 
500  //recalculate chunk position
501  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
503  throw cms::Exception("FedRawDataInputSource::getNextEvent")
504  << "Premature end of input file while reading event header";
505  }
506  }
507 
508  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
509  if (event_->size() > eventChunkSize_) {
510  throw cms::Exception("FedRawDataInputSource::getNextEvent")
511  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
512  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
513  << " bytes";
514  }
515 
    // Size of the event without its header.
516  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
517 
518  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
519  throw cms::Exception("FedRawDataInputSource::getNextEvent")
520  << "Premature end of input file while reading event data";
521  }
522  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
524  //recalculate chunk position
525  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
526  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
527  }
528  currentFile_->bufferPosition_ += event_->size();
529  currentFile_->chunkPosition_ += event_->size();
530  //last chunk is released when this function is invoked next time
531 
532  }
533  //multibuffer mode:
534  else {
535  //wait for the current chunk to become added to the vector
537  {
538  IdleSourceSentry ids(fms_);
539  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
540  usleep(10000);
541  if (setExceptionState_)
542  threadError();
543  }
544  }
546 
547  //check if header is at the boundary of two chunks
548  chunkIsFree_ = false;
549  unsigned char* dataPosition;
550 
551  //read header, copy it to a single chunk if necessary
    // NOTE(review): the condition guarding this throw (listing line 552) and part of the
    // message (line 555) are missing from this listing.
553  throw cms::Exception("FedRawDataInputSource::getNextEvent")
554  << "Premature end of input file (missing:"
556  << ") while reading event data for next event header";
    // advance() sets dataPosition; its return value is used below as "a chunk boundary was crossed".
557  bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
558 
559  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
560  if (event_->size() > eventChunkSize_) {
561  throw cms::Exception("FedRawDataInputSource::getNextEvent")
562  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
563  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
564  << " bytes";
565  }
566 
567  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
568 
569  if (currentFile_->fileSizeLeft() < msgSize) {
570  throw cms::Exception("FedRawDataInputSource::getNextEvent")
571  << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
572  << ") while reading event data for event " << event_->event() << " lumi:" << event_->lumi();
573  }
574 
575  if (chunkEnd) {
576  //header was at the chunk boundary, we will have to move payload as well
577  currentFile_->moveToPreviousChunk(msgSize, FRDHeaderVersionSize[detectedFRDversion_]);
578  chunkIsFree_ = true;
579  } else {
580  //header was contiguous, but check if payload fits the chunk
581  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
582  //rewind to header start position
584  //copy event to a chunk start and move pointers
585 
587  {
588  IdleSourceSentry ids(fms_);
589  chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_] + msgSize);
590  }
592 
593  assert(chunkEnd);
594  chunkIsFree_ = true;
595  //header is moved
596  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
597  } else {
598  //everything is in a single chunk, only move pointers forward
599  chunkEnd = currentFile_->advance(dataPosition, msgSize);
600  assert(!chunkEnd);
601  chunkIsFree_ = false;
602  }
603  }
604  //sanity-check check that the buffer position has not exceeded file size after preparing event
605  if (currentFile_->fileSize_ < currentFile_->bufferPosition_) {
606  throw cms::Exception("FedRawDataInputSource::getNextEvent")
607  << "Exceeded file size by " << currentFile_->bufferPosition_ - currentFile_->fileSize_
608  << " after reading last event declared size of " << event_->size() << " bytes";
609  }
610  } //end multibuffer mode
612 
    // Checksum verification: CRC-32C for event version >= 5, Adler32 for versions 3 and 4;
    // older versions are not verified.
613  if (verifyChecksum_ && event_->version() >= 5) {
614  uint32_t crc = 0;
615  crc = crc32c(crc, (const unsigned char*)event_->payload(), event_->eventSize());
616  if (crc != event_->crc32c()) {
    // NOTE(review): the statement controlled by "if (fms_)" (listing line 618) is missing.
617  if (fms_)
619  throw cms::Exception("FedRawDataInputSource::getNextEvent")
620  << "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() << " but calculated 0x"
621  << crc;
622  }
623  } else if (verifyChecksum_ && event_->version() >= 3) {
624  uint32_t adler = adler32(0L, Z_NULL, 0);
625  adler = adler32(adler, (Bytef*)event_->payload(), event_->eventSize());
626 
627  if (adler != event_->adler32()) {
    // NOTE(review): the statement controlled by "if (fms_)" (listing line 629) is missing.
628  if (fms_)
630  throw cms::Exception("FedRawDataInputSource::getNextEvent")
631  << "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() << " but calculated 0x"
632  << adler;
633  }
634  }
636 
637  currentFile_->nProcessed_++;
638 
640 }
641 
// Body of read (the signature line and the statements constructing the event auxiliary
// `aux` are not part of this listing).
// Fills a FEDRawDataCollection from the current event, creates the event in eventPrincipal
// via makeEvent(), records which file each stream is processing, and periodically erases
// finished files that no stream references any more.
// Throws cms::Exception when neither a TCDS nor a GTP FED supplied an event ID and
// useL1EventID_ is false.
644  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
645  bool tcdsInRange;
646  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData, tcdsInRange);
647 
    // Three ways to build the event auxiliary: from the L1 event ID, from the GTP event ID
    // when no TCDS FED was found, or from the TCDS FED block.
648  if (useL1EventID_) {
651  aux.setProcessHistoryID(processHistoryID_);
652  makeEvent(eventPrincipal, aux);
653  } else if (tcds_pointer_ == nullptr) {
654  if (!GTPEventID_) {
655  throw cms::Exception("FedRawDataInputSource::read")
656  << "No TCDS or GTP FED in event with FEDHeader EID -: " << L1EventID_;
657  }
660  aux.setProcessHistoryID(processHistoryID_);
661  makeEvent(eventPrincipal, aux);
662  } else {
663  const FEDHeader fedHeader(tcds_pointer_);
    // The TCDS record starts right after the FED header.
664  tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
669  event_->isRealData(),
670  static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
671  processGUID(),
673  !tcdsInRange);
674  aux.setProcessHistoryID(processHistoryID_);
675  makeEvent(eventPrincipal, aux);
676  }
677 
678  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
679 
681 
682  eventsThisLumi_++;
684 
685  //resize vector if needed
686  while (streamFileTracker_.size() <= eventPrincipal.streamID())
687  streamFileTracker_.push_back(-1);
688 
689  streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
690 
691  //this old file check runs no more often than every 10 events
    // NOTE(review): the actual period is checkEvery_, which is not defined in this listing —
    // confirm that it matches the "10 events" stated above.
692  if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
693  //delete files that are not in processing
694  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
695  auto it = filesToDelete_.begin();
696  while (it != filesToDelete_.end()) {
697  bool fileIsBeingProcessed = false;
698  for (unsigned int i = 0; i < streamFileTracker_.size(); i++) {
699  if (it->first == streamFileTracker_.at(i)) {
700  fileIsBeingProcessed = true;
701  break;
702  }
703  }
    // Erase only files that no stream references and whose lumisection is not flagged
    // by isExceptionOnData().
704  if (!fileIsBeingProcessed && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
    // NOTE(review): fileToDelete is not used in the visible code.
705  std::string fileToDelete = it->second->fileName_;
706  it = filesToDelete_.erase(it);
707  } else
708  it++;
709  }
710  }
    // When getNextEvent marked the previous chunk as free, return it to the pool here.
711  if (chunkIsFree_)
712  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
713  chunkIsFree_ = false;
715  return;
716 }
717 
// Body of fillFEDRawDataCollection (the signature line, the declaration of `time` and
// several branch conditions are not part of this listing).
// Walks the FED fragments of the current event payload from the end towards the start and
// copies each fragment into rawData. Along the way it records the TCDS FED pointer,
// the GTP event ID and, where available, a GPS-based timestamp.
// Returns the timestamp: wall-clock time by default, replaced by the GPS time in the
// branch that reads getgpslow/getgpshigh.
// Throws cms::Exception on an out-of-range FED ID and on a second TCDS FED.
720  timeval stv;
721  gettimeofday(&stv, nullptr);
    // Default timestamp: seconds in the upper 32 bits, microseconds added in the lower part.
722  time = stv.tv_sec;
723  time = (time << 32) + stv.tv_usec;
724  edm::Timestamp tstamp(time);
725 
726  uint32_t eventSize = event_->eventSize();
727  unsigned char* event = (unsigned char*)event_->payload();
728  GTPEventID_ = 0;
729  tcds_pointer_ = nullptr;
730  tcdsInRange = false;
731  uint16_t selectedTCDSFed = 0;
    // eventSize is used as a backwards cursor: each pass steps back over one trailer and
    // then over the rest of that fragment, so `event + eventSize` ends up at the fragment's header.
732  while (eventSize > 0) {
733  assert(eventSize >= FEDTrailer::length);
734  eventSize -= FEDTrailer::length;
735  const FEDTrailer fedTrailer(event + eventSize);
736  const uint32_t fedSize = fedTrailer.fragmentLength() << 3; //trailer length counts in 8 bytes
    // NOTE(review): fedSize - FEDHeader::length is unsigned; a fragment length smaller than
    // the header would wrap around and make this assert fire.
737  assert(eventSize >= fedSize - FEDHeader::length);
738  eventSize -= (fedSize - FEDHeader::length);
739  const FEDHeader fedHeader(event + eventSize);
740  const uint16_t fedId = fedHeader.sourceID();
    // NOTE(review): the condition guarding this throw (listing line 741) is missing.
742  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
743  } else if (fedId >= MINTCDSuTCAFEDID_ && fedId <= MAXTCDSuTCAFEDID_) {
    // Only the first TCDS FED encountered is kept; a second one is an error.
744  if (!selectedTCDSFed) {
745  selectedTCDSFed = fedId;
746  tcds_pointer_ = event + eventSize;
748  tcdsInRange = true;
749  }
750  } else
751  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
752  << "Second TCDS FED ID " << fedId << " found. First ID: " << selectedTCDSFed;
753  }
755  if (evf::evtn::evm_board_sense(event + eventSize, fedSize))
756  GTPEventID_ = evf::evtn::get(event + eventSize, true);
757  else
758  GTPEventID_ = evf::evtn::get(event + eventSize, false);
759  //evf::evtn::evm_board_setformat(fedSize);
    // Replace the wall-clock timestamp by the GPS time carried in this fragment.
760  const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
761  const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
762  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
763  }
764  //take event ID from GTPE FED
766  if (evf::evtn::gtpe_board_sense(event + eventSize)) {
767  GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
768  }
769  }
    // Copy the whole fragment (fedSize bytes starting at its header) into the collection
    // slot for this FED ID.
770  FEDRawData& fedData = rawData.FEDData(fedId);
771  fedData.resize(fedSize);
772  memcpy(fedData.data(), event + eventSize, fedSize);
773  }
774  assert(eventSize == 0);
775 
776  return tstamp;
777 }
778 
780 
782  bool stop = false;
783  unsigned int currentLumiSection = 0;
784 
785  {
786  std::unique_lock<std::mutex> lk(startupLock_);
787  startupCv_.notify_one();
788  }
789 
790  uint32_t ls = 0;
791  uint32_t monLS = 1;
792  uint32_t lockCount = 0;
793  uint64_t sumLockWaitTimeUs = 0.;
794 
795  while (!stop) {
796  //wait for at least one free thread and chunk
797  int counter = 0;
798 
799  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty() ||
801  //report state to monitoring
802  if (fms_) {
803  bool copy_active = false;
804  for (auto j : tid_active_)
805  if (j)
806  copy_active = true;
809  else if (freeChunks_.empty()) {
810  if (copy_active)
812  else
814  } else {
815  if (copy_active)
817  else
819  }
820  }
821  std::unique_lock<std::mutex> lkw(mWakeup_);
822  //sleep until woken up by condition or a timeout
823  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
824  counter++;
825  if (!(counter % 6000)) {
826  edm::LogWarning("FedRawDataInputSource")
827  << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
828  << ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
829  << " / " << maxBufferedFiles_;
830  }
831  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
832  } else {
833  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
834  }
835  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
836  stop = true;
837  break;
838  }
839  }
840  //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
841 
842  if (stop)
843  break;
844 
845  //look for a new file
846  std::string nextFile;
847  uint32_t fileSizeIndex;
848  int64_t fileSizeFromMetadata;
849 
850  if (fms_) {
853  }
854 
856  uint16_t rawHeaderSize = 0;
857  uint32_t lsFromRaw = 0;
858  int32_t serverEventsInNewFile = -1;
859  int rawFd = -1;
860 
861  int backoff_exp = 0;
862 
863  //entering loop which tries to grab new file from ramdisk
865  //check if hltd has signalled to throttle input
866  counter = 0;
867  while (daqDirector_->inputThrottled()) {
868  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
869  break;
870 
871  unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
872  unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
873  unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
874  bool hasDiscardedLumi = false;
875  for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
877  edm::LogWarning("FedRawDataInputSource") << "Source detected that the lumisection is discarded -: " << i;
878  hasDiscardedLumi = true;
879  break;
880  }
881  }
882  if (hasDiscardedLumi)
883  break;
884 
886 
887  if (!(counter % 50))
888  edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused...";
889  usleep(100000);
890  counter++;
891  }
892 
893  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
894  stop = true;
895  break;
896  }
897 
898  assert(rawFd == -1);
899  uint64_t thisLockWaitTimeUs = 0.;
901  if (fileListMode_) {
902  //return LS if LS not set, otherwise return file
903  status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
905  uint16_t rawDataType;
907  rawFd,
908  rawHeaderSize,
909  rawDataType,
910  lsFromRaw,
911  serverEventsInNewFile,
912  fileSizeFromMetadata,
913  false,
914  false,
915  false) != 0) {
916  //error
917  setExceptionState_ = true;
918  stop = true;
919  break;
920  }
921  if (!getLSFromFilename_)
922  ls = lsFromRaw;
923  }
924  } else if (!useFileBroker_)
926  ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
927  else {
928  status = daqDirector_->getNextFromFileBroker(currentLumiSection,
929  ls,
930  nextFile,
931  rawFd,
932  rawHeaderSize,
933  serverEventsInNewFile,
934  fileSizeFromMetadata,
935  thisLockWaitTimeUs);
936  }
937 
939 
940  //cycle through all remaining LS even if no files get assigned
941  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
943 
944  //monitoring of lock wait time
945  if (thisLockWaitTimeUs > 0.)
946  sumLockWaitTimeUs += thisLockWaitTimeUs;
947  lockCount++;
948  if (ls > monLS) {
949  monLS = ls;
950  if (lockCount)
951  if (fms_)
952  fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
953  lockCount = 0;
954  sumLockWaitTimeUs = 0;
955  }
956 
957  //check again for any remaining index/EoLS files after EoR file is seen
960  usleep(100000);
961  //now all files should have appeared in ramdisk, check again if any raw files were left behind
963  ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
964  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
966  }
967 
969  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runEnded));
970  fileQueue_.push(std::move(inf));
971  stop = true;
972  break;
973  }
974 
975  //error from filelocking function
977  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
978  fileQueue_.push(std::move(inf));
979  stop = true;
980  break;
981  }
982  //queue new lumisection
983  if (getLSFromFilename_) {
984  if (ls > currentLumiSection) {
985  if (!useFileBroker_) {
986  //file locking
987  //setMonStateSup(inSupNewLumi);
988  currentLumiSection = ls;
989  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
990  fileQueue_.push(std::move(inf));
991  } else {
992  //new file service
993  if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
995  //start transitions from LS specified by env, continue if not reached
996  if (ls < daqDirector_->getStartLumisectionFromEnv()) {
997  //skip file if from earlier LS than specified by env
998  if (rawFd != -1) {
999  close(rawFd);
1000  rawFd = -1;
1001  }
1003  continue;
1004  } else {
1005  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
1006  fileQueue_.push(std::move(inf));
1007  }
1008  } else if (ls < 100) {
1009  //look at last LS file on disk to start from that lumisection (only within first 100 LS)
1010  unsigned int lsToStart = daqDirector_->getLumisectionToStart();
1011 
1012  for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
1013  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1014  fileQueue_.push(std::move(inf));
1015  }
1016  } else {
1017  //start from current LS
1018  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
1019  fileQueue_.push(std::move(inf));
1020  }
1021  } else {
1022  //queue all lumisections after last one seen to avoid gaps
1023  for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
1024  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1025  fileQueue_.push(std::move(inf));
1026  }
1027  }
1028  currentLumiSection = ls;
1029  }
1030  }
1031  //else
1032  if (currentLumiSection > 0 && ls < currentLumiSection) {
1033  edm::LogError("FedRawDataInputSource")
1034  << "Got old LS (" << ls << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
1035  << ". Aborting execution." << std::endl;
1036  if (rawFd != -1)
1037  close(rawFd);
1038  rawFd = -1;
1039  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
1040  fileQueue_.push(std::move(inf));
1041  stop = true;
1042  break;
1043  }
1044  }
1045 
1046  int dbgcount = 0;
1049  dbgcount++;
1050  if (!(dbgcount % 20))
1051  LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1052  if (!useFileBroker_)
1053  usleep(100000);
1054  else {
1055  backoff_exp = std::min(4, backoff_exp); // max 1.6 seconds
1056  //backoff_exp=0; // disabled!
1057  int sleeptime = (int)(100000. * pow(2, backoff_exp));
1058  usleep(sleeptime);
1059  backoff_exp++;
1060  }
1061  } else
1062  backoff_exp = 0;
1063  }
1064  //end of file grab loop, parse result
1067  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1068 
1069  std::string rawFile;
1070  //file service will report raw extension
1071  if (useFileBroker_ || rawHeaderSize)
1072  rawFile = nextFile;
1073  else {
1074  std::filesystem::path rawFilePath(nextFile);
1075  rawFile = rawFilePath.replace_extension(".raw").string();
1076  }
1077 
1078  struct stat st;
1079  int stat_res = stat(rawFile.c_str(), &st);
1080  if (stat_res == -1) {
1081  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
1082  setExceptionState_ = true;
1083  break;
1084  }
1085  uint64_t fileSize = st.st_size;
1086 
1087  if (fms_) {
1091  }
1092  int eventsInNewFile;
1093  if (fileListMode_) {
1094  if (fileSize == 0)
1095  eventsInNewFile = 0;
1096  else
1097  eventsInNewFile = -1;
1098  } else {
1100  if (!useFileBroker_) {
1101  if (rawHeaderSize) {
1102  int rawFdEmpty = -1;
1103  uint16_t rawHeaderCheck;
1104  bool fileFound;
1105  eventsInNewFile = daqDirector_->grabNextJsonFromRaw(
1106  nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true);
1107  assert(fileFound && rawHeaderCheck == rawHeaderSize);
1109  } else
1110  eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1111  } else
1112  eventsInNewFile = serverEventsInNewFile;
1113  assert(eventsInNewFile >= 0);
1114  assert((eventsInNewFile > 0) ==
1115  (fileSize > rawHeaderSize)); //file without events must be empty or contain only header
1116  }
1117 
1118  if (!singleBufferMode_) {
1119  //calculate number of needed chunks
1120  unsigned int neededChunks = fileSize / eventChunkSize_;
1121  if (fileSize % eventChunkSize_)
1122  neededChunks++;
1123 
1124  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1125  ls,
1126  rawFile,
1127  !fileListMode_,
1128  rawFd,
1129  fileSize,
1130  rawHeaderSize,
1131  neededChunks,
1132  eventsInNewFile,
1133  this));
1135  auto newInputFilePtr = newInputFile.get();
1136  fileQueue_.push(std::move(newInputFile));
1137 
1138  for (unsigned int i = 0; i < neededChunks; i++) {
1139  if (fms_) {
1140  bool copy_active = false;
1141  for (auto j : tid_active_)
1142  if (j)
1143  copy_active = true;
1144  if (copy_active)
1146  else
1148  }
1149  //get thread
1150  unsigned int newTid = 0xffffffff;
1151  while (!workerPool_.try_pop(newTid)) {
1152  usleep(100000);
1153  if (quit_threads_.load(std::memory_order_relaxed)) {
1154  stop = true;
1155  break;
1156  }
1157  }
1158 
1159  if (fms_) {
1160  bool copy_active = false;
1161  for (auto j : tid_active_)
1162  if (j)
1163  copy_active = true;
1164  if (copy_active)
1166  else
1168  }
1169  InputChunk* newChunk = nullptr;
1170  while (!freeChunks_.try_pop(newChunk)) {
1171  usleep(100000);
1172  if (quit_threads_.load(std::memory_order_relaxed)) {
1173  stop = true;
1174  break;
1175  }
1176  }
1177 
1178  if (newChunk == nullptr) {
1179  //return unused tid if we received shutdown (nullptr chunk)
1180  if (newTid != 0xffffffff)
1181  workerPool_.push(newTid);
1182  stop = true;
1183  break;
1184  }
1185  if (stop)
1186  break;
1188 
1189  std::unique_lock<std::mutex> lk(mReader_);
1190 
1191  unsigned int toRead = eventChunkSize_;
1192  if (i == neededChunks - 1 && fileSize % eventChunkSize_)
1193  toRead = fileSize % eventChunkSize_;
1194  newChunk->reset(i * eventChunkSize_, toRead, i);
1195 
1196  workerJob_[newTid].first = newInputFilePtr;
1197  workerJob_[newTid].second = newChunk;
1198 
1199  //wake up the worker thread
1200  cvReader_[newTid]->notify_one();
1201  }
1202  } else {
1203  if (!eventsInNewFile) {
1204  if (rawFd) {
1205  close(rawFd);
1206  rawFd = -1;
1207  }
1208  //still queue file for lumi update
1209  std::unique_lock<std::mutex> lkw(mWakeup_);
1210  //TODO: also file with only file header fits in this edge case. Check if read correctly in single buffer mode
1211  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1212  ls,
1213  rawFile,
1214  !fileListMode_,
1215  rawFd,
1216  fileSize,
1217  rawHeaderSize,
1218  (rawHeaderSize > 0),
1219  0,
1220  this));
1222  fileQueue_.push(std::move(newInputFile));
1223  cvWakeup_.notify_one();
1224  break;
1225  }
1226  //in single-buffer mode put single chunk in the file and let the main thread read the file
1227  InputChunk* newChunk = nullptr;
1228  //should be available immediately
1229  while (!freeChunks_.try_pop(newChunk)) {
1230  usleep(100000);
1231  if (quit_threads_.load(std::memory_order_relaxed)) {
1232  stop = true;
1233  break;
1234  }
1235  }
1236 
1237  if (newChunk == nullptr) {
1238  stop = true;
1239  }
1240 
1241  if (stop)
1242  break;
1243 
1244  std::unique_lock<std::mutex> lkw(mWakeup_);
1245 
1246  unsigned int toRead = eventChunkSize_;
1247  if (fileSize % eventChunkSize_)
1248  toRead = fileSize % eventChunkSize_;
1249  newChunk->reset(0, toRead, 0);
1250  newChunk->readComplete_ = true;
1251 
1252  //push file and wakeup main thread
1253  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1254  ls,
1255  rawFile,
1256  !fileListMode_,
1257  rawFd,
1258  fileSize,
1259  rawHeaderSize,
1260  1,
1261  eventsInNewFile,
1262  this));
1263  newInputFile->chunks_[0] = newChunk;
1265  fileQueue_.push(std::move(newInputFile));
1266  cvWakeup_.notify_one();
1267  }
1268  }
1269  }
1271  //make sure threads finish reading
1272  unsigned numFinishedThreads = 0;
1273  while (numFinishedThreads < workerThreads_.size()) {
1274  unsigned tid = 0;
1275  while (!workerPool_.try_pop(tid)) {
1276  usleep(10000);
1277  }
1278  std::unique_lock<std::mutex> lk(mReader_);
1279  thread_quit_signal[tid] = true;
1280  cvReader_[tid]->notify_one();
1281  numFinishedThreads++;
1282  }
1283  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1284  workerThreads_[i]->join();
1285  delete workerThreads_[i];
1286  }
1287 }
1288 
// Loop executed by reader worker thread `tid`.
// Per iteration: mark the thread idle (tid_active_[tid] = false), push `tid` into workerPool_, wait on
// cvReader_[tid], then read the chunk found in workerJob_[tid] from its file and store the chunk pointer in
// file->chunks_. The function returns only when thread_quit_signal[tid] is set after a wake-up.
// Open/seek/read failures are logged with edm::LogError and flagged through setExceptionState_.
// NOTE(review): embedded listing numbers 1371, 1404 and 1424 are absent from this text, so the statements
// defining `start` and `end` (used by the timing code below) are not visible here.
1289 void FedRawDataInputSource::readWorker(unsigned int tid) {
1290  bool init = true;
1291 
1292  while (true) {
1293  tid_active_[tid] = false;
1294  std::unique_lock<std::mutex> lk(mReader_);
      // NOTE(review): `.first` is cleared twice and `.second` is left untouched; the second statement
      // presumably was meant to reset `.second` -- confirm.
1295  workerJob_[tid].first = nullptr;
1296  workerJob_[tid].first = nullptr;
1297 
1298  assert(!thread_quit_signal[tid]); //should never get it here
      // make this thread id available to whoever pops workerPool_
1299  workerPool_.push(tid);
1300 
      // first iteration only: notify startupCv_ while holding startupLock_
      // (this inner `lk` shadows the mReader_ lock declared above)
1301  if (init) {
1302  std::unique_lock<std::mutex> lk(startupLock_);
1303  init = false;
1304  startupCv_.notify_one();
1305  }
      // mReader_ is released while blocked here.
      // NOTE(review): wait() is called without a predicate; a wake-up with no job assigned and no quit
      // signal would reach the assert on workerJob_[tid] below -- confirm this cannot happen.
1306  cvReader_[tid]->wait(lk);
1307 
1308  if (thread_quit_signal[tid])
1309  return;
1310  tid_active_[tid] = true;
1311 
1312  InputFile* file;
1313  InputChunk* chunk;
1314 
1315  assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1316 
1317  file = workerJob_[tid].first;
1318  chunk = workerJob_[tid].second;
1319 
1320  //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
      // bufferLeft is used below as the write offset into chunk->buf_
1321  unsigned int bufferLeft = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1322 
1323  //if only one worker thread exists, use single fd for all operations
1324  //if more worker threads exist, use rawFd_ for only the first read operation and then close file
1325  int fileDescriptor;
1326  bool fileOpenedHere = false;
1327 
1328  if (numConcurrentReads_ == 1) {
1329  fileDescriptor = file->rawFd_;
1330  if (fileDescriptor == -1) {
1331  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1332  fileOpenedHere = true;
      // remember the descriptor in the InputFile so that the next chunk reuses it
1333  file->rawFd_ = fileDescriptor;
1334  }
1335  } else {
1336  if (chunk->offset_ == 0) {
      // take over the descriptor stored in the InputFile; it is reset to -1 there
1337  fileDescriptor = file->rawFd_;
1338  file->rawFd_ = -1;
1339  if (fileDescriptor == -1) {
1340  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1341  fileOpenedHere = true;
1342  }
1343  } else {
1344  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1345  fileOpenedHere = true;
1346  }
1347  }
1348 
1349  if (fileDescriptor < 0) {
1350  edm::LogError("FedRawDataInputSource") << "readWorker failed to open file -: " << file->fileName_
1351  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1352  setExceptionState_ = true;
      // back to the top of the loop: the thread re-registers in workerPool_;
      // chunk->readComplete_ is not set on this path
1353  continue;
1354  }
1355  if (fileOpenedHere) { //fast forward to this chunk position
1356  off_t pos = 0;
1357  pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1358  if (pos == -1) {
1359  edm::LogError("FedRawDataInputSource")
1360  << "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor << " to offset "
1361  << chunk->offset_ << " error: " << strerror(errno);
1362  setExceptionState_ = true;
      // NOTE(review): the descriptor opened above is not closed before this `continue`
1363  continue;
1364  }
1365  }
1366 
1367  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1368  << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1369 
      // bytes at the start of the chunk buffer that are not read here (header skipped, see above)
1370  unsigned int skipped = bufferLeft;
      // read at most readBlocks_ blocks, each at most eventChunkBlock_ bytes, appending at offset bufferLeft
1372  for (unsigned int i = 0; i < readBlocks_; i++) {
1373  ssize_t last;
1374 
1375  //protect against reading into next block
1376  last = ::read(fileDescriptor,
1377  (void*)(chunk->buf_ + bufferLeft),
1378  std::min(chunk->usedSize_ - bufferLeft, (uint64_t)eventChunkBlock_));
1379 
1380  if (last < 0) {
1381  edm::LogError("FedRawDataInputSource") << "readWorker failed to read file -: " << file->fileName_
1382  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1383  setExceptionState_ = true;
1384  break;
1385  }
1386  if (last > 0)
1387  bufferLeft += last;
      // a read shorter than a full block ends the loop; the total read so far must then
      // equal the expected chunk size minus the skipped bytes
1388  if (last < eventChunkBlock_) { //last read
1389  //check if this is last block, then total read size must match file size
1390  if (!(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (unsigned int)last)) {
1391  edm::LogError("FedRawDataInputSource")
1392  << "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last
1393  << " expectedChunkSize:" << chunk->usedSize_
1394  << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last) << " skipped:" << skipped
1395  << " block:" << (i + 1) << "/" << readBlocks_ << " error: " << strerror(errno);
1396  setExceptionState_ = true;
1397  }
1398  break;
1399  }
1400  }
      // NOTE(review): setExceptionState_ is also set outside this function, so this test can fire for a
      // failure raised elsewhere; fileDescriptor is not closed before this `continue`.
1401  if (setExceptionState_)
1402  continue;
1403 
      // `start` and `end` are defined by statements missing from this listing (see note at the top)
1405  auto diff = end - start;
1406  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
      // (bufferLeft >> 20) is in MB and msec.count() in ms, hence the "GB/s" label
1407  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1408  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1409  << " GB/s)";
1410 
      // this read reached the end of the file: close the descriptor and, in the single-reader
      // configuration, also clear the copy kept in the InputFile
1411  if (chunk->offset_ + bufferLeft == file->fileSize_) { //file reading finished using same fd
1412  close(fileDescriptor);
1413  fileDescriptor = -1;
1414  if (numConcurrentReads_ == 1)
1415  file->rawFd_ = -1;
1416  }
      // with several concurrent readers a descriptor is never kept beyond one chunk
1417  if (numConcurrentReads_ > 1 && fileDescriptor != -1)
1418  close(fileDescriptor);
1419 
1420  //detect FRD event version. Skip file Header if it exists
      // the version is taken from the 16-bit value located right behind the raw file header of the first chunk
1421  if (detectedFRDversion_ == 0 && chunk->offset_ == 0) {
1422  detectedFRDversion_ = *((uint16_t*)(chunk->buf_ + file->rawHeaderSize_));
1423  }
1425  chunk->readComplete_ =
1426  true; //this is atomic to secure the sequential buffer fill before becoming available for processing)
1427  file->chunks_[chunk->fileIndex_] = chunk; //put the completed chunk in the file chunk vector at predetermined index
1428  }
1429 }
1430 
// NOTE(review): the signature line (embedded number 1431) is absent from this listing; elsewhere in this
// file the function is invoked as parent_->threadError().
// Sets quit_threads_ and then throws a cms::Exception of category "FedRawDataInputSource:threadError",
// so control never returns normally to the caller.
1432  quit_threads_ = true;
1433  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1434 }
1435 
// NOTE(review): the signature line (embedded number 1436) is absent from this listing; the body uses a
// variable named `state`.
// Passes `state` to fms_->setInState(); does nothing when fms_ is null.
1437  if (fms_)
1438  fms_->setInState(state);
1439 }
1440 
// NOTE(review): the signature line (1441) and the statement guarded by the `if` (1443) are absent from
// this listing; only the null check on fms_ is visible. Presumably the counterpart of the preceding
// function that forwards to fms_->setInStateSup() -- confirm against the original source.
1442  if (fms_)
1444 }
1445 
// Hands out the next `size` bytes of the file and moves the read position past them.
//   dataPosition  out: set to the start of the requested bytes inside the current chunk buffer
//   size          number of bytes requested
// Returns false when the bytes lie completely inside the current chunk. Returns true when they continue
// in the next chunk: the remainder of the current chunk is then moved to the start of its buffer, the
// missing bytes are copied behind it from the next chunk, and currentChunk_ is incremented.
// While a chunk is not yet available the function sleeps in 100 ms steps and calls parent_->threadError()
// if parent_->exceptionState() is set.
// NOTE(review): embedded listing numbers 1450, 1452, 1464 and 1466 (inside the two wait loops) are absent
// from this text.
1446 inline bool InputFile::advance(unsigned char*& dataPosition, const size_t size) {
1447  //wait for chunk
1448 
1449  while (!waitForChunk(currentChunk_)) {
1451  usleep(100000);
1453  if (parent_->exceptionState())
1454  parent_->threadError();
1455  }
1456 
1457  dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
      // bytes still unread in the current chunk
1458  size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1459 
1460  if (currentLeft < size) {
1461  //we need next chunk
1462  assert(chunks_.size() > currentChunk_ + 1);
1463  while (!waitForChunk(currentChunk_ + 1)) {
1465  usleep(100000);
1467  if (parent_->exceptionState())
1468  parent_->threadError();
1469  }
1470  //copy everything to beginning of the first chunk
1471  dataPosition -= chunkPosition_;
1472  assert(dataPosition == chunks_[currentChunk_]->buf_);
      // memmove because source and destination lie in the same buffer and may overlap
1473  memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1474  memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1475  //set pointers at the end of the old data position
1476  bufferPosition_ += size;
      // position inside the next chunk, right behind the bytes just copied out of it
1477  chunkPosition_ = size - currentLeft;
1478  currentChunk_++;
1479  return true;
1480  } else {
1481  chunkPosition_ += size;
1482  bufferPosition_ += size;
1483  return false;
1484  }
1485 }
1486 
// Copies `size` bytes, starting at chunkPosition_ of the current chunk, into the buffer of the previous
// chunk at byte offset `offset`, then advances chunkPosition_ and bufferPosition_ by `size`.
// NOTE(review): embedded listing number 1489 is absent from this text; the code indexes
// chunks_[currentChunk_ - 1], so currentChunk_ > 0 is presumably checked on that line -- confirm.
// NOTE(review): `size - offset` is computed on unsigned size_t values and wraps if offset > size.
1487 void InputFile::moveToPreviousChunk(const size_t size, const size_t offset) {
1488  //this will fail in case of events that are too large
1490  assert(size - offset < chunks_[currentChunk_]->size_);
1491  memcpy(chunks_[currentChunk_ - 1]->buf_ + offset, chunks_[currentChunk_]->buf_ + chunkPosition_, size);
1492  chunkPosition_ += size;
1493  bufferPosition_ += size;
1494 }
1495 
1496 void InputFile::rewindChunk(const size_t size) {
1497  chunkPosition_ -= size;
1498  bufferPosition_ -= size;
1499 }
1500 
// NOTE(review): the signature line (embedded number 1501) and lines 1508, 1512 and 1521 are absent from
// this listing; judging by the log messages this is the InputFile clean-up that deletes the input
// file(s), but the deletion call itself is not visible -- confirm against the original source.
// Visible behaviour: closes rawFd_ if it is still open; when deleteFile_ is set, walks fileNames_ and,
// for every non-empty name, runs a try block whose failures (std::filesystem::filesystem_error or any
// std::exception) are logged with edm::LogError.
1502  if (rawFd_ != -1)
1503  close(rawFd_);
1504 
1505  if (deleteFile_) {
1506  for (auto& fileName : fileNames_) {
1507  if (!fileName.empty()) {
1509  try {
1510  //sometimes this fails but file gets deleted
1511  LogDebug("FedRawDataInputSource:InputFile") << "Deleting input file -:" << fileName;
      // reached only if the (not visible) statement on line 1512 did not throw: go on with the next name
1513  continue;
1514  } catch (const std::filesystem::filesystem_error& ex) {
1515  edm::LogError("FedRawDataInputSource:InputFile")
1516  << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what() << ". Trying again.";
1517  } catch (std::exception& ex) {
1518  edm::LogError("FedRawDataInputSource:InputFile")
1519  << " - deleteFile std::exception CAUGHT -: " << ex.what() << ". Trying again.";
1520  }
      // the "Trying again" retry announced in the messages would be on line 1521, which is not visible
1522  }
1523  }
1524  }
1525 }
1526 
// Fills file->chunks_[0]->buf_ with the next portion of `file`, using the member fileDescriptor_.
//  - fileDescriptor_ < 0: first call for this file. The descriptor is opened (seeking past the raw header)
//    or taken over from file->rawFd_, and up to readBlocks_ blocks are read behind the header offset.
//  - otherwise: reading continues. With chunkPosition_ == 0 the whole buffer is refilled; else the
//    unconsumed tail is moved to the front of the buffer and chunkPosition_ bytes are read behind it.
// The descriptor is closed once bufferInputRead_ equals file->fileSize_.
// Throws cms::Exception if the file could not be opened.
// NOTE(review): the signature line (embedded number 1528) and lines 1556, 1565, 1580 and 1585 (each right
// after a ::read call) are absent from this listing, so the update of bufferInputRead_ is not visible.
// NOTE(review): the return value of ::read is added to the size counters without a visible check for -1.
1527 //single-buffer mode file reading
1529  uint32_t existingSize = 0;
1530 
1531  if (fileDescriptor_ < 0) {
1532  bufferInputRead_ = 0;
1533  if (file->rawFd_ == -1) {
1534  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
      // NOTE(review): lseek is issued before the result of open() is checked further below,
      // and its own return value is ignored
1535  if (file->rawHeaderSize_)
1536  lseek(fileDescriptor_, file->rawHeaderSize_, SEEK_SET);
1537  } else
1538  fileDescriptor_ = file->rawFd_;
1539 
1540  //skip header size in destination buffer (chunk position was already adjusted)
1541  bufferInputRead_ += file->rawHeaderSize_;
1542  existingSize += file->rawHeaderSize_;
1543 
1544  if (fileDescriptor_ >= 0)
1545  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1546  else {
1547  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer")
1548  << "failed to open file " << std::endl
1549  << file->fileName_ << " fd:" << fileDescriptor_;
1550  }
1551  //fill chunk (skipping file header if present)
      // NOTE(review): for the last block the request is reduced by existingSize, which at that point
      // also contains the bytes read by earlier iterations, not only the header size -- confirm this
      // is intended when readBlocks_ > 1.
1552  for (unsigned int i = 0; i < readBlocks_; i++) {
1553  const ssize_t last = ::read(fileDescriptor_,
1554  (void*)(file->chunks_[0]->buf_ + existingSize),
1555  eventChunkBlock_ - (i == readBlocks_ - 1 ? existingSize : 0));
1557  existingSize += last;
1558  }
1559 
1560  } else {
1561  //continue reading
1562  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
      // nothing to keep: refill the buffer from its beginning (existingSize starts at 0 here)
1563  for (unsigned int i = 0; i < readBlocks_; i++) {
1564  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1566  existingSize += last;
1567  }
1568  } else {
1569  //event didn't fit in last chunk, so leftover must be moved to the beginning and completed
1570  uint32_t existingSizeLeft = eventChunkSize_ - file->chunkPosition_;
1571  memmove((void*)file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSizeLeft);
1572 
1573  //calculate amount of data that can be added
      // chunkPosition_ bytes were freed by the move; they are read as whole blocks plus one remainder
1574  const uint32_t blockcount = file->chunkPosition_ / eventChunkBlock_;
1575  const uint32_t leftsize = file->chunkPosition_ % eventChunkBlock_;
1576 
1577  for (uint32_t i = 0; i < blockcount; i++) {
1578  const ssize_t last =
1579  ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), eventChunkBlock_);
1581  existingSizeLeft += last;
1582  }
1583  if (leftsize) {
1584  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), leftsize);
1586  }
1587  file->chunkPosition_ = 0; //data was moved to beginning of the chunk
1588  }
1589  }
1590  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1591  if (fileDescriptor_ != -1) {
1592  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1593  close(fileDescriptor_);
      // reset both the member descriptor and the copy stored in the InputFile
1594  file->rawFd_ = fileDescriptor_ = -1;
1595  }
1596  }
1597 }
1598 
// NOTE(review): the signature line (embedded number 1599) and the statement of the else branch (1605) are
// absent from this listing; the body uses variables named `lumi` and `events`.
// Under monlock_, adds `events` to the entry of sourceEventsReport_ keyed by `lumi` when such an entry
// exists; the else branch presumably creates the entry -- confirm against the original source.
1600  std::lock_guard<std::mutex> lock(monlock_);
1601  auto itr = sourceEventsReport_.find(lumi);
1602  if (itr != sourceEventsReport_.end())
1603  itr->second += events;
1604  else
1606 }
1607 
1608 std::pair<bool, unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase) {
1609  std::lock_guard<std::mutex> lock(monlock_);
1610  auto itr = sourceEventsReport_.find(lumi);
1611  if (itr != sourceEventsReport_.end()) {
1612  std::pair<bool, unsigned int> ret(true, itr->second);
1613  if (erase)
1614  sourceEventsReport_.erase(itr);
1615  return ret;
1616  } else
1617  return std::pair<bool, unsigned int>(false, 0);
1618 }
1619 
// NOTE(review): the signature line (embedded number 1620) and line 1631, which must define the
// `fileName` path used below, are absent from this listing.
// Sorts fileNames_ and tries to extract a run number from a file stem of the form "run<digits>_...".
// Returns the parsed number, or -1 when fileNames_ is empty, the stem does not start with "run",
// or the digits cannot be converted.
      // order by the part of the name starting at the last '/', i.e. ignoring the directory
1621  std::sort(fileNames_.begin(), fileNames_.end(), [](std::string a, std::string b) {
1622  if (a.rfind('/') != std::string::npos)
1623  a = a.substr(a.rfind('/'));
1624  if (b.rfind('/') != std::string::npos)
1625  b = b.substr(b.rfind('/'));
1626  return b > a;
1627  });
1628 
1629  if (!fileNames_.empty()) {
1630  //get run number from first file in the vector
1632  std::string fileStem = fileName.stem().string();
1633  if (fileStem.find("file://") == 0)
1634  fileStem = fileStem.substr(7);
1635  else if (fileStem.find("file:") == 0)
1636  fileStem = fileStem.substr(5);
      // position of the first '_' (std::string::npos if there is none, in which case substr below
      // takes everything after "run")
1637  auto end = fileStem.find('_');
1638 
1639  if (fileStem.find("run") == 0) {
1640  std::string runStr = fileStem.substr(3, end - 3);
1641  try {
1642  //get long to support test run numbers < 2^32
1643  long rval = std::stol(runStr);
1644  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: " << rval;
1645  return rval;
1646  } catch (const std::exception&) {
      // std::stol failed: warn and fall through to the -1 return
1647  edm::LogWarning("FedRawDataInputSource")
1648  << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1649  }
1650  }
1651  }
1652  return -1;
1653 }
1654 
// NOTE(review): the first line of the signature (embedded number 1655, which declares `ls`) and lines
// 1680, 1683 and 1686 are absent from this listing; the missing lines presumably hold the return
// statements of the non-recursive paths -- confirm against the original source.
// Hands out the next entry of fileNames_ (index fileListIndex_) in `nextFile`, with a leading "file://" or
// "file:" removed, and sets `ls` either from the digits following "ls" in the file stem or, in loop
// mode, to 1 + loopModeIterationInc_. `fsize` and `lockWaitTime` are not modified by the visible code.
// When the list is exhausted in loop mode, fileListIndex_ is reset and the function calls itself.
1656  std::string& nextFile,
1657  uint32_t& fsize,
1658  uint64_t& lockWaitTime) {
1659  if (fileListIndex_ < fileNames_.size()) {
1660  nextFile = fileNames_[fileListIndex_];
1661  if (nextFile.find("file://") == 0)
1662  nextFile = nextFile.substr(7);
1663  else if (nextFile.find("file:") == 0)
1664  nextFile = nextFile.substr(5);
1665  std::filesystem::path fileName = nextFile;
1666  std::string fileStem = fileName.stem().string();
      // NOTE(review): find() returns a position, not a bool. The two conditions below are false only
      // when the match is at index 0 and true when there is no match (std::string::npos); a comparison
      // against std::string::npos was presumably intended -- confirm.
1667  if (fileStem.find("ls"))
1668  fileStem = fileStem.substr(fileStem.find("ls") + 2);
1669  if (fileStem.find('_'))
1670  fileStem = fileStem.substr(0, fileStem.find('_'));
1671 
      // std::stoul throws if what is left of the stem does not start with digits
1672  if (!fileListLoopMode_)
1673  ls = std::stoul(fileStem);
1674  else //always starting from LS 1 in loop mode
1675  ls = 1 + loopModeIterationInc_;
1676 
1677  //fsize = 0;
1678  //lockWaitTime = 0;
1679  fileListIndex_++;
1681  } else {
1682  if (!fileListLoopMode_)
1684  else {
1685  //loop through files until interrupted
      // NOTE(review): with an empty fileNames_ this branch would be re-entered on every recursive
      // call, unless the line missing here (1686) prevents it -- confirm.
1687  fileListIndex_ = 0;
1688  return getFile(ls, nextFile, fsize, lockWaitTime);
1689  }
1690  }
1691 }
size
Write out results.
static const char runNumber_[]
uint8_t triggerType() const
Event Trigger type identifier.
Definition: FEDHeader.cc:13
Definition: start.py:1
Definition: fillJson.h:27
std::vector< std::string > fileNames_
unsigned int getgpshigh(const unsigned char *)
std::condition_variable cvWakeup_
void read(edm::EventPrincipal &eventPrincipal) override
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:261
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:359
bool gtpe_board_sense(const unsigned char *p)
constexpr size_t FRDHeaderMaxVersion
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
bool crc32c_hw_test()
Definition: crc32c.cc:354
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
static const uint32_t length
Definition: FEDTrailer.h:57
tbb::concurrent_queue< unsigned int > workerPool_
unsigned int get(const unsigned char *, bool)
bool useFileBroker() const
void maybeOpenNewLumiSection(const uint32_t lumiSection)
std::vector< std::string > fileNames_
static const uint32_t length
Definition: FEDHeader.h:54
ret
prodAgent to be discontinued
uint16_t sourceID() const
Identifier of the FED.
Definition: FEDHeader.cc:19
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
FedRawDataInputSource * parent_
volatile std::atomic< bool > shutdown_flag
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:458
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
bool advance(unsigned char *&dataPosition, const size_t size)
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
bool lumisectionDiscarded(unsigned int ls)
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &rawData, bool &tcdsInRange)
std::pair< InputFile *, InputChunk * > ReaderInfo
std::unique_ptr< std::thread > readSupervisorThread_
Log< level::Error, false > LogError
unsigned int numConcurrentLumis() const
StreamID streamID() const
int timeout
Definition: mps_check.py:53
assert(be >=bs)
bool isExceptionOnData(unsigned int ls)
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
bool waitForChunk(unsigned int chunkid)
std::string getEoRFilePathOnFU() const
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, bool isRealData, const edm::EventAuxiliary::ExperimentType &, const std::string &processGUID, bool verifyLumiSection, bool suppressWarning)
uint32_t fragmentLength() const
The length of the event fragment counted in 64-bit words including header and trailer.
Definition: FEDTrailer.cc:13
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
U second(std::pair< T, U > const &p)
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::string getEoLSFilePathOnBU(const unsigned int ls) const
static Timestamp beginOfTime()
Definition: Timestamp.h:77
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
const std::vector< unsigned int > testTCDSFEDRange_
Definition: TCDSRaw.h:16
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:226
void rewindChunk(const size_t size)
void moveToPreviousChunk(const size_t size, const size_t offset)
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:366
std::vector< int > streamFileTracker_
void reset(uint64_t newOffset, uint64_t toRead, unsigned int fileIndex)
unsigned char * buf_
ProductProvenance const & dummyProvenance() const
void setMonStateSup(evf::FastMonState::InputState state)
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
std::map< unsigned int, unsigned int > sourceEventsReport_
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
unsigned int fileIndex_
unsigned long long TimeValue_t
Definition: Timestamp.h:21
std::vector< std::thread * > workerThreads_
bool evm_board_sense(const unsigned char *p, size_t size)
void setMonState(evf::FastMonState::InputState state)
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:391
void createProcessingNotificationMaybe() const
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
void setInputSource(FedRawDataInputSource *inputSource)
Log< level::Info, false > LogInfo
std::atomic< bool > readComplete_
Definition: init.py:1
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
def ls(path, rec=False)
Definition: eostools.py:349
BranchDescription const & branchDescription() const
tbb::concurrent_vector< InputChunk * > chunks_
unsigned int getStartLumisectionFromEnv() const
def getRunNumber(filename)
void setInStateSup(FastMonState::InputState inputState)
unsigned long long uint64_t
Definition: Time.h:13
std::unique_ptr< FRDEventMsgView > event_
def load(fileName)
Definition: svgfig.py:547
void resize(size_t newsize, size_t wordsize=8)
Definition: FEDRawData.cc:28
uint32_t chunkPosition_
double b
Definition: hdecay.h:120
void add(std::string const &label, ParameterSetDescription const &psetDescription)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:223
std::condition_variable startupCv_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:374
void stoppedLookingForFile(unsigned int lumi)
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:360
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:463
HLT enums.
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:362
ItemTypeInfo state() const
Definition: InputSource.h:361
void readWorker(unsigned int tid)
double a
Definition: hdecay.h:121
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:264
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
std::vector< unsigned int > thread_quit_signal
unsigned int getLumisectionToStart() const
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:24
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
evf::FastMonitoringService * fms_
unsigned int gtpe_get(const unsigned char *)
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
constexpr std::array< uint32, FRDHeaderMaxVersion+1 > FRDHeaderVersionSize
tbb::concurrent_queue< InputChunk * > freeChunks_
Log< level::Warning, false > LogWarning
evf::EvFDaqDirector::FileStatus getNextEvent()
unsigned int currentChunk_
evf::EvFDaqDirector::FileStatus nextEvent()
void readNextChunkIntoBuffer(InputFile *file)
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
if(threadIdxLocalY==0 &&threadIdxLocalX==0)
void setFMS(evf::FastMonitoringService *fms)
std::unique_ptr< InputFile > currentFile_
unsigned int getgpslow(const unsigned char *)
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)
int events
Power< A, B >::type pow(const A &a, const B &b)
Definition: Power.h:29
def move(src, dest)
Definition: eostools.py:511
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
Definition: event.py:1
#define LogDebug(id)
void setInState(FastMonState::InputState inputState)
uint32_t bufferPosition_