CMS 3D CMS Logo

FedRawDataInputSource.cc
Go to the documentation of this file.
1 #include <fcntl.h>
2 #include <iomanip>
3 #include <iostream>
4 #include <memory>
5 #include <sstream>
6 #include <sys/types.h>
7 #include <sys/file.h>
8 #include <sys/time.h>
9 #include <unistd.h>
10 #include <vector>
11 #include <fstream>
12 #include <zlib.h>
13 #include <cstdio>
14 #include <chrono>
15 
16 #include <boost/algorithm/string.hpp>
17 
21 
23 
30 
32 
34 
38 
40 
45 
46 //JSON file reader
48 
49 using namespace evf::FastMonState;
50 
52  : edm::RawInputSource(pset, desc),
53  defPath_(pset.getUntrackedParameter<std::string>("buDefPath", "")),
54  eventChunkSize_(pset.getUntrackedParameter<unsigned int>("eventChunkSize", 32) * 1048576),
55  eventChunkBlock_(pset.getUntrackedParameter<unsigned int>("eventChunkBlock", 32) * 1048576),
56  numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers", 2)),
57  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles", 2)),
58  getLSFromFilename_(pset.getUntrackedParameter<bool>("getLSFromFilename", true)),
59  alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
60  verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum", true)),
61  useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID", false)),
62  testTCDSFEDRange_(
63  pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())),
64  fileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>())),
65  fileListMode_(pset.getUntrackedParameter<bool>("fileListMode", false)),
66  fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
67  runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
68  daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
69  eventID_(),
70  processHistoryID_(),
71  currentLumiSection_(0),
72  tcds_pointer_(nullptr),
73  eventsThisLumi_(0) {
74  char thishost[256];
75  gethostname(thishost, 255);
76  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: " << std::endl
77  << (eventChunkSize_ / 1048576) << " MB on host " << thishost;
78 
79  if (!testTCDSFEDRange_.empty()) {
80  if (testTCDSFEDRange_.size() != 2) {
81  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
82  << "Invalid TCDS Test FED range parameter";
83  }
86  }
87 
88  long autoRunNumber = -1;
89  if (fileListMode_) {
90  autoRunNumber = initFileList();
91  if (!fileListLoopMode_) {
92  if (autoRunNumber < 0)
93  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
94  //override run number
95  runNumber_ = (edm::RunNumber_t)autoRunNumber;
96  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
97  }
98  }
99 
101  setNewRun();
102  //todo:autodetect from file name (assert if names differ)
104 
105  //make sure that chunk size is N * block size
110 
111  if (!numBuffers_)
112  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource")
113  << "no reading enabled with numBuffers parameter 0";
114 
117  readingFilesCount_ = 0;
118 
119  if (!crc32c_hw_test())
120  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
121 
122  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
123  if (fileListMode_) {
124  try {
126  } catch (cms::Exception const&) {
127  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
128  }
129  } else {
131  if (!fms_) {
132  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
133  }
134  }
135 
137  if (!daqDirector_)
138  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
139 
141  if (useFileBroker_)
142  edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
143  //set DaqDirector to delete files in preGlobalEndLumi callback
145  if (fms_) {
147  fms_->setInputSource(this);
150  }
151  //should delete chunks when run stops
152  for (unsigned int i = 0; i < numBuffers_; i++) {
154  }
155 
156  quit_threads_ = false;
157 
158  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
159  std::unique_lock<std::mutex> lk(startupLock_);
160  //issue a memory fence here and in threads (constructor was segfaulting without this)
161  thread_quit_signal.push_back(false);
162  workerJob_.push_back(ReaderInfo(nullptr, nullptr));
163  cvReader_.push_back(new std::condition_variable);
164  tid_active_.push_back(0);
165  threadInit_.store(false, std::memory_order_release);
166  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
167  startupCv_.wait(lk);
168  }
169 
170  runAuxiliary()->setProcessHistoryID(processHistoryID_);
171 }
172 
// Destructor body (the signature line is missing from this listing).
// Raises quit_threads_, resets or preserves the file objects queued in filesToDelete_, joins the
// reader threads and frees the per-worker condition variables.
174  quit_threads_ = true;
175 
176  //delete any remaining open files
// no monitoring service, or no exception detected: reset every queued file object
177  if (!fms_ || !fms_->exceptionDetected()) {
178  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
179  it->second.reset();
180  } else {
181  //skip deleting files with exception
// files whose lumisection is flagged by isExceptionOnData() get unsetDeleteFile(); the rest are reset
182  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
183  //it->second->unsetDeleteFile();
184  if (fms_->isExceptionOnData(it->second->lumi_))
185  it->second->unsetDeleteFile();
186  else
187  it->second.reset();
188  }
189  //disable deleting current file with exception
190  if (currentFile_.get())
191  if (fms_->isExceptionOnData(currentFile_->lumi_))
192  currentFile_->unsetDeleteFile();
193  }
194 
// NOTE(review): the opening `if` of this if/else is missing from the listing; presumably it
// tests whether readSupervisorThread_ was started -- confirm against the original file.
196  readSupervisorThread_->join();
197  } else {
198  //join aux threads in case the supervisor thread was not started
// each worker is told to quit and notified under mReader_, then joined and deleted
199  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
200  std::unique_lock<std::mutex> lk(mReader_);
201  thread_quit_signal[i] = true;
202  cvReader_[i]->notify_one();
203  lk.unlock();
204  workerThreads_[i]->join();
205  delete workerThreads_[i];
206  }
207  }
// free the condition variables that the constructor allocated with new
208  for (unsigned int i = 0; i < numConcurrentReads_; i++)
209  delete cvReader_[i];
210  /*
211  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
212  InputChunk *ch;
213  while (!freeChunks_.try_pop(ch)) {}
214  delete ch;
215  }
216  */
217 }
218 
// Body of the parameter-description routine (its signature and the declaration of `desc` are
// missing from this listing). Registers the untracked parameters of the source with their
// defaults and comments, then adds the description under the label "source".
// setAllowAnything() lets parameters that are not described here pass validation.
221  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
222  desc.addUntracked<unsigned int>("eventChunkSize", 32)->setComment("Input buffer (chunk) size");
223  desc.addUntracked<unsigned int>("eventChunkBlock", 32)
224  ->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
225  desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
226  desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
227  ->setComment("Maximum number of simultaneously buffered raw files");
// Name and type now match what the constructor reads: a bool called "alwaysStartFromFirstLS".
// The previous entry used a misspelled name ("...FromfirstLS") and the type unsigned int.
228  desc.addUntracked<bool>("alwaysStartFromFirstLS", false)
229  ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
230  desc.addUntracked<bool>("verifyChecksum", true)
231  ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
232  desc.addUntracked<bool>("useL1EventID", false)
233  ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
234  desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
235  ->setComment("[min, max] range to search for TCDS FED ID in test setup");
236  desc.addUntracked<bool>("fileListMode", false)
237  ->setComment("Use fileNames parameter to directly specify raw files to open");
238  desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
239  ->setComment("file list used when fileListMode is enabled");
240  desc.setAllowAnything();
241  descriptions.add("source", desc);
242 }
243 
// Body of the routine that reports the next transition to the framework (presumably checkNext();
// its signature, the case labels of the switch and several statements are missing from this
// listing). It starts the readSupervisor thread, calls nextEvent() and maps the result to a
// Next:: value, creating end-of-lumisection / end-of-run marker files on the way.
246  //this thread opens new files and dispatches reading to worker readers
247  //threadInit_.store(false,std::memory_order_release);
// launch the supervisor while holding startupLock_, then wait on startupCv_ (readSupervisor
// notifies it at its start)
248  std::unique_lock<std::mutex> lk(startupLock_);
249  readSupervisorThread_ = std::make_unique<std::thread>(&FedRawDataInputSource::readSupervisor, this);
251  startupCv_.wait(lk);
252  }
253  //signal hltd to start event accounting
254  if (!currentLumiSection_)
257  switch (nextEvent()) {
// NOTE(review): the case label for this branch is not shown; it ends with "RUN ENDED" and kStop
259  //maybe create EoL file in working directory before ending run
260  struct stat buf;
261  if (!useFileBroker_ && currentLumiSection_ > 0) {
// if the EoLS file exists on the BU side but not locally, create the local one as an empty file
262  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
263  if (eolFound) {
265  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
266  if (!found) {
// NOTE(review): the descriptor returned by open() is closed without checking for -1
268  int eol_fd =
269  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
270  close(eol_fd);
272  }
273  }
274  }
275  //also create EoR file in FU data directory
276  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
277  if (!eorFound) {
278  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
279  O_RDWR | O_CREAT,
280  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
281  close(eor_fd);
282  }
284  eventsThisLumi_ = 0;
286  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
287  return Next::kStop;
288  }
290  //this is not reachable
291  return Next::kEvent;
292  }
294  //std::cout << "--------------NEW LUMI---------------" << std::endl;
295  return Next::kEvent;
296  }
// default branch: an event is available in event_
297  default: {
298  if (!getLSFromFilename_) {
299  //get new lumi from file header
300  if (event_->lumi() > currentLumiSection_) {
302  eventsThisLumi_ = 0;
304  }
305  }
// NOTE(review): the `if` that pairs with this `else` is missing from the listing
308  else
309  eventRunNumber_ = event_->run();
310  L1EventID_ = event_->event();
311 
312  setEventCached();
313 
314  return Next::kEvent;
315  }
316  }
317 }
318 
// Opens lumisection `lumiSection` if no luminosity block is open or the open one has a different
// number: creates marker files when the file broker is not used, updates currentLumiSection_ and
// installs a new LuminosityBlockAuxiliary stamped with the current wall-clock time.
// NOTE(review): a few statements (e.g. the declaration of fuEoLS and of lumiBlockAuxiliary) are
// missing from this listing.
319 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
320  if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
321  if (!useFileBroker_) {
322  if (currentLumiSection_ > 0) {
324  struct stat buf;
325  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
326  if (!found) {
// fuEoLS does not exist yet: create it as an empty file, then create the BoLS file for the new lumisection
// NOTE(review): the descriptor returned by open() is closed without checking for -1
328  int eol_fd =
329  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
330  close(eol_fd);
331  daqDirector_->createBoLSFile(lumiSection, false);
333  }
334  } else
335  daqDirector_->createBoLSFile(lumiSection, true); //needed for initial lumisection
336  }
337 
338  currentLumiSection_ = lumiSection;
339 
341 
// timestamp value is seconds * 1000000 + microseconds taken from gettimeofday()
342  timeval tv;
343  gettimeofday(&tv, nullptr);
344  const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
345 
347  runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
348 
349  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
350  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
351 
352  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: " << lumiSection;
353  }
354 }
355 
// Tail of a retry loop (presumably in nextEvent(); the signature and the loop header are missing
// from this listing): the loop is left when the framework shutdown flag is set, and the last
// status is returned.
359  if (edm::shutdown_flag.load(std::memory_order_relaxed))
360  break;
361  }
362  return status;
363 }
364 
// Body of the routine that extracts the next event from the current input file (the exception
// labels name it FedRawDataInputSource::getNextEvent; its signature and many statements are
// missing from this listing). It pops a file from fileQueue_ when none is current, handles the
// empty / finished / header-only cases, places event_ on the next event in single- or
// multi-buffer mode and verifies the event checksum.
366  if (setExceptionState_)
367  threadError();
368  if (!currentFile_.get()) {
371  if (!fileQueue_.try_pop(currentFile_)) {
372  //sleep until wakeup (only in single-buffer mode) or timeout
373  std::unique_lock<std::mutex> lkw(mWakeup_);
374  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
376  }
// dispatch on the status carried by the queued entry
377  status = currentFile_->status_;
380  currentFile_.reset();
381  return status;
382  } else if (status == evf::EvFDaqDirector::runAbort) {
383  throw cms::Exception("FedRawDataInputSource::getNextEvent")
384  << "Run has been aborted by the input source reader thread";
385  } else if (status == evf::EvFDaqDirector::newLumi) {
387  if (getLSFromFilename_) {
388  if (currentFile_->lumi_ > currentLumiSection_) {
390  eventsThisLumi_ = 0;
392  }
393  } else { //let this be picked up from next event
395  }
396  currentFile_.reset();
397  return status;
398  } else if (status == evf::EvFDaqDirector::newFile) {
400  } else
401  assert(false);
402  }
404 
405  //file is empty
406  if (!currentFile_->fileSize_) {
408  //try to open new lumi
409  assert(currentFile_->nChunks_ == 0);
410  if (getLSFromFilename_)
411  if (currentFile_->lumi_ > currentLumiSection_) {
413  eventsThisLumi_ = 0;
415  }
416  //immediately delete empty file
417  currentFile_.reset();
419  }
420 
421  //file is finished
422  if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
424  //release last chunk (it is never released elsewhere)
425  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
// a non-negative nEvents_ must equal the number of events actually processed
426  if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
427  throw cms::Exception("FedRawDataInputSource::getNextEvent")
428  << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
429  << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
430  }
431  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
432  if (singleBufferMode_) {
433  std::unique_lock<std::mutex> lkw(mWakeup_);
434  cvWakeup_.notify_one();
435  }
436  bufferInputRead_ = 0;
// NOTE(review): the `if` that pairs with the `else` below is missing from the listing
438  //put the file in pending delete list;
439  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
440  filesToDelete_.push_back(std::pair<int, std::unique_ptr<InputFile>>(currentFileIndex_, std::move(currentFile_)));
441  } else {
442  //in single-thread and stream jobs, events are already processed
443  currentFile_.reset();
444  }
446  }
447 
448  //handle RAW file header
449  if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
450  if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
451  if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
452  throw cms::Exception("FedRawDataInputSource::getNextEvent")
453  << "Premature end of input file while reading file header";
454 
455  edm::LogWarning("FedRawDataInputSource")
456  << "File with only raw header and no events received in LS " << currentFile_->lumi_;
457  if (getLSFromFilename_)
458  if (currentFile_->lumi_ > currentLumiSection_) {
460  eventsThisLumi_ = 0;
462  }
463  }
464 
465  //advance buffer position to skip file header (chunk will be acquired later)
466  currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
467  currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
468  }
469 
470  //file is too short
471  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
472  throw cms::Exception("FedRawDataInputSource::getNextEvent")
473  << "Premature end of input file while reading event header";
474  }
475  if (singleBufferMode_) {
476  //should already be there
// poll every 10 ms until the chunk is available, reporting reader-thread errors while waiting
478  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
479  usleep(10000);
480  if (currentFile_->parent_->exceptionState() || setExceptionState_)
481  currentFile_->parent_->threadError();
482  }
484 
// single-buffer mode always addresses chunk 0
485  unsigned char* dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
486 
487  //conditions when read amount is not sufficient for the header to fit
491 
// the FRD version is read from the first 16-bit word of the first event when not yet known
492  if (detectedFRDversion_ == 0) {
493  detectedFRDversion_ = *((uint16_t*)dataPosition);
495  throw cms::Exception("FedRawDataInputSource::getNextEvent")
496  << "Unknown FRD version -: " << detectedFRDversion_;
498  }
499 
500  //recalculate chunk position
501  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
503  throw cms::Exception("FedRawDataInputSource::getNextEvent")
504  << "Premature end of input file while reading event header";
505  }
506  }
507 
508  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
509  if (event_->size() > eventChunkSize_) {
510  throw cms::Exception("FedRawDataInputSource::getNextEvent")
511  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
512  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
513  << " bytes";
514  }
515 
// msgSize is the event size without the version-dependent header
516  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
517 
518  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
519  throw cms::Exception("FedRawDataInputSource::getNextEvent")
520  << "Premature end of input file while reading event data";
521  }
522  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
524  //recalculate chunk position
525  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
526  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
527  }
528  currentFile_->bufferPosition_ += event_->size();
529  currentFile_->chunkPosition_ += event_->size();
530  //last chunk is released when this function is invoked next time
531 
532  }
533  //multibuffer mode:
534  else {
535  //wait for the current chunk to become added to the vector
537  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
538  usleep(10000);
539  if (setExceptionState_)
540  threadError();
541  }
543 
544  //check if header is at the boundary of two chunks
545  chunkIsFree_ = false;
546  unsigned char* dataPosition;
547 
548  //read header, copy it to a single chunk if necessary
549  bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
550 
551  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
552  if (event_->size() > eventChunkSize_) {
553  throw cms::Exception("FedRawDataInputSource::getNextEvent")
554  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
555  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
556  << " bytes";
557  }
558 
559  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
560 
561  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
562  throw cms::Exception("FedRawDataInputSource::getNextEvent")
563  << "Premature end of input file while reading event data";
564  }
565 
566  if (chunkEnd) {
567  //header was at the chunk boundary, we will have to move payload as well
568  currentFile_->moveToPreviousChunk(msgSize, FRDHeaderVersionSize[detectedFRDversion_]);
569  chunkIsFree_ = true;
570  } else {
571  //header was contiguous, but check if payload fits the chunk
572  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
573  //rewind to header start position
575  //copy event to a chunk start and move pointers
576 
578 
579  chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_] + msgSize);
580 
582 
583  assert(chunkEnd);
584  chunkIsFree_ = true;
585  //header is moved
586  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
587  } else {
588  //everything is in a single chunk, only move pointers forward
589  chunkEnd = currentFile_->advance(dataPosition, msgSize);
590  assert(!chunkEnd);
591  chunkIsFree_ = false;
592  }
593  }
594  } //end multibuffer mode
596 
// checksum verification: crc32c for event version >= 5, Adler32 for versions 3 and 4
597  if (verifyChecksum_ && event_->version() >= 5) {
598  uint32_t crc = 0;
599  crc = crc32c(crc, (const unsigned char*)event_->payload(), event_->eventSize());
600  if (crc != event_->crc32c()) {
601  if (fms_)
603  throw cms::Exception("FedRawDataInputSource::getNextEvent")
604  << "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() << " but calculated 0x"
605  << crc;
606  }
607  } else if (verifyChecksum_ && event_->version() >= 3) {
608  uint32_t adler = adler32(0L, Z_NULL, 0);
609  adler = adler32(adler, (Bytef*)event_->payload(), event_->eventSize());
610 
611  if (adler != event_->adler32()) {
612  if (fms_)
614  throw cms::Exception("FedRawDataInputSource::getNextEvent")
615  << "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() << " but calculated 0x"
616  << adler;
617  }
618  }
620 
621  currentFile_->nProcessed_++;
622 
624 }
625 
// Body of the routine that builds the framework event (the exception label names it
// FedRawDataInputSource::read; its signature and the declarations of `aux` are missing from this
// listing). It fills a FEDRawDataCollection from the current event, creates the event with an
// auxiliary chosen from one of three ID sources, and periodically drops finished input files
// that no stream is still using.
628  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
629  bool tcdsInRange;
630  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData, tcdsInRange);
631 
// three cases: useL1EventID_ set, no TCDS FED found (needs GTPEventID_), or TCDS FED present
632  if (useL1EventID_) {
635  aux.setProcessHistoryID(processHistoryID_);
636  makeEvent(eventPrincipal, aux);
637  } else if (tcds_pointer_ == nullptr) {
638  if (!GTPEventID_) {
639  throw cms::Exception("FedRawDataInputSource::read")
640  << "No TCDS or GTP FED in event with FEDHeader EID -: " << L1EventID_;
641  }
644  aux.setProcessHistoryID(processHistoryID_);
645  makeEvent(eventPrincipal, aux);
646  } else {
// the TCDS record is read from directly after the FED header
647  const FEDHeader fedHeader(tcds_pointer_);
648  tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
653  event_->isRealData(),
654  static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
655  processGUID(),
657  !tcdsInRange);
658  aux.setProcessHistoryID(processHistoryID_);
659  makeEvent(eventPrincipal, aux);
660  }
661 
662  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
663 
665 
666  eventsThisLumi_++;
668 
669  //resize vector if needed
670  while (streamFileTracker_.size() <= eventPrincipal.streamID())
671  streamFileTracker_.push_back(-1);
672 
// remember which file index this stream is currently working on
673  streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
674 
675  //this old file check runs no more often than every 10 events
676  if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
677  //delete files that are not in processing
678  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
679  auto it = filesToDelete_.begin();
680  while (it != filesToDelete_.end()) {
681  bool fileIsBeingProcessed = false;
682  for (unsigned int i = 0; i < nStreams_; i++) {
683  if (it->first == streamFileTracker_.at(i)) {
684  fileIsBeingProcessed = true;
685  break;
686  }
687  }
// erase only entries no stream refers to and whose lumisection is not flagged with an exception
688  if (!fileIsBeingProcessed && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
689  std::string fileToDelete = it->second->fileName_;
690  it = filesToDelete_.erase(it);
691  } else
692  it++;
693  }
694  }
// when chunkIsFree_ is set, return the chunk before the current one to the free list
695  if (chunkIsFree_)
696  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
697  chunkIsFree_ = false;
699  return;
700 }
701 
// Body of the routine that unpacks the event payload into per-FED buffers (the exception label
// names it FedRawDataInputSource::fillFEDRawDataCollection; its signature and several conditions
// are missing from this listing). The payload is walked backwards, one FED fragment at a time,
// using the fragment length stored in each FED trailer. Returns the event timestamp.
704  timeval stv;
705  gettimeofday(&stv, nullptr);
// default timestamp: seconds in the upper 32 bits plus microseconds
706  time = stv.tv_sec;
707  time = (time << 32) + stv.tv_usec;
708  edm::Timestamp tstamp(time);
709 
710  uint32_t eventSize = event_->eventSize();
711  unsigned char* event = (unsigned char*)event_->payload();
712  GTPEventID_ = 0;
713  tcds_pointer_ = nullptr;
714  tcdsInRange = false;
715  uint16_t selectedTCDSFed = 0;
// eventSize counts the bytes not yet consumed; each pass strips one trailer and one fragment
716  while (eventSize > 0) {
717  assert(eventSize >= FEDTrailer::length);
718  eventSize -= FEDTrailer::length;
719  const FEDTrailer fedTrailer(event + eventSize);
720  const uint32_t fedSize = fedTrailer.fragmentLength() << 3; //trailer length counts in 8 bytes
// NOTE(review): fedSize - FEDHeader::length is unsigned and would wrap if fedSize were smaller
// than the header length -- confirm that the input guarantees this cannot happen
721  assert(eventSize >= fedSize - FEDHeader::length);
722  eventSize -= (fedSize - FEDHeader::length);
723  const FEDHeader fedHeader(event + eventSize);
724  const uint16_t fedId = fedHeader.sourceID();
726  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
727  } else if (fedId >= MINTCDSuTCAFEDID_ && fedId <= MAXTCDSuTCAFEDID_) {
// the first FED found in the TCDS range is remembered; a second one is an error
728  if (!selectedTCDSFed) {
729  selectedTCDSFed = fedId;
730  tcds_pointer_ = event + eventSize;
732  tcdsInRange = true;
733  }
734  } else
735  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
736  << "Second TCDS FED ID " << fedId << " found. First ID: " << selectedTCDSFed;
737  }
// NOTE(review): the condition guarding this branch is missing from the listing; here the event
// ID and the timestamp are taken from the fragment via the evf::evtn helpers
739  if (evf::evtn::evm_board_sense(event + eventSize, fedSize))
740  GTPEventID_ = evf::evtn::get(event + eventSize, true);
741  else
742  GTPEventID_ = evf::evtn::get(event + eventSize, false);
743  //evf::evtn::evm_board_setformat(fedSize);
744  const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
745  const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
746  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
747  }
748  //take event ID from GTPE FED
750  if (evf::evtn::gtpe_board_sense(event + eventSize)) {
751  GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
752  }
753  }
// copy the whole fragment of fedSize bytes into the collection entry for this FED ID
754  FEDRawData& fedData = rawData.FEDData(fedId);
755  fedData.resize(fedSize);
756  memcpy(fedData.data(), event + eventSize, fedSize);
757  }
758  assert(eventSize == 0);
759 
760  return tstamp;
761 }
762 
764 
766  bool stop = false;
767  unsigned int currentLumiSection = 0;
768  //threadInit_.exchange(true,std::memory_order_acquire);
769 
770  {
771  std::unique_lock<std::mutex> lk(startupLock_);
772  startupCv_.notify_one();
773  }
774 
775  uint32_t ls = 0;
776  uint32_t monLS = 1;
777  uint32_t lockCount = 0;
778  uint64_t sumLockWaitTimeUs = 0.;
779 
780  while (!stop) {
781  //wait for at least one free thread and chunk
782  int counter = 0;
783 
784  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty() ||
786  //report state to monitoring
787  if (fms_) {
788  bool copy_active = false;
789  for (auto j : tid_active_)
790  if (j)
791  copy_active = true;
794  else if (freeChunks_.empty()) {
795  if (copy_active)
797  else
799  } else {
800  if (copy_active)
802  else
804  }
805  }
806  std::unique_lock<std::mutex> lkw(mWakeup_);
807  //sleep until woken up by condition or a timeout
808  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
809  counter++;
810  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
811  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
812  } else {
813  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
814  }
815  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
816  stop = true;
817  break;
818  }
819  }
820  //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
821 
822  if (stop)
823  break;
824 
825  //look for a new file
826  std::string nextFile;
827  uint32_t fileSizeIndex;
828  int64_t fileSizeFromMetadata;
829 
830  if (fms_) {
833  }
834 
836  uint16_t rawHeaderSize = 0;
837  uint32_t lsFromRaw = 0;
838  int32_t serverEventsInNewFile = -1;
839  int rawFd = -1;
840 
841  int backoff_exp = 0;
842 
843  //entering loop which tries to grab new file from ramdisk
845  //check if hltd has signalled to throttle input
846  counter = 0;
847  while (daqDirector_->inputThrottled()) {
848  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
849  break;
850 
851  unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
852  unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
853  unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
854  bool hasDiscardedLumi = false;
855  for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
857  edm::LogWarning("FedRawDataInputSource") << "Source detected that the lumisection is discarded -: " << i;
858  hasDiscardedLumi = true;
859  break;
860  }
861  }
862  if (hasDiscardedLumi)
863  break;
864 
866 
867  if (!(counter % 50))
868  edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused...";
869  usleep(100000);
870  counter++;
871  }
872 
873  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
874  stop = true;
875  break;
876  }
877 
878  assert(rawFd == -1);
879  uint64_t thisLockWaitTimeUs = 0.;
881  if (fileListMode_) {
882  //return LS if LS not set, otherwise return file
883  status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
885  uint16_t rawDataType;
887  rawFd,
888  rawHeaderSize,
889  rawDataType,
890  lsFromRaw,
891  serverEventsInNewFile,
892  fileSizeFromMetadata,
893  false,
894  false,
895  false) != 0) {
896  //error
897  setExceptionState_ = true;
898  stop = true;
899  break;
900  }
901  if (!getLSFromFilename_)
902  ls = lsFromRaw;
903  }
904  } else if (!useFileBroker_)
906  ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
907  else {
908  status = daqDirector_->getNextFromFileBroker(currentLumiSection,
909  ls,
910  nextFile,
911  rawFd,
912  rawHeaderSize,
913  serverEventsInNewFile,
914  fileSizeFromMetadata,
915  thisLockWaitTimeUs);
916  }
917 
919 
920  //cycle through all remaining LS even if no files get assigned
921  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
923 
924  //monitoring of lock wait time
925  if (thisLockWaitTimeUs > 0.)
926  sumLockWaitTimeUs += thisLockWaitTimeUs;
927  lockCount++;
928  if (ls > monLS) {
929  monLS = ls;
930  if (lockCount)
931  if (fms_)
932  fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
933  lockCount = 0;
934  sumLockWaitTimeUs = 0;
935  }
936 
937  //check again for any remaining index/EoLS files after EoR file is seen
940  usleep(100000);
941  //now all files should have appeared in ramdisk, check again if any raw files were left behind
943  ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
944  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
946  }
947 
949  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runEnded));
950  fileQueue_.push(std::move(inf));
951  stop = true;
952  break;
953  }
954 
955  //error from filelocking function
957  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
958  fileQueue_.push(std::move(inf));
959  stop = true;
960  break;
961  }
962  //queue new lumisection
963  if (getLSFromFilename_) {
964  if (ls > currentLumiSection) {
965  if (!useFileBroker_) {
966  //file locking
967  //setMonStateSup(inSupNewLumi);
968  currentLumiSection = ls;
969  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
970  fileQueue_.push(std::move(inf));
971  } else {
972  //new file service
973  if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
975  //start transitions from LS specified by env, continue if not reached
976  if (ls < daqDirector_->getStartLumisectionFromEnv()) {
977  //skip file if from earlier LS than specified by env
978  if (rawFd != -1) {
979  close(rawFd);
980  rawFd = -1;
981  }
983  continue;
984  } else {
985  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
986  fileQueue_.push(std::move(inf));
987  }
988  } else if (ls < 100) {
989  //look at last LS file on disk to start from that lumisection (only within first 100 LS)
990  unsigned int lsToStart = daqDirector_->getLumisectionToStart();
991 
992  for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
993  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
994  fileQueue_.push(std::move(inf));
995  }
996  } else {
997  //start from current LS
998  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
999  fileQueue_.push(std::move(inf));
1000  }
1001  } else {
1002  //queue all lumisections after last one seen to avoid gaps
1003  for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
1004  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1005  fileQueue_.push(std::move(inf));
1006  }
1007  }
1008  currentLumiSection = ls;
1009  }
1010  }
1011  //else
1012  if (currentLumiSection > 0 && ls < currentLumiSection) {
1013  edm::LogError("FedRawDataInputSource")
1014  << "Got old LS (" << ls << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
1015  << ". Aborting execution." << std::endl;
1016  if (rawFd != -1)
1017  close(rawFd);
1018  rawFd = -1;
1019  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
1020  fileQueue_.push(std::move(inf));
1021  stop = true;
1022  break;
1023  }
1024  }
1025 
1026  int dbgcount = 0;
1029  dbgcount++;
1030  if (!(dbgcount % 20))
1031  LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1032  if (!useFileBroker_)
1033  usleep(100000);
1034  else {
1035  backoff_exp = std::min(4, backoff_exp); // max 1.6 seconds
1036  //backoff_exp=0; // disabled!
1037  int sleeptime = (int)(100000. * pow(2, backoff_exp));
1038  usleep(sleeptime);
1039  backoff_exp++;
1040  }
1041  } else
1042  backoff_exp = 0;
1043  }
1044  //end of file grab loop, parse result
1047  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1048 
1049  std::string rawFile;
1050  //file service will report raw extension
1051  if (useFileBroker_ || rawHeaderSize)
1052  rawFile = nextFile;
1053  else {
1054  std::filesystem::path rawFilePath(nextFile);
1055  rawFile = rawFilePath.replace_extension(".raw").string();
1056  }
1057 
1058  struct stat st;
1059  int stat_res = stat(rawFile.c_str(), &st);
1060  if (stat_res == -1) {
1061  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
1062  setExceptionState_ = true;
1063  break;
1064  }
1065  uint64_t fileSize = st.st_size;
1066 
1067  if (fms_) {
1071  }
1072  int eventsInNewFile;
1073  if (fileListMode_) {
1074  if (fileSize == 0)
1075  eventsInNewFile = 0;
1076  else
1077  eventsInNewFile = -1;
1078  } else {
1080  if (!useFileBroker_) {
1081  if (rawHeaderSize) {
1082  int rawFdEmpty = -1;
1083  uint16_t rawHeaderCheck;
1084  bool fileFound;
1085  eventsInNewFile = daqDirector_->grabNextJsonFromRaw(
1086  nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true);
1087  assert(fileFound && rawHeaderCheck == rawHeaderSize);
1089  } else
1090  eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1091  } else
1092  eventsInNewFile = serverEventsInNewFile;
1093  assert(eventsInNewFile >= 0);
1094  assert((eventsInNewFile > 0) ==
1095  (fileSize > rawHeaderSize)); //file without events must be empty or contain only header
1096  }
1097 
1098  if (!singleBufferMode_) {
1099  //calculate number of needed chunks
1100  unsigned int neededChunks = fileSize / eventChunkSize_;
1101  if (fileSize % eventChunkSize_)
1102  neededChunks++;
1103 
1104  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1105  ls,
1106  rawFile,
1107  !fileListMode_,
1108  rawFd,
1109  fileSize,
1110  rawHeaderSize,
1111  neededChunks,
1112  eventsInNewFile,
1113  this));
1115  auto newInputFilePtr = newInputFile.get();
1116  fileQueue_.push(std::move(newInputFile));
1117 
1118  for (unsigned int i = 0; i < neededChunks; i++) {
1119  if (fms_) {
1120  bool copy_active = false;
1121  for (auto j : tid_active_)
1122  if (j)
1123  copy_active = true;
1124  if (copy_active)
1126  else
1128  }
1129  //get thread
1130  unsigned int newTid = 0xffffffff;
1131  while (!workerPool_.try_pop(newTid)) {
1132  usleep(100000);
1133  if (quit_threads_.load(std::memory_order_relaxed)) {
1134  stop = true;
1135  break;
1136  }
1137  }
1138 
1139  if (fms_) {
1140  bool copy_active = false;
1141  for (auto j : tid_active_)
1142  if (j)
1143  copy_active = true;
1144  if (copy_active)
1146  else
1148  }
1149  InputChunk* newChunk = nullptr;
1150  while (!freeChunks_.try_pop(newChunk)) {
1151  usleep(100000);
1152  if (quit_threads_.load(std::memory_order_relaxed)) {
1153  stop = true;
1154  break;
1155  }
1156  }
1157 
1158  if (newChunk == nullptr) {
1159  //return unused tid if we received shutdown (nullptr chunk)
1160  if (newTid != 0xffffffff)
1161  workerPool_.push(newTid);
1162  stop = true;
1163  break;
1164  }
1165  if (stop)
1166  break;
1168 
1169  std::unique_lock<std::mutex> lk(mReader_);
1170 
1171  unsigned int toRead = eventChunkSize_;
1172  if (i == neededChunks - 1 && fileSize % eventChunkSize_)
1173  toRead = fileSize % eventChunkSize_;
1174  newChunk->reset(i * eventChunkSize_, toRead, i);
1175 
1176  workerJob_[newTid].first = newInputFilePtr;
1177  workerJob_[newTid].second = newChunk;
1178 
1179  //wake up the worker thread
1180  cvReader_[newTid]->notify_one();
1181  }
1182  } else {
1183  if (!eventsInNewFile) {
1184  if (rawFd) {
1185  close(rawFd);
1186  rawFd = -1;
1187  }
1188  //still queue file for lumi update
1189  std::unique_lock<std::mutex> lkw(mWakeup_);
1190  //TODO: also file with only file header fits in this edge case. Check if read correctly in single buffer mode
1191  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1192  ls,
1193  rawFile,
1194  !fileListMode_,
1195  rawFd,
1196  fileSize,
1197  rawHeaderSize,
1198  (rawHeaderSize > 0),
1199  0,
1200  this));
1202  fileQueue_.push(std::move(newInputFile));
1203  cvWakeup_.notify_one();
1204  break;
1205  }
1206  //in single-buffer mode put single chunk in the file and let the main thread read the file
1207  InputChunk* newChunk = nullptr;
1208  //should be available immediately
1209  while (!freeChunks_.try_pop(newChunk)) {
1210  usleep(100000);
1211  if (quit_threads_.load(std::memory_order_relaxed)) {
1212  stop = true;
1213  break;
1214  }
1215  }
1216 
1217  if (newChunk == nullptr) {
1218  stop = true;
1219  }
1220 
1221  if (stop)
1222  break;
1223 
1224  std::unique_lock<std::mutex> lkw(mWakeup_);
1225 
1226  unsigned int toRead = eventChunkSize_;
1227  if (fileSize % eventChunkSize_)
1228  toRead = fileSize % eventChunkSize_;
1229  newChunk->reset(0, toRead, 0);
1230  newChunk->readComplete_ = true;
1231 
1232  //push file and wakeup main thread
1233  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1234  ls,
1235  rawFile,
1236  !fileListMode_,
1237  rawFd,
1238  fileSize,
1239  rawHeaderSize,
1240  1,
1241  eventsInNewFile,
1242  this));
1243  newInputFile->chunks_[0] = newChunk;
1245  fileQueue_.push(std::move(newInputFile));
1246  cvWakeup_.notify_one();
1247  }
1248  }
1249  }
1251  //make sure threads finish reading
1252  unsigned numFinishedThreads = 0;
1253  while (numFinishedThreads < workerThreads_.size()) {
1254  unsigned tid = 0;
1255  while (!workerPool_.try_pop(tid)) {
1256  usleep(10000);
1257  }
1258  std::unique_lock<std::mutex> lk(mReader_);
1259  thread_quit_signal[tid] = true;
1260  cvReader_[tid]->notify_one();
1261  numFinishedThreads++;
1262  }
1263  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1264  workerThreads_[i]->join();
1265  delete workerThreads_[i];
1266  }
1267 }
1268 
1269 void FedRawDataInputSource::readWorker(unsigned int tid) {
1270  bool init = true;
1271  threadInit_.exchange(true, std::memory_order_acquire);
1272 
1273  while (true) {
1274  tid_active_[tid] = false;
1275  std::unique_lock<std::mutex> lk(mReader_);
1276  workerJob_[tid].first = nullptr;
1277  workerJob_[tid].first = nullptr;
1278 
1279  assert(!thread_quit_signal[tid]); //should never get it here
1280  workerPool_.push(tid);
1281 
1282  if (init) {
1283  std::unique_lock<std::mutex> lk(startupLock_);
1284  init = false;
1285  startupCv_.notify_one();
1286  }
1287  cvReader_[tid]->wait(lk);
1288 
1289  if (thread_quit_signal[tid])
1290  return;
1291  tid_active_[tid] = true;
1292 
1293  InputFile* file;
1294  InputChunk* chunk;
1295 
1296  assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1297 
1298  file = workerJob_[tid].first;
1299  chunk = workerJob_[tid].second;
1300 
1301  //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1302  unsigned int bufferLeft = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1303 
1304  //if only one worker thread exists, use single fd for all operations
1305  //if more worker threads exist, use rawFd_ for only the first read operation and then close file
1306  int fileDescriptor;
1307  bool fileOpenedHere = false;
1308 
1309  if (numConcurrentReads_ == 1) {
1310  fileDescriptor = file->rawFd_;
1311  if (fileDescriptor == -1) {
1312  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1313  fileOpenedHere = true;
1314  file->rawFd_ = fileDescriptor;
1315  }
1316  } else {
1317  if (chunk->offset_ == 0) {
1318  fileDescriptor = file->rawFd_;
1319  file->rawFd_ = -1;
1320  if (fileDescriptor == -1) {
1321  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1322  fileOpenedHere = true;
1323  }
1324  } else {
1325  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1326  fileOpenedHere = true;
1327  }
1328  }
1329 
1330  if (fileDescriptor < 0) {
1331  edm::LogError("FedRawDataInputSource") << "readWorker failed to open file -: " << file->fileName_
1332  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1333  setExceptionState_ = true;
1334  continue;
1335  }
1336  if (fileOpenedHere) { //fast forward to this chunk position
1337  off_t pos = 0;
1338  pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1339  if (pos == -1) {
1340  edm::LogError("FedRawDataInputSource")
1341  << "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor << " to offset "
1342  << chunk->offset_ << " error: " << strerror(errno);
1343  setExceptionState_ = true;
1344  continue;
1345  }
1346  }
1347 
1348  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1349  << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1350 
1351  unsigned int skipped = bufferLeft;
1353  for (unsigned int i = 0; i < readBlocks_; i++) {
1354  ssize_t last;
1355 
1356  //protect against reading into next block
1357  last = ::read(fileDescriptor,
1358  (void*)(chunk->buf_ + bufferLeft),
1359  std::min(chunk->usedSize_ - bufferLeft, (uint64_t)eventChunkBlock_));
1360 
1361  if (last < 0) {
1362  edm::LogError("FedRawDataInputSource") << "readWorker failed to read file -: " << file->fileName_
1363  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1364  setExceptionState_ = true;
1365  break;
1366  }
1367  if (last > 0)
1368  bufferLeft += last;
1369  if (last < eventChunkBlock_) { //last read
1370  //check if this is last block, then total read size must match file size
1371  if (!(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (unsigned int)last)) {
1372  edm::LogError("FedRawDataInputSource")
1373  << "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last
1374  << " expectedChunkSize:" << chunk->usedSize_
1375  << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last) << " skipped:" << skipped
1376  << " block:" << (i + 1) << "/" << readBlocks_ << " error: " << strerror(errno);
1377  setExceptionState_ = true;
1378  }
1379  break;
1380  }
1381  }
1382  if (setExceptionState_)
1383  continue;
1384 
1386  auto diff = end - start;
1387  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1388  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1389  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1390  << " GB/s)";
1391 
1392  if (chunk->offset_ + bufferLeft == file->fileSize_) { //file reading finished using same fd
1393  close(fileDescriptor);
1394  fileDescriptor = -1;
1395  if (numConcurrentReads_ == 1)
1396  file->rawFd_ = -1;
1397  }
1398  if (numConcurrentReads_ > 1 && fileDescriptor != -1)
1399  close(fileDescriptor);
1400 
1401  //detect FRD event version. Skip file Header if it exists
1402  if (detectedFRDversion_ == 0 && chunk->offset_ == 0) {
1403  detectedFRDversion_ = *((uint16_t*)(chunk->buf_ + file->rawHeaderSize_));
1404  }
1406  chunk->readComplete_ =
1407  true; //this is atomic to secure the sequential buffer fill before becoming available for processing)
1408  file->chunks_[chunk->fileIndex_] = chunk; //put the completed chunk in the file chunk vector at predetermined index
1409  }
1410 }
1411 
1413  quit_threads_ = true;
1414  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1415 }
1416 
1418  if (fms_)
1419  fms_->setInState(state);
1420 }
1421 
1423  if (fms_)
1425 }
1426 
// Advance the read position of this file by `size` bytes and hand back a pointer to those bytes.
//   dataPosition  out: set to the start of the `size` bytes inside the current chunk buffer
//   size          number of bytes to consume
// Returns true when the requested bytes straddled the end of the current chunk and were stitched
// together at the beginning of the current chunk's buffer (currentChunk_ is then incremented);
// returns false when the bytes were fully contained in the current chunk.
// NOTE(review): listing lines 1431, 1433, 1444 and 1446 are missing from this file.
1427 inline bool InputFile::advance(unsigned char*& dataPosition, const size_t size) {
1428  //wait for chunk
1429 
      // poll every 100 ms until waitForChunk() reports success; call threadError() if the parent reports an exception state
1430  while (!waitForChunk(currentChunk_)) {
1432  usleep(100000);
1434  if (parent_->exceptionState())
1435  parent_->threadError();
1436  }
1437 
1438  dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
      // bytes still unread in the current chunk
1439  size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1440 
1441  if (currentLeft < size) {
1442  //we need next chunk
1443  while (!waitForChunk(currentChunk_ + 1)) {
1445  usleep(100000);
1447  if (parent_->exceptionState())
1448  parent_->threadError();
1449  }
1450  //copy everything to beginning of the first chunk
1451  dataPosition -= chunkPosition_;
1452  assert(dataPosition == chunks_[currentChunk_]->buf_);
      // move the unread tail to the front (ranges may overlap, hence memmove), then append the
      // missing part from the start of the next chunk
      // NOTE(review): assumes the current buffer can hold `size` bytes — confirm against the chunk allocation
1453  memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1454  memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1455  //set pointers at the end of the old data position
1456  bufferPosition_ += size;
      // position inside the next chunk, just past the bytes that were copied out of it
1457  chunkPosition_ = size - currentLeft;
1458  currentChunk_++;
1459  return true;
1460  } else {
1461  chunkPosition_ += size;
1462  bufferPosition_ += size;
1463  return false;
1464  }
1465 }
1466 
// Copy `size` bytes from the current chunk (starting at chunkPosition_) into the previous chunk's
// buffer at byte offset `offset`, then advance chunkPosition_ and bufferPosition_ by `size`.
// NOTE(review): listing line 1469 is missing from this file.
1467 void InputFile::moveToPreviousChunk(const size_t size, const size_t offset) {
1468  //this will fail in case of events that are too large
1470  assert(size - offset < chunks_[currentChunk_]->size_);
1471  memcpy(chunks_[currentChunk_ - 1]->buf_ + offset, chunks_[currentChunk_]->buf_ + chunkPosition_, size);
1472  chunkPosition_ += size;
1473  bufferPosition_ += size;
1474 }
1475 
// Step the read position back by `size` bytes: both the position inside the current chunk and the
// overall buffer position are decreased. No bounds check is performed here.
1476 void InputFile::rewindChunk(const size_t size) {
1477  chunkPosition_ -= size;
1478  bufferPosition_ -= size;
1479 }
1480 
1482  if (rawFd_ != -1)
1483  close(rawFd_);
1484 
1485  if (deleteFile_) {
1486  for (auto& fileName : fileNames_) {
1487  if (!fileName.empty()) {
1489  try {
1490  //sometimes this fails but file gets deleted
1491  LogDebug("FedRawDataInputSource:InputFile") << "Deleting input file -:" << fileName;
1493  continue;
1494  } catch (const std::filesystem::filesystem_error& ex) {
1495  edm::LogError("FedRawDataInputSource:InputFile")
1496  << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what() << ". Trying again.";
1497  } catch (std::exception& ex) {
1498  edm::LogError("FedRawDataInputSource:InputFile")
1499  << " - deleteFile std::exception CAUGHT -: " << ex.what() << ". Trying again.";
1500  }
1502  }
1503  }
1504  }
1505 }
1506 
1507 //single-buffer mode file reading
1509  uint32_t existingSize = 0;
1510 
1511  if (fileDescriptor_ < 0) {
1512  bufferInputRead_ = 0;
1513  if (file->rawFd_ == -1) {
1514  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1515  if (file->rawHeaderSize_)
1516  lseek(fileDescriptor_, file->rawHeaderSize_, SEEK_SET);
1517  } else
1518  fileDescriptor_ = file->rawFd_;
1519 
1520  //skip header size in destination buffer (chunk position was already adjusted)
1521  bufferInputRead_ += file->rawHeaderSize_;
1522  existingSize += file->rawHeaderSize_;
1523 
1524  if (fileDescriptor_ >= 0)
1525  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1526  else {
1527  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer")
1528  << "failed to open file " << std::endl
1529  << file->fileName_ << " fd:" << fileDescriptor_;
1530  }
1531  //fill chunk (skipping file header if present)
1532  for (unsigned int i = 0; i < readBlocks_; i++) {
1533  const ssize_t last = ::read(fileDescriptor_,
1534  (void*)(file->chunks_[0]->buf_ + existingSize),
1535  eventChunkBlock_ - (i == readBlocks_ - 1 ? existingSize : 0));
1537  existingSize += last;
1538  }
1539 
1540  } else {
1541  //continue reading
1542  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1543  for (unsigned int i = 0; i < readBlocks_; i++) {
1544  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1546  existingSize += last;
1547  }
1548  } else {
1549  //event didn't fit in last chunk, so leftover must be moved to the beginning and completed
1550  uint32_t existingSizeLeft = eventChunkSize_ - file->chunkPosition_;
1551  memmove((void*)file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSizeLeft);
1552 
1553  //calculate amount of data that can be added
1554  const uint32_t blockcount = file->chunkPosition_ / eventChunkBlock_;
1555  const uint32_t leftsize = file->chunkPosition_ % eventChunkBlock_;
1556 
1557  for (uint32_t i = 0; i < blockcount; i++) {
1558  const ssize_t last =
1559  ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), eventChunkBlock_);
1561  existingSizeLeft += last;
1562  }
1563  if (leftsize) {
1564  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), leftsize);
1566  }
1567  file->chunkPosition_ = 0; //data was moved to beginning of the chunk
1568  }
1569  }
1570  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1571  if (fileDescriptor_ != -1) {
1572  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1573  close(fileDescriptor_);
1574  file->rawFd_ = fileDescriptor_ = -1;
1575  }
1576  }
1577 }
1578 
1580  std::lock_guard<std::mutex> lock(monlock_);
1581  auto itr = sourceEventsReport_.find(lumi);
1582  if (itr != sourceEventsReport_.end())
1583  itr->second += events;
1584  else
1586 }
1587 
// Look up the event count recorded in sourceEventsReport_ for lumisection `lumi`.
//   lumi   lumisection number used as the map key
//   erase  when true, the entry is removed from the map after being read
// Returns {true, count} if an entry exists, {false, 0} otherwise.
// The lookup and the optional erase are done while holding monlock_.
1588 std::pair<bool, unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase) {
1589  std::lock_guard<std::mutex> lock(monlock_);
1590  auto itr = sourceEventsReport_.find(lumi);
1591  if (itr != sourceEventsReport_.end()) {
      // copy the value out before the iterator is invalidated by erase()
1592  std::pair<bool, unsigned int> ret(true, itr->second);
1593  if (erase)
1594  sourceEventsReport_.erase(itr);
1595  return ret;
1596  } else
1597  return std::pair<bool, unsigned int>(false, 0);
1598 }
1599 
1601  std::sort(fileNames_.begin(), fileNames_.end(), [](std::string a, std::string b) {
1602  if (a.rfind('/') != std::string::npos)
1603  a = a.substr(a.rfind('/'));
1604  if (b.rfind('/') != std::string::npos)
1605  b = b.substr(b.rfind('/'));
1606  return b > a;
1607  });
1608 
1609  if (!fileNames_.empty()) {
1610  //get run number from first file in the vector
1612  std::string fileStem = fileName.stem().string();
1613  if (fileStem.find("file://") == 0)
1614  fileStem = fileStem.substr(7);
1615  else if (fileStem.find("file:") == 0)
1616  fileStem = fileStem.substr(5);
1617  auto end = fileStem.find('_');
1618 
1619  if (fileStem.find("run") == 0) {
1620  std::string runStr = fileStem.substr(3, end - 3);
1621  try {
1622  //get long to support test run numbers < 2^32
1623  long rval = std::stol(runStr);
1624  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: " << rval;
1625  return rval;
1626  } catch (const std::exception&) {
1627  edm::LogWarning("FedRawDataInputSource")
1628  << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1629  }
1630  }
1631  }
1632  return -1;
1633 }
1634 
1636  std::string& nextFile,
1637  uint32_t& fsize,
1638  uint64_t& lockWaitTime) {
1639  if (fileListIndex_ < fileNames_.size()) {
1640  nextFile = fileNames_[fileListIndex_];
1641  if (nextFile.find("file://") == 0)
1642  nextFile = nextFile.substr(7);
1643  else if (nextFile.find("file:") == 0)
1644  nextFile = nextFile.substr(5);
1645  std::filesystem::path fileName = nextFile;
1646  std::string fileStem = fileName.stem().string();
1647  if (fileStem.find("ls"))
1648  fileStem = fileStem.substr(fileStem.find("ls") + 2);
1649  if (fileStem.find('_'))
1650  fileStem = fileStem.substr(0, fileStem.find('_'));
1651 
1652  if (!fileListLoopMode_)
1653  ls = std::stoul(fileStem);
1654  else //always starting from LS 1 in loop mode
1655  ls = 1 + loopModeIterationInc_;
1656 
1657  //fsize = 0;
1658  //lockWaitTime = 0;
1659  fileListIndex_++;
1661  } else {
1662  if (!fileListLoopMode_)
1664  else {
1665  //loop through files until interrupted
1667  fileListIndex_ = 0;
1668  return getFile(ls, nextFile, fsize, lockWaitTime);
1669  }
1670  }
1671 }
size
Write out results.
static const char runNumber_[]
uint8_t triggerType() const
Event Trigger type identifier.
Definition: FEDHeader.cc:13
Definition: start.py:1
Definition: fillJson.h:27
std::vector< std::string > fileNames_
unsigned int getgpshigh(const unsigned char *)
std::condition_variable cvWakeup_
void read(edm::EventPrincipal &eventPrincipal) override
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:232
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:330
bool gtpe_board_sense(const unsigned char *p)
constexpr size_t FRDHeaderMaxVersion
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
bool crc32c_hw_test()
Definition: crc32c.cc:354
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
static const uint32_t length
Definition: FEDTrailer.h:57
tbb::concurrent_queue< unsigned int > workerPool_
unsigned int get(const unsigned char *, bool)
bool useFileBroker() const
void maybeOpenNewLumiSection(const uint32_t lumiSection)
std::vector< std::string > fileNames_
static const uint32_t length
Definition: FEDHeader.h:54
ret
prodAgent to be discontinued
uint16_t sourceID() const
Identifier of the FED.
Definition: FEDHeader.cc:19
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
FedRawDataInputSource * parent_
volatile std::atomic< bool > shutdown_flag
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:457
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
bool advance(unsigned char *&dataPosition, const size_t size)
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
bool lumisectionDiscarded(unsigned int ls)
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &rawData, bool &tcdsInRange)
std::pair< InputFile *, InputChunk * > ReaderInfo
std::unique_ptr< std::thread > readSupervisorThread_
Log< level::Error, false > LogError
unsigned int numConcurrentLumis() const
StreamID streamID() const
int timeout
Definition: mps_check.py:53
assert(be >=bs)
bool isExceptionOnData(unsigned int ls)
std::vector< std::condition_variable * > cvReader_
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
bool waitForChunk(unsigned int chunkid)
std::string getEoRFilePathOnFU() const
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, bool isRealData, const edm::EventAuxiliary::ExperimentType &, const std::string &processGUID, bool verifyLumiSection, bool suppressWarning)
uint32_t fragmentLength() const
The length of the event fragment counted in 64-bit words including header and trailer.
Definition: FEDTrailer.cc:13
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
U second(std::pair< T, U > const &p)
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::string getEoLSFilePathOnBU(const unsigned int ls) const
static Timestamp beginOfTime()
Definition: Timestamp.h:77
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
const std::vector< unsigned int > testTCDSFEDRange_
Definition: TCDSRaw.h:16
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:197
void rewindChunk(const size_t size)
void moveToPreviousChunk(const size_t size, const size_t offset)
void resize(size_t newsize)
Definition: FEDRawData.cc:28
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:337
std::vector< int > streamFileTracker_
void reset(uint64_t newOffset, uint64_t toRead, unsigned int fileIndex)
unsigned char * buf_
ProductProvenance const & dummyProvenance() const
void setMonStateSup(evf::FastMonState::InputState state)
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
std::map< unsigned int, unsigned int > sourceEventsReport_
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
unsigned int fileIndex_
unsigned long long TimeValue_t
Definition: Timestamp.h:21
std::vector< std::thread * > workerThreads_
bool evm_board_sense(const unsigned char *p, size_t size)
void setMonState(evf::FastMonState::InputState state)
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:362
void createProcessingNotificationMaybe() const
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
void setInputSource(FedRawDataInputSource *inputSource)
Log< level::Info, false > LogInfo
std::atomic< bool > readComplete_
Definition: init.py:1
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
def ls(path, rec=False)
Definition: eostools.py:349
BranchDescription const & branchDescription() const
tbb::concurrent_vector< InputChunk * > chunks_
unsigned int getStartLumisectionFromEnv() const
def getRunNumber(filename)
void setInStateSup(FastMonState::InputState inputState)
unsigned long long uint64_t
Definition: Time.h:13
std::unique_ptr< FRDEventMsgView > event_
def load(fileName)
Definition: svgfig.py:547
uint32_t chunkPosition_
double b
Definition: hdecay.h:118
void add(std::string const &label, ParameterSetDescription const &psetDescription)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:223
std::condition_variable startupCv_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:345
void stoppedLookingForFile(unsigned int lumi)
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:331
ItemType state() const
Definition: InputSource.h:332
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:462
std::atomic< bool > threadInit_
HLT enums.
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:333
void readWorker(unsigned int tid)
double a
Definition: hdecay.h:119
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:235
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
std::vector< unsigned int > thread_quit_signal
unsigned int getLumisectionToStart() const
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:24
evf::FastMonitoringService * fms_
unsigned int gtpe_get(const unsigned char *)
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
constexpr std::array< uint32, FRDHeaderMaxVersion+1 > FRDHeaderVersionSize
tbb::concurrent_queue< InputChunk * > freeChunks_
Log< level::Warning, false > LogWarning
evf::EvFDaqDirector::FileStatus getNextEvent()
unsigned int currentChunk_
evf::EvFDaqDirector::FileStatus nextEvent()
void readNextChunkIntoBuffer(InputFile *file)
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
void setFMS(evf::FastMonitoringService *fms)
std::unique_ptr< InputFile > currentFile_
unsigned int getgpslow(const unsigned char *)
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)
int events
Power< A, B >::type pow(const A &a, const B &b)
Definition: Power.h:29
def move(src, dest)
Definition: eostools.py:511
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
Definition: event.py:1
#define LogDebug(id)
void setInState(FastMonState::InputState inputState)
uint32_t bufferPosition_