CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
FedRawDataInputSource.cc
Go to the documentation of this file.
1 #include <fcntl.h>
2 #include <iomanip>
3 #include <iostream>
4 #include <memory>
5 #include <sstream>
6 #include <sys/types.h>
7 #include <sys/file.h>
8 #include <sys/time.h>
9 #include <unistd.h>
10 #include <vector>
11 #include <fstream>
12 #include <zlib.h>
13 #include <cstdio>
14 #include <chrono>
15 
16 #include <boost/algorithm/string.hpp>
17 
21 
23 
30 
32 
34 
38 
40 
45 
46 //JSON file reader
48 
49 using namespace evf::FastMonState;
50 
52  : edm::RawInputSource(pset, desc),
53  defPath_(pset.getUntrackedParameter<std::string>("buDefPath", "")),
54  eventChunkSize_(pset.getUntrackedParameter<unsigned int>("eventChunkSize", 32) * 1048576),
55  eventChunkBlock_(pset.getUntrackedParameter<unsigned int>("eventChunkBlock", 32) * 1048576),
56  numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers", 2)),
57  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles", 2)),
58  getLSFromFilename_(pset.getUntrackedParameter<bool>("getLSFromFilename", true)),
59  alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
60  verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum", true)),
61  useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID", false)),
62  testTCDSFEDRange_(
63  pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())),
64  fileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>())),
65  fileListMode_(pset.getUntrackedParameter<bool>("fileListMode", false)),
66  fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
67  runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
68  daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
69  eventID_(),
70  processHistoryID_(),
71  currentLumiSection_(0),
72  tcds_pointer_(nullptr),
73  eventsThisLumi_(0) {
74  char thishost[256];
75  gethostname(thishost, 255);
76  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: " << std::endl
77  << (eventChunkSize_ / 1048576) << " MB on host " << thishost;
78 
79  if (!testTCDSFEDRange_.empty()) {
80  if (testTCDSFEDRange_.size() != 2) {
81  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
82  << "Invalid TCDS Test FED range parameter";
83  }
86  }
87 
88  long autoRunNumber = -1;
89  if (fileListMode_) {
90  autoRunNumber = initFileList();
91  if (!fileListLoopMode_) {
92  if (autoRunNumber < 0)
93  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
94  //override run number
95  runNumber_ = (edm::RunNumber_t)autoRunNumber;
96  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
97  }
98  }
99 
101  setNewRun();
102  //todo:autodetect from file name (assert if names differ)
104 
105  //make sure that chunk size is N * block size
110 
111  if (!numBuffers_)
112  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource")
113  << "no reading enabled with numBuffers parameter 0";
114 
117  readingFilesCount_ = 0;
118 
119  if (!crc32c_hw_test())
120  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
121 
122  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
123  if (fileListMode_) {
124  try {
126  } catch (cms::Exception const&) {
127  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
128  }
129  } else {
131  if (!fms_) {
132  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
133  }
134  }
135 
137  if (!daqDirector_)
138  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
139 
141  if (useFileBroker_)
142  edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
143  //set DaqDirector to delete files in preGlobalEndLumi callback
145  if (fms_) {
147  fms_->setInputSource(this);
150  }
151  //should delete chunks when run stops
152  for (unsigned int i = 0; i < numBuffers_; i++) {
154  }
155 
156  quit_threads_ = false;
157 
158  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
159  std::unique_lock<std::mutex> lk(startupLock_);
160  //issue a memory fence here and in threads (constructor was segfaulting without this)
161  thread_quit_signal.push_back(false);
162  workerJob_.push_back(ReaderInfo(nullptr, nullptr));
163  cvReader_.push_back(new std::condition_variable);
164  tid_active_.push_back(0);
165  threadInit_.store(false, std::memory_order_release);
166  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
167  startupCv_.wait(lk);
168  }
169 
170  runAuxiliary()->setProcessHistoryID(processHistoryID_);
171 }
172 
// Destructor body (the signature line and the opening `if` of the supervisor
// branch are not visible in this listing).
// Signals all threads to quit, releases files still pending deletion, then either
// joins the supervisor thread or, in the else branch, stops and joins each reader
// thread itself. Finally frees the per-reader condition variables.
174  quit_threads_ = true;
175 
176  //delete any remaining open files
177  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
178  it->second.reset();
179 
181  readSupervisorThread_->join();
182  } else {
183  //join aux threads in case the supervisor thread was not started
184  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
    // Set the quit flag and notify while holding mReader_, then unlock before
    // join() so the reader can take the mutex and exit.
185  std::unique_lock<std::mutex> lk(mReader_);
186  thread_quit_signal[i] = true;
187  cvReader_[i]->notify_one();
188  lk.unlock();
189  workerThreads_[i]->join();
190  delete workerThreads_[i];
191  }
192  }
    // NOTE(review): this loop is bounded by numConcurrentReads_ while the join loop
    // above uses workerThreads_.size(); presumably both are equal - confirm.
193  for (unsigned int i = 0; i < numConcurrentReads_; i++)
194  delete cvReader_[i];
    // Disabled code: would drain freeChunks_ and delete the chunks.
195  /*
196  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
197  InputChunk *ch;
198  while (!freeChunks_.try_pop(ch)) {}
199  delete ch;
200  }
201  */
202 }
203 
// Body of the parameter-description routine (signature and the declaration of
// `desc` are not visible in this listing). Declares the untracked parameters of
// the source with their defaults and registers the description as "source".
//
// Fix: the LS-1 start flag was declared as unsigned int "alwaysStartFromfirstLS",
// while the constructor reads bool "alwaysStartFromFirstLS". The declaration now
// matches; setAllowAnything() keeps configurations using the old spelling valid.
206  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
207  desc.addUntracked<unsigned int>("eventChunkSize", 32)->setComment("Input buffer (chunk) size");
208  desc.addUntracked<unsigned int>("eventChunkBlock", 32)
209  ->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
210  desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
211  desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
212  ->setComment("Maximum number of simultaneously buffered raw files");
213  desc.addUntracked<bool>("alwaysStartFromFirstLS", false)
214  ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
215  desc.addUntracked<bool>("verifyChecksum", true)
216  ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
217  desc.addUntracked<bool>("useL1EventID", false)
218  ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
219  desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
220  ->setComment("[min, max] range to search for TCDS FED ID in test setup");
221  desc.addUntracked<bool>("fileListMode", false)
222  ->setComment("Use fileNames parameter to directly specify raw files to open");
223  desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
224  ->setComment("file list used when fileListMode is enabled");
    // Parameters not declared above (e.g. those read only by the constructor) are
    // accepted without validation.
225  desc.setAllowAnything();
226  descriptions.add("source", desc);
227 }
228 
// Body of the routine that decides what the framework processes next (signature,
// the supervisor-start `if`, and the `case` labels are not visible in this listing).
// Starts the readSupervisor thread and waits on startupCv_, then switches on the
// status returned by nextEvent(): the first visible branch writes end-of-lumi and
// end-of-run marker files and returns Next::kStop; the default branch records the
// event's run number / L1 event ID and returns Next::kEvent.
231  //this thread opens new files and dispatches reading to worker readers
232  //threadInit_.store(false,std::memory_order_release);
233  std::unique_lock<std::mutex> lk(startupLock_);
234  readSupervisorThread_ = std::make_unique<std::thread>(&FedRawDataInputSource::readSupervisor, this);
236  startupCv_.wait(lk);
237  }
238  //signal hltd to start event accounting
239  if (!currentLumiSection_)
242  switch (nextEvent()) {
244  //maybe create EoL file in working directory before ending run
245  struct stat buf;
246  if (!useFileBroker_ && currentLumiSection_ > 0) {
247  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
248  if (eolFound) {
250  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
251  if (!found) {
    // NOTE(review): the descriptor returned by open() is not checked; on failure
    // close(-1) is called and the marker file is silently not created.
253  int eol_fd =
254  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
255  close(eol_fd);
257  }
258  }
259  }
260  //also create EoR file in FU data directory
261  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
262  if (!eorFound) {
    // NOTE(review): same unchecked open()/close() pattern as for the EoLS file above.
263  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
264  O_RDWR | O_CREAT,
265  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
266  close(eor_fd);
267  }
269  eventsThisLumi_ = 0;
271  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
272  return Next::kStop;
273  }
275  //this is not reachable
276  return Next::kEvent;
277  }
279  //std::cout << "--------------NEW LUMI---------------" << std::endl;
280  return Next::kEvent;
281  }
282  default: {
    // When the lumisection is not taken from the file name, a higher lumi number
    // in the event header resets the per-lumi event counter.
283  if (!getLSFromFilename_) {
284  //get new lumi from file header
285  if (event_->lumi() > currentLumiSection_) {
287  eventsThisLumi_ = 0;
289  }
290  }
293  else
294  eventRunNumber_ = event_->run();
295  L1EventID_ = event_->event();
296 
297  setEventCached();
298 
299  return Next::kEvent;
300  }
301  }
302 }
303 
// Opens luminosity block `lumiSection` if no block is open or the open one has a
// different number. Without the file broker it also writes the end-of-lumi marker
// for the previous section (when one exists) and asks the DAQ director to create
// the begin-of-lumi file. It then records the new section, builds a new
// LuminosityBlockAuxiliary stamped with the current wall-clock time and logs it.
// NOTE(review): several interior lines are missing from this listing (see the
// gaps in the embedded line numbers), including the declaration of fuEoLS.
304 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
305  if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
306  if (!useFileBroker_) {
307  if (currentLumiSection_ > 0) {
309  struct stat buf;
310  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
311  if (!found) {
    // NOTE(review): open() result is not checked before close(); a failure to
    // create the EoLS marker goes unnoticed.
313  int eol_fd =
314  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
315  close(eol_fd);
316  daqDirector_->createBoLSFile(lumiSection, false);
318  }
319  } else
320  daqDirector_->createBoLSFile(lumiSection, true); //needed for initial lumisection
321  }
322 
323  currentLumiSection_ = lumiSection;
324 
326 
    // Lumisection open time: seconds * 1e6 + microseconds from gettimeofday().
327  timeval tv;
328  gettimeofday(&tv, nullptr);
329  const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
330 
332  runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
333 
334  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
335  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
336 
337  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: " << lumiSection;
338  }
339 }
340 
// Body of the polling wrapper around getNextEvent() (signature and the declaration
// of `status` are not visible in this listing).
// Calls getNextEvent() repeatedly while it reports noFile, stopping early if the
// framework shutdown flag is set, and returns the last status obtained.
343  while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
344  if (edm::shutdown_flag.load(std::memory_order_relaxed))
345  break;
346  }
347  return status;
348 }
349 
// Body of getNextEvent() (signature, the final return and a number of interior
// lines are not visible in this listing).
// Visible flow:
//  1. propagate an error raised by another thread (setExceptionState_);
//  2. if there is no current file, pop one from fileQueue_ (waiting up to 100 ms)
//     and act on its status: runEnded / runAbort / newLumi / newFile;
//  3. handle an empty file, a fully consumed file and the optional raw file header;
//  4. locate the next event in the chunk buffers (single-buffer or multi-buffer
//     mode) and wrap it in an FRDEventMsgView;
//  5. optionally verify the CRC32C (FRD version >= 5) or Adler32 (version >= 3)
//     checksum, then count the event as processed.
// Throws cms::Exception on truncated files, oversized events, event-count
// mismatches, checksum failures and run abort.
351  if (setExceptionState_)
352  threadError();
353  if (!currentFile_.get()) {
356  if (!fileQueue_.try_pop(currentFile_)) {
357  //sleep until wakeup (only in single-buffer mode) or timeout
358  std::unique_lock<std::mutex> lkw(mWakeup_);
359  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
361  }
362  status = currentFile_->status_;
363  if (status == evf::EvFDaqDirector::runEnded) {
365  currentFile_.reset();
366  return status;
367  } else if (status == evf::EvFDaqDirector::runAbort) {
368  throw cms::Exception("FedRawDataInputSource::getNextEvent")
369  << "Run has been aborted by the input source reader thread";
370  } else if (status == evf::EvFDaqDirector::newLumi) {
372  if (getLSFromFilename_) {
373  if (currentFile_->lumi_ > currentLumiSection_) {
375  eventsThisLumi_ = 0;
377  }
378  } else { //let this be picked up from next event
380  }
381  currentFile_.reset();
382  return status;
383  } else if (status == evf::EvFDaqDirector::newFile) {
385  } else
386  assert(false);
387  }
389 
390  //file is empty
391  if (!currentFile_->fileSize_) {
393  //try to open new lumi
394  assert(currentFile_->nChunks_ == 0);
395  if (getLSFromFilename_)
396  if (currentFile_->lumi_ > currentLumiSection_) {
398  eventsThisLumi_ = 0;
400  }
401  //immediately delete empty file
402  currentFile_.reset();
404  }
405 
406  //file is finished
407  if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
409  //release last chunk (it is never released elsewhere)
410  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
    // A negative nEvents_ skips this consistency check against the processed count.
411  if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
412  throw cms::Exception("FedRawDataInputSource::getNextEvent")
413  << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
414  << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
415  }
416  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
417  if (singleBufferMode_) {
418  std::unique_lock<std::mutex> lkw(mWakeup_);
419  cvWakeup_.notify_one();
420  }
421  bufferInputRead_ = 0;
423  //put the file in pending delete list;
424  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
425  filesToDelete_.push_back(std::pair<int, std::unique_ptr<InputFile>>(currentFileIndex_, std::move(currentFile_)));
426  } else {
427  //in single-thread and stream jobs, events are already processed
428  currentFile_.reset();
429  }
431  }
432 
433  //handle RAW file header
434  if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
435  if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
436  if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
437  throw cms::Exception("FedRawDataInputSource::getNextEvent")
438  << "Premature end of input file while reading file header";
439 
440  edm::LogWarning("FedRawDataInputSource")
441  << "File with only raw header and no events received in LS " << currentFile_->lumi_;
442  if (getLSFromFilename_)
443  if (currentFile_->lumi_ > currentLumiSection_) {
445  eventsThisLumi_ = 0;
447  }
448  }
449 
450  //advance buffer position to skip file header (chunk will be acquired later)
451  currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
452  currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
453  }
454 
455  //file is too short
456  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
457  throw cms::Exception("FedRawDataInputSource::getNextEvent")
458  << "Premature end of input file while reading event header";
459  }
460  if (singleBufferMode_) {
461  //should already be there
    // Poll every 10 ms for the chunk, bailing out through threadError() if either
    // this source or the file's parent reports an exception state.
463  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
464  usleep(10000);
465  if (currentFile_->parent_->exceptionState() || setExceptionState_)
466  currentFile_->parent_->threadError();
467  }
469 
    // Single-buffer mode always addresses chunk 0.
470  unsigned char* dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
471 
472  //conditions when read amount is not sufficient for the header to fit
473  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_] ||
474  eventChunkSize_ - currentFile_->chunkPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
476 
    // The FRD version is detected once, from the first 16 bits at the data position.
477  if (detectedFRDversion_ == 0) {
478  detectedFRDversion_ = *((uint16_t*)dataPosition);
479  if (detectedFRDversion_ > FRDHeaderMaxVersion)
480  throw cms::Exception("FedRawDataInputSource::getNextEvent")
481  << "Unknown FRD version -: " << detectedFRDversion_;
482  assert(detectedFRDversion_ >= 1);
483  }
484 
485  //recalculate chunk position
486  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
487  if (bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]) {
488  throw cms::Exception("FedRawDataInputSource::getNextEvent")
489  << "Premature end of input file while reading event header";
490  }
491  }
492 
493  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
494  if (event_->size() > eventChunkSize_) {
495  throw cms::Exception("FedRawDataInputSource::getNextEvent")
496  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
497  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
498  << " bytes";
499  }
500 
    // NOTE(review): if event_->size() can be smaller than the header size (corrupt
    // input), this subtraction presumably wraps in the uint32_t - confirm upstream
    // validation.
501  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
502 
503  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
504  throw cms::Exception("FedRawDataInputSource::getNextEvent")
505  << "Premature end of input file while reading event data";
506  }
507  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
509  //recalculate chunk position
510  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
511  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
512  }
513  currentFile_->bufferPosition_ += event_->size();
514  currentFile_->chunkPosition_ += event_->size();
515  //last chunk is released when this function is invoked next time
516 
517  }
518  //multibuffer mode:
519  else {
520  //wait for the current chunk to become added to the vector
522  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
523  usleep(10000);
524  if (setExceptionState_)
525  threadError();
526  }
528 
529  //check if header is at the boundary of two chunks
530  chunkIsFree_ = false;
531  unsigned char* dataPosition;
532 
533  //read header, copy it to a single chunk if necessary
534  bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
535 
536  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
537  if (event_->size() > eventChunkSize_) {
538  throw cms::Exception("FedRawDataInputSource::getNextEvent")
539  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
540  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
541  << " bytes";
542  }
543 
    // NOTE(review): same potential unsigned wrap as in single-buffer mode above.
544  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
545 
546  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
547  throw cms::Exception("FedRawDataInputSource::getNextEvent")
548  << "Premature end of input file while reading event data";
549  }
550 
551  if (chunkEnd) {
552  //header was at the chunk boundary, we will have to move payload as well
553  currentFile_->moveToPreviousChunk(msgSize, FRDHeaderVersionSize[detectedFRDversion_]);
554  chunkIsFree_ = true;
555  } else {
556  //header was contiguous, but check if payload fits the chunk
557  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
558  //rewind to header start position
559  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
560  //copy event to a chunk start and move pointers
561 
563 
564  chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_] + msgSize);
565 
567 
568  assert(chunkEnd);
569  chunkIsFree_ = true;
570  //header is moved
571  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
572  } else {
573  //everything is in a single chunk, only move pointers forward
574  chunkEnd = currentFile_->advance(dataPosition, msgSize);
575  assert(!chunkEnd);
576  chunkIsFree_ = false;
577  }
578  }
579  } //end multibuffer mode
581 
    // Checksum verification: CRC32C for FRD version >= 5, otherwise Adler32 for
    // version >= 3; older versions are not checked.
582  if (verifyChecksum_ && event_->version() >= 5) {
583  uint32_t crc = 0;
584  crc = crc32c(crc, (const unsigned char*)event_->payload(), event_->eventSize());
585  if (crc != event_->crc32c()) {
586  if (fms_)
588  throw cms::Exception("FedRawDataInputSource::getNextEvent")
589  << "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() << " but calculated 0x"
590  << crc;
591  }
592  } else if (verifyChecksum_ && event_->version() >= 3) {
593  uint32_t adler = adler32(0L, Z_NULL, 0);
594  adler = adler32(adler, (Bytef*)event_->payload(), event_->eventSize());
595 
596  if (adler != event_->adler32()) {
597  if (fms_)
599  throw cms::Exception("FedRawDataInputSource::getNextEvent")
600  << "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() << " but calculated 0x"
601  << adler;
602  }
603  }
605 
606  currentFile_->nProcessed_++;
607 
609 }
610 
// Body of read() (signature and the EventAuxiliary construction lines are not
// visible in this listing).
// Fills a FEDRawDataCollection from the current event, builds the event through
// makeEvent() using one of three ID sources (L1 event ID, GTP event ID, or the
// TCDS FED record), counts the event, records which file the stream is reading,
// periodically drops finished files no stream is using, and returns a chunk to
// freeChunks_ when getNextEvent() flagged it as free.
613  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
614  bool tcdsInRange;
615  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData, tcdsInRange);
616 
617  if (useL1EventID_) {
620  aux.setProcessHistoryID(processHistoryID_);
621  makeEvent(eventPrincipal, aux);
622  } else if (tcds_pointer_ == nullptr) {
    // Without a TCDS FED the GTP event ID is required; zero means none was found.
623  if (!GTPEventID_) {
624  throw cms::Exception("FedRawDataInputSource::read")
625  << "No TCDS or GTP FED in event with FEDHeader EID -: " << L1EventID_;
626  }
629  aux.setProcessHistoryID(processHistoryID_);
630  makeEvent(eventPrincipal, aux);
631  } else {
    // The TCDS record is read from directly after the FED header of the TCDS fragment.
632  const FEDHeader fedHeader(tcds_pointer_);
633  tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
638  event_->isRealData(),
639  static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
640  processGUID(),
642  !tcdsInRange);
644  makeEvent(eventPrincipal, aux);
645  }
646 
647  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
648 
650 
651  eventsThisLumi_++;
653 
654  //resize vector if needed
655  while (streamFileTracker_.size() <= eventPrincipal.streamID())
656  streamFileTracker_.push_back(-1);
657 
658  streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
659 
    // NOTE(review): the period is checkEvery_, whose value is not visible here;
    // the "10 events" in the comment below is unverified.
660  //this old file check runs no more often than every 10 events
661  if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
662  //delete files that are not in processing
663  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
664  auto it = filesToDelete_.begin();
665  while (it != filesToDelete_.end()) {
666  bool fileIsBeingProcessed = false;
667  for (unsigned int i = 0; i < nStreams_; i++) {
668  if (it->first == streamFileTracker_.at(i)) {
669  fileIsBeingProcessed = true;
670  break;
671  }
672  }
673  if (!fileIsBeingProcessed) {
    // NOTE(review): fileToDelete is copied but not used in the visible lines.
674  std::string fileToDelete = it->second->fileName_;
675  it = filesToDelete_.erase(it);
676  } else
677  it++;
678  }
679  }
    // The chunk before the current one is returned to the free pool.
680  if (chunkIsFree_)
681  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
682  chunkIsFree_ = false;
684  return;
685 }
686 
// Body of fillFEDRawDataCollection() (signature and one interior line are not
// visible in this listing).
// Walks the event payload backwards, one FED fragment at a time, using each
// trailer's fragment length to find the matching header, and copies every
// fragment into rawData under its FED ID. Along the way it records the first TCDS
// FED (tcds_pointer_, tcdsInRange) and any GTP/GTPE event ID (GTPEventID_).
// Returns a timestamp: the GPS time from the GTP FED if present, otherwise the
// wall-clock time taken at entry.
// Throws cms::Exception on an out-of-range FED ID or a second TCDS FED.
688  edm::TimeValue_t time;
689  timeval stv;
690  gettimeofday(&stv, nullptr);
    // Default timestamp packs seconds into the upper 32 bits and microseconds below.
691  time = stv.tv_sec;
692  time = (time << 32) + stv.tv_usec;
693  edm::Timestamp tstamp(time);
694 
695  uint32_t eventSize = event_->eventSize();
696  unsigned char* event = (unsigned char*)event_->payload();
697  GTPEventID_ = 0;
698  tcds_pointer_ = nullptr;
699  tcdsInRange = false;
700  uint16_t selectedTCDSFed = 0;
    // eventSize is the number of payload bytes not yet consumed; it shrinks from
    // the end of the payload towards zero.
701  while (eventSize > 0) {
    // NOTE(review): input consistency is enforced with assert(), which aborts
    // instead of throwing and is compiled out under NDEBUG.
702  assert(eventSize >= FEDTrailer::length);
703  eventSize -= FEDTrailer::length;
704  const FEDTrailer fedTrailer(event + eventSize);
705  const uint32_t fedSize = fedTrailer.fragmentLength() << 3; //trailer length counts in 8 bytes
    // NOTE(review): fedSize - FEDHeader::length presumably wraps if a corrupt
    // trailer reports fewer bytes than a header - confirm.
706  assert(eventSize >= fedSize - FEDHeader::length);
707  eventSize -= (fedSize - FEDHeader::length);
708  const FEDHeader fedHeader(event + eventSize);
709  const uint16_t fedId = fedHeader.sourceID();
710  if (fedId > FEDNumbering::MAXFEDID) {
711  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
712  } else if (fedId >= MINTCDSuTCAFEDID_ && fedId <= MAXTCDSuTCAFEDID_) {
    // Only one TCDS FED per event is accepted; a second one is an error.
713  if (!selectedTCDSFed) {
714  selectedTCDSFed = fedId;
715  tcds_pointer_ = event + eventSize;
717  tcdsInRange = true;
718  }
719  } else
720  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
721  << "Second TCDS FED ID " << fedId << " found. First ID: " << selectedTCDSFed;
722  }
723  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
724  if (evf::evtn::evm_board_sense(event + eventSize, fedSize))
725  GTPEventID_ = evf::evtn::get(event + eventSize, true);
726  else
727  GTPEventID_ = evf::evtn::get(event + eventSize, false);
728  //evf::evtn::evm_board_setformat(fedSize);
    // The GTP fragment's GPS time replaces the wall-clock timestamp.
729  const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
730  const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
731  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
732  }
733  //take event ID from GTPE FED
734  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_ == 0) {
735  if (evf::evtn::gtpe_board_sense(event + eventSize)) {
736  GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
737  }
738  }
    // Copy the whole fragment (header through trailer) into the collection.
739  FEDRawData& fedData = rawData.FEDData(fedId);
740  fedData.resize(fedSize);
741  memcpy(fedData.data(), event + eventSize, fedSize);
742  }
743  assert(eventSize == 0);
744 
745  return tstamp;
746 }
747 
749 
751  bool stop = false;
752  unsigned int currentLumiSection = 0;
753  //threadInit_.exchange(true,std::memory_order_acquire);
754 
755  {
756  std::unique_lock<std::mutex> lk(startupLock_);
757  startupCv_.notify_one();
758  }
759 
760  uint32_t ls = 0;
761  uint32_t monLS = 1;
762  uint32_t lockCount = 0;
763  uint64_t sumLockWaitTimeUs = 0.;
764 
765  while (!stop) {
766  //wait for at least one free thread and chunk
767  int counter = 0;
768 
769  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty() ||
771  //report state to monitoring
772  if (fms_) {
773  bool copy_active = false;
774  for (auto j : tid_active_)
775  if (j)
776  copy_active = true;
779  else if (freeChunks_.empty()) {
780  if (copy_active)
782  else
784  } else {
785  if (copy_active)
787  else
789  }
790  }
791  std::unique_lock<std::mutex> lkw(mWakeup_);
792  //sleep until woken up by condition or a timeout
793  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
794  counter++;
795  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
796  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
797  } else {
798  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
799  }
800  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
801  stop = true;
802  break;
803  }
804  }
805  //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
806 
807  if (stop)
808  break;
809 
810  //look for a new file
811  std::string nextFile;
812  uint32_t fileSizeIndex;
813  int64_t fileSizeFromMetadata;
814 
815  if (fms_) {
818  }
819 
821  uint16_t rawHeaderSize = 0;
822  uint32_t lsFromRaw = 0;
823  int32_t serverEventsInNewFile = -1;
824  int rawFd = -1;
825 
826  int backoff_exp = 0;
827 
828  //entering loop which tries to grab new file from ramdisk
829  while (status == evf::EvFDaqDirector::noFile) {
830  //check if hltd has signalled to throttle input
831  counter = 0;
832  while (daqDirector_->inputThrottled()) {
833  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
834  break;
836  if (!(counter % 50))
837  edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused...";
838  usleep(100000);
839  counter++;
840  }
841 
842  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
843  stop = true;
844  break;
845  }
846 
847  assert(rawFd == -1);
848  uint64_t thisLockWaitTimeUs = 0.;
850  if (fileListMode_) {
851  //return LS if LS not set, otherwise return file
852  status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
853  if (status == evf::EvFDaqDirector::newFile) {
855  rawFd,
856  rawHeaderSize,
857  lsFromRaw,
858  serverEventsInNewFile,
859  fileSizeFromMetadata,
860  false,
861  false,
862  false) != 0) {
863  //error
864  setExceptionState_ = true;
865  stop = true;
866  break;
867  }
868  if (!getLSFromFilename_)
869  ls = lsFromRaw;
870  }
871  } else if (!useFileBroker_)
872  status = daqDirector_->updateFuLock(
873  ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
874  else {
875  status = daqDirector_->getNextFromFileBroker(currentLumiSection,
876  ls,
877  nextFile,
878  rawFd,
879  rawHeaderSize,
880  serverEventsInNewFile,
881  fileSizeFromMetadata,
882  thisLockWaitTimeUs);
883  }
884 
886 
887  //cycle through all remaining LS even if no files get assigned
888  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
890 
891  //monitoring of lock wait time
892  if (thisLockWaitTimeUs > 0.)
893  sumLockWaitTimeUs += thisLockWaitTimeUs;
894  lockCount++;
895  if (ls > monLS) {
896  monLS = ls;
897  if (lockCount)
898  if (fms_)
899  fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
900  lockCount = 0;
901  sumLockWaitTimeUs = 0;
902  }
903 
904  //check again for any remaining index/EoLS files after EoR file is seen
907  usleep(100000);
908  //now all files should have appeared in ramdisk, check again if any raw files were left behind
909  status = daqDirector_->updateFuLock(
910  ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
911  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
913  }
914 
915  if (status == evf::EvFDaqDirector::runEnded) {
916  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runEnded));
917  fileQueue_.push(std::move(inf));
918  stop = true;
919  break;
920  }
921 
922  //error from filelocking function
923  if (status == evf::EvFDaqDirector::runAbort) {
924  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
925  fileQueue_.push(std::move(inf));
926  stop = true;
927  break;
928  }
929  //queue new lumisection
930  if (getLSFromFilename_) {
931  if (ls > currentLumiSection) {
932  if (!useFileBroker_) {
933  //file locking
934  //setMonStateSup(inSupNewLumi);
935  currentLumiSection = ls;
936  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
937  fileQueue_.push(std::move(inf));
938  } else {
939  //new file service
940  if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
942  //start transitions from LS specified by env, continue if not reached
943  if (ls < daqDirector_->getStartLumisectionFromEnv()) {
944  //skip file if from earlier LS than specified by env
945  if (rawFd != -1) {
946  close(rawFd);
947  rawFd = -1;
948  }
950  continue;
951  } else {
952  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
953  fileQueue_.push(std::move(inf));
954  }
955  } else if (ls < 100) {
956  //look at last LS file on disk to start from that lumisection (only within first 100 LS)
957  unsigned int lsToStart = daqDirector_->getLumisectionToStart();
958 
959  for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
960  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
961  fileQueue_.push(std::move(inf));
962  }
963  } else {
964  //start from current LS
965  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
966  fileQueue_.push(std::move(inf));
967  }
968  } else {
969  //queue all lumisections after last one seen to avoid gaps
970  for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
971  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
972  fileQueue_.push(std::move(inf));
973  }
974  }
975  currentLumiSection = ls;
976  }
977  }
978  //else
979  if (currentLumiSection > 0 && ls < currentLumiSection) {
980  edm::LogError("FedRawDataInputSource")
981  << "Got old LS (" << ls << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
982  << ". Aborting execution." << std::endl;
983  if (rawFd != -1)
984  close(rawFd);
985  rawFd = -1;
986  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
987  fileQueue_.push(std::move(inf));
988  stop = true;
989  break;
990  }
991  }
992 
993  int dbgcount = 0;
994  if (status == evf::EvFDaqDirector::noFile) {
996  dbgcount++;
997  if (!(dbgcount % 20))
998  LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
999  if (!useFileBroker_)
1000  usleep(100000);
1001  else {
1002  backoff_exp = std::min(4, backoff_exp); // max 1.6 seconds
1003  //backoff_exp=0; // disabled!
1004  int sleeptime = (int)(100000. * pow(2, backoff_exp));
1005  usleep(sleeptime);
1006  backoff_exp++;
1007  }
1008  } else
1009  backoff_exp = 0;
1010  }
1011  //end of file grab loop, parse result
1012  if (status == evf::EvFDaqDirector::newFile) {
1014  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1015 
1016  std::string rawFile;
1017  //file service will report raw extension
1018  if (useFileBroker_ || rawHeaderSize)
1019  rawFile = nextFile;
1020  else {
1021  std::filesystem::path rawFilePath(nextFile);
1022  rawFile = rawFilePath.replace_extension(".raw").string();
1023  }
1024 
1025  struct stat st;
1026  int stat_res = stat(rawFile.c_str(), &st);
1027  if (stat_res == -1) {
1028  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
1029  setExceptionState_ = true;
1030  break;
1031  }
1032  uint64_t fileSize = st.st_size;
1033 
1034  if (fms_) {
1038  }
1039  int eventsInNewFile;
1040  if (fileListMode_) {
1041  if (fileSize == 0)
1042  eventsInNewFile = 0;
1043  else
1044  eventsInNewFile = -1;
1045  } else {
1047  if (!useFileBroker_) {
1048  if (rawHeaderSize) {
1049  int rawFdEmpty = -1;
1050  uint16_t rawHeaderCheck;
1051  bool fileFound;
1052  eventsInNewFile = daqDirector_->grabNextJsonFromRaw(
1053  nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true);
1054  assert(fileFound && rawHeaderCheck == rawHeaderSize);
1056  } else
1057  eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1058  } else
1059  eventsInNewFile = serverEventsInNewFile;
1060  assert(eventsInNewFile >= 0);
1061  assert((eventsInNewFile > 0) ==
1062  (fileSize > rawHeaderSize)); //file without events must be empty or contain only header
1063  }
1064 
1065  if (!singleBufferMode_) {
1066  //calculate number of needed chunks
1067  unsigned int neededChunks = fileSize / eventChunkSize_;
1068  if (fileSize % eventChunkSize_)
1069  neededChunks++;
1070 
1071  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1072  ls,
1073  rawFile,
1074  !fileListMode_,
1075  rawFd,
1076  fileSize,
1077  rawHeaderSize,
1078  neededChunks,
1079  eventsInNewFile,
1080  this));
1082  auto newInputFilePtr = newInputFile.get();
1083  fileQueue_.push(std::move(newInputFile));
1084 
1085  for (unsigned int i = 0; i < neededChunks; i++) {
1086  if (fms_) {
1087  bool copy_active = false;
1088  for (auto j : tid_active_)
1089  if (j)
1090  copy_active = true;
1091  if (copy_active)
1093  else
1095  }
1096  //get thread
1097  unsigned int newTid = 0xffffffff;
1098  while (!workerPool_.try_pop(newTid)) {
1099  usleep(100000);
1100  if (quit_threads_.load(std::memory_order_relaxed)) {
1101  stop = true;
1102  break;
1103  }
1104  }
1105 
1106  if (fms_) {
1107  bool copy_active = false;
1108  for (auto j : tid_active_)
1109  if (j)
1110  copy_active = true;
1111  if (copy_active)
1113  else
1115  }
1116  InputChunk* newChunk = nullptr;
1117  while (!freeChunks_.try_pop(newChunk)) {
1118  usleep(100000);
1119  if (quit_threads_.load(std::memory_order_relaxed)) {
1120  stop = true;
1121  break;
1122  }
1123  }
1124 
1125  if (newChunk == nullptr) {
1126  //return unused tid if we received shutdown (nullptr chunk)
1127  if (newTid != 0xffffffff)
1128  workerPool_.push(newTid);
1129  stop = true;
1130  break;
1131  }
1132  if (stop)
1133  break;
1135 
1136  std::unique_lock<std::mutex> lk(mReader_);
1137 
1138  unsigned int toRead = eventChunkSize_;
1139  if (i == neededChunks - 1 && fileSize % eventChunkSize_)
1140  toRead = fileSize % eventChunkSize_;
1141  newChunk->reset(i * eventChunkSize_, toRead, i);
1142 
1143  workerJob_[newTid].first = newInputFilePtr;
1144  workerJob_[newTid].second = newChunk;
1145 
1146  //wake up the worker thread
1147  cvReader_[newTid]->notify_one();
1148  }
1149  } else {
1150  if (!eventsInNewFile) {
1151  if (rawFd) {
1152  close(rawFd);
1153  rawFd = -1;
1154  }
1155  //still queue file for lumi update
1156  std::unique_lock<std::mutex> lkw(mWakeup_);
1157  //TODO: also file with only file header fits in this edge case. Check if read correctly in single buffer mode
1158  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1159  ls,
1160  rawFile,
1161  !fileListMode_,
1162  rawFd,
1163  fileSize,
1164  rawHeaderSize,
1165  (rawHeaderSize > 0),
1166  0,
1167  this));
1169  fileQueue_.push(std::move(newInputFile));
1170  cvWakeup_.notify_one();
1171  break;
1172  }
1173  //in single-buffer mode put single chunk in the file and let the main thread read the file
1174  InputChunk* newChunk;
1175  //should be available immediately
1176  while (!freeChunks_.try_pop(newChunk)) {
1177  usleep(100000);
1178  if (quit_threads_.load(std::memory_order_relaxed))
1179  break;
1180  }
1181 
1182  std::unique_lock<std::mutex> lkw(mWakeup_);
1183 
1184  unsigned int toRead = eventChunkSize_;
1185  if (fileSize % eventChunkSize_)
1186  toRead = fileSize % eventChunkSize_;
1187  newChunk->reset(0, toRead, 0);
1188  newChunk->readComplete_ = true;
1189 
1190  //push file and wakeup main thread
1191  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1192  ls,
1193  rawFile,
1194  !fileListMode_,
1195  rawFd,
1196  fileSize,
1197  rawHeaderSize,
1198  1,
1199  eventsInNewFile,
1200  this));
1201  newInputFile->chunks_[0] = newChunk;
1203  fileQueue_.push(std::move(newInputFile));
1204  cvWakeup_.notify_one();
1205  }
1206  }
1207  }
1209  //make sure threads finish reading
1210  unsigned numFinishedThreads = 0;
1211  while (numFinishedThreads < workerThreads_.size()) {
1212  unsigned tid = 0;
1213  while (!workerPool_.try_pop(tid)) {
1214  usleep(10000);
1215  }
1216  std::unique_lock<std::mutex> lk(mReader_);
1217  thread_quit_signal[tid] = true;
1218  cvReader_[tid]->notify_one();
1219  numFinishedThreads++;
1220  }
1221  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1222  workerThreads_[i]->join();
1223  delete workerThreads_[i];
1224  }
1225 }
1226 
1227 void FedRawDataInputSource::readWorker(unsigned int tid) {
1228  bool init = true;
1229  threadInit_.exchange(true, std::memory_order_acquire);
1230 
1231  while (true) {
1232  tid_active_[tid] = false;
1233  std::unique_lock<std::mutex> lk(mReader_);
1234  workerJob_[tid].first = nullptr;
1235  workerJob_[tid].first = nullptr;
1236 
1237  assert(!thread_quit_signal[tid]); //should never get it here
1238  workerPool_.push(tid);
1239 
1240  if (init) {
1241  std::unique_lock<std::mutex> lk(startupLock_);
1242  init = false;
1243  startupCv_.notify_one();
1244  }
1245  cvReader_[tid]->wait(lk);
1246 
1247  if (thread_quit_signal[tid])
1248  return;
1249  tid_active_[tid] = true;
1250 
1251  InputFile* file;
1252  InputChunk* chunk;
1253 
1254  assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1255 
1256  file = workerJob_[tid].first;
1257  chunk = workerJob_[tid].second;
1258 
1259  //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1260  unsigned int bufferLeft = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1261 
1262  //if only one worker thread exists, use single fd for all operations
1263  //if more worker threads exist, use rawFd_ for only the first read operation and then close file
1264  int fileDescriptor;
1265  bool fileOpenedHere = false;
1266 
1267  if (numConcurrentReads_ == 1) {
1268  fileDescriptor = file->rawFd_;
1269  if (fileDescriptor == -1) {
1270  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1271  fileOpenedHere = true;
1272  file->rawFd_ = fileDescriptor;
1273  }
1274  } else {
1275  if (chunk->offset_ == 0) {
1276  fileDescriptor = file->rawFd_;
1277  file->rawFd_ = -1;
1278  if (fileDescriptor == -1) {
1279  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1280  fileOpenedHere = true;
1281  }
1282  } else {
1283  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1284  fileOpenedHere = true;
1285  }
1286  }
1287 
1288  if (fileDescriptor < 0) {
1289  edm::LogError("FedRawDataInputSource") << "readWorker failed to open file -: " << file->fileName_
1290  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1291  setExceptionState_ = true;
1292  continue;
1293  }
1294  if (fileOpenedHere) { //fast forward to this chunk position
1295  off_t pos = 0;
1296  pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1297  if (pos == -1) {
1298  edm::LogError("FedRawDataInputSource")
1299  << "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor << " to offset "
1300  << chunk->offset_ << " error: " << strerror(errno);
1301  setExceptionState_ = true;
1302  continue;
1303  }
1304  }
1305 
1306  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1307  << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1308 
1309  unsigned int skipped = bufferLeft;
1311  for (unsigned int i = 0; i < readBlocks_; i++) {
1312  ssize_t last;
1313 
1314  //protect against reading into next block
1315  last = ::read(
1316  fileDescriptor, (void*)(chunk->buf_ + bufferLeft), std::min(chunk->usedSize_ - bufferLeft, eventChunkBlock_));
1317 
1318  if (last < 0) {
1319  edm::LogError("FedRawDataInputSource") << "readWorker failed to read file -: " << file->fileName_
1320  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1321  setExceptionState_ = true;
1322  break;
1323  }
1324  if (last > 0)
1325  bufferLeft += last;
1326  if (last < eventChunkBlock_) { //last read
1327  //check if this is last block, then total read size must match file size
1328  if (!(chunk->usedSize_ - skipped == i * eventChunkBlock_ + last)) {
1329  edm::LogError("FedRawDataInputSource")
1330  << "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last
1331  << " expectedChunkSize:" << chunk->usedSize_
1332  << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last) << " skipped:" << skipped
1333  << " block:" << (i + 1) << "/" << readBlocks_ << " error: " << strerror(errno);
1334  setExceptionState_ = true;
1335  }
1336  break;
1337  }
1338  }
1339  if (setExceptionState_)
1340  continue;
1341 
1343  auto diff = end - start;
1344  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1345  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1346  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1347  << " GB/s)";
1348 
1349  if (chunk->offset_ + bufferLeft == file->fileSize_) { //file reading finished using same fd
1350  close(fileDescriptor);
1351  fileDescriptor = -1;
1352  if (numConcurrentReads_ == 1)
1353  file->rawFd_ = -1;
1354  }
1355  if (numConcurrentReads_ > 1 && fileDescriptor != -1)
1356  close(fileDescriptor);
1357 
1358  //detect FRD event version. Skip file Header if it exists
1359  if (detectedFRDversion_ == 0 && chunk->offset_ == 0) {
1360  detectedFRDversion_ = *((uint16_t*)(chunk->buf_ + file->rawHeaderSize_));
1361  }
1363  chunk->readComplete_ =
1364  true; //this is atomic to secure the sequential buffer fill before becoming available for processing)
1365  file->chunks_[chunk->fileIndex_] = chunk; //put the completed chunk in the file chunk vector at predetermined index
1366  }
1367 }
1368 
1370  quit_threads_ = true;
1371  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1372 }
1373 
1375  if (fms_)
1376  fms_->setInState(state);
1377 }
1378 
1380  if (fms_)
1381  fms_->setInStateSup(state);
1382 }
1383 
1384 inline bool InputFile::advance(unsigned char*& dataPosition, const size_t size) {
1385  //wait for chunk
1386 
1387  while (!waitForChunk(currentChunk_)) {
1389  usleep(100000);
1391  if (parent_->exceptionState())
1392  parent_->threadError();
1393  }
1394 
1395  dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
1396  size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1397 
1398  if (currentLeft < size) {
1399  //we need next chunk
1400  while (!waitForChunk(currentChunk_ + 1)) {
1402  usleep(100000);
1404  if (parent_->exceptionState())
1405  parent_->threadError();
1406  }
1407  //copy everything to beginning of the first chunk
1408  dataPosition -= chunkPosition_;
1409  assert(dataPosition == chunks_[currentChunk_]->buf_);
1410  memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1411  memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1412  //set pointers at the end of the old data position
1413  bufferPosition_ += size;
1414  chunkPosition_ = size - currentLeft;
1415  currentChunk_++;
1416  return true;
1417  } else {
1418  chunkPosition_ += size;
1419  bufferPosition_ += size;
1420  return false;
1421  }
1422 }
1423 
1424 inline void InputFile::moveToPreviousChunk(const size_t size, const size_t offset) {
1425  //this will fail in case of events that are too large
1426  assert(size < chunks_[currentChunk_]->size_ - chunkPosition_);
1427  assert(size - offset < chunks_[currentChunk_]->size_);
1428  memcpy(chunks_[currentChunk_ - 1]->buf_ + offset, chunks_[currentChunk_]->buf_ + chunkPosition_, size);
1429  chunkPosition_ += size;
1430  bufferPosition_ += size;
1431 }
1432 
1433 inline void InputFile::rewindChunk(const size_t size) {
1434  chunkPosition_ -= size;
1435  bufferPosition_ -= size;
1436 }
1437 
1439  if (rawFd_ != -1)
1440  close(rawFd_);
1441 
1442  if (deleteFile_ && !fileName_.empty()) {
1444  try {
1445  //sometimes this fails but file gets deleted
1446  LogDebug("FedRawDataInputSource:InputFile") << "Deleting input file -:" << fileName_;
1447  std::filesystem::remove(filePath);
1448  return;
1449  } catch (const std::filesystem::filesystem_error& ex) {
1450  edm::LogError("FedRawDataInputSource:InputFile")
1451  << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what() << ". Trying again.";
1452  } catch (std::exception& ex) {
1453  edm::LogError("FedRawDataInputSource:InputFile")
1454  << " - deleteFile std::exception CAUGHT -: " << ex.what() << ". Trying again.";
1455  }
1456  std::filesystem::remove(filePath);
1457  }
1458 }
1459 
1460 //single-buffer mode file reading
1462  uint32_t existingSize = 0;
1463 
1464  if (fileDescriptor_ < 0) {
1465  bufferInputRead_ = 0;
1466  if (file->rawFd_ == -1) {
1467  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1468  if (file->rawHeaderSize_)
1469  lseek(fileDescriptor_, file->rawHeaderSize_, SEEK_SET);
1470  } else
1471  fileDescriptor_ = file->rawFd_;
1472 
1473  //skip header size in destination buffer (chunk position was already adjusted)
1475  existingSize += file->rawHeaderSize_;
1476 
1477  if (fileDescriptor_ >= 0)
1478  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1479  else {
1480  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer")
1481  << "failed to open file " << std::endl
1482  << file->fileName_ << " fd:" << fileDescriptor_;
1483  }
1484  //fill chunk (skipping file header if present)
1485  for (unsigned int i = 0; i < readBlocks_; i++) {
1486  const ssize_t last = ::read(fileDescriptor_,
1487  (void*)(file->chunks_[0]->buf_ + existingSize),
1488  eventChunkBlock_ - (i == readBlocks_ - 1 ? existingSize : 0));
1490  existingSize += last;
1491  }
1492 
1493  } else {
1494  //continue reading
1495  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1496  for (unsigned int i = 0; i < readBlocks_; i++) {
1497  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1499  existingSize += last;
1500  }
1501  } else {
1502  //event didn't fit in last chunk, so leftover must be moved to the beginning and completed
1503  uint32_t existingSizeLeft = eventChunkSize_ - file->chunkPosition_;
1504  memmove((void*)file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSizeLeft);
1505 
1506  //calculate amount of data that can be added
1507  const uint32_t blockcount = file->chunkPosition_ / eventChunkBlock_;
1508  const uint32_t leftsize = file->chunkPosition_ % eventChunkBlock_;
1509 
1510  for (uint32_t i = 0; i < blockcount; i++) {
1511  const ssize_t last =
1512  ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), eventChunkBlock_);
1514  existingSizeLeft += last;
1515  }
1516  if (leftsize) {
1517  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), leftsize);
1519  }
1520  file->chunkPosition_ = 0; //data was moved to beginning of the chunk
1521  }
1522  }
1523  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1524  if (fileDescriptor_ != -1) {
1525  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1526  close(fileDescriptor_);
1527  file->rawFd_ = fileDescriptor_ = -1;
1528  }
1529  }
1530 }
1531 
1533  std::lock_guard<std::mutex> lock(monlock_);
1534  auto itr = sourceEventsReport_.find(lumi);
1535  if (itr != sourceEventsReport_.end())
1536  itr->second += events;
1537  else
1538  sourceEventsReport_[lumi] = events;
1539 }
1540 
1541 std::pair<bool, unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase) {
1542  std::lock_guard<std::mutex> lock(monlock_);
1543  auto itr = sourceEventsReport_.find(lumi);
1544  if (itr != sourceEventsReport_.end()) {
1545  std::pair<bool, unsigned int> ret(true, itr->second);
1546  if (erase)
1547  sourceEventsReport_.erase(itr);
1548  return ret;
1549  } else
1550  return std::pair<bool, unsigned int>(false, 0);
1551 }
1552 
1554  std::sort(fileNames_.begin(), fileNames_.end(), [](std::string a, std::string b) {
1555  if (a.rfind('/') != std::string::npos)
1556  a = a.substr(a.rfind('/'));
1557  if (b.rfind('/') != std::string::npos)
1558  b = b.substr(b.rfind('/'));
1559  return b > a;
1560  });
1561 
1562  if (!fileNames_.empty()) {
1563  //get run number from first file in the vector
1565  std::string fileStem = fileName.stem().string();
1566  if (fileStem.find("file://") == 0)
1567  fileStem = fileStem.substr(7);
1568  else if (fileStem.find("file:") == 0)
1569  fileStem = fileStem.substr(5);
1570  auto end = fileStem.find('_');
1571 
1572  if (fileStem.find("run") == 0) {
1573  std::string runStr = fileStem.substr(3, end - 3);
1574  try {
1575  //get long to support test run numbers < 2^32
1576  long rval = std::stol(runStr);
1577  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: " << rval;
1578  return rval;
1579  } catch (const std::exception&) {
1580  edm::LogWarning("FedRawDataInputSource")
1581  << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1582  }
1583  }
1584  }
1585  return -1;
1586 }
1587 
1589  std::string& nextFile,
1590  uint32_t& fsize,
1591  uint64_t& lockWaitTime) {
1592  if (fileListIndex_ < fileNames_.size()) {
1593  nextFile = fileNames_[fileListIndex_];
1594  if (nextFile.find("file://") == 0)
1595  nextFile = nextFile.substr(7);
1596  else if (nextFile.find("file:") == 0)
1597  nextFile = nextFile.substr(5);
1598  std::filesystem::path fileName = nextFile;
1599  std::string fileStem = fileName.stem().string();
1600  if (fileStem.find("ls"))
1601  fileStem = fileStem.substr(fileStem.find("ls") + 2);
1602  if (fileStem.find('_'))
1603  fileStem = fileStem.substr(0, fileStem.find('_'));
1604 
1605  if (!fileListLoopMode_)
1606  ls = std::stoul(fileStem);
1607  else //always starting from LS 1 in loop mode
1608  ls = 1 + loopModeIterationInc_;
1609 
1610  //fsize = 0;
1611  //lockWaitTime = 0;
1612  fileListIndex_++;
1614  } else {
1615  if (!fileListLoopMode_)
1617  else {
1618  //loop through files until interrupted
1620  fileListIndex_ = 0;
1621  return getFile(ls, nextFile, fsize, lockWaitTime);
1622  }
1623  }
1624 }
static const char runNumber_[]
void setComment(std::string const &value)
std::vector< std::string > fileNames_
tuple ret
prodAgent to be discontinued
unsigned int getgpshigh(const unsigned char *)
uint32_t chunkPosition_
std::condition_variable cvWakeup_
void read(edm::EventPrincipal &eventPrincipal) override
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:330
bool gtpe_board_sense(const unsigned char *p)
constexpr size_t FRDHeaderMaxVersion
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
Definition: Timestamp.h:82
uint8_t triggerType() const
Event Trigger type identifier.
Definition: FEDHeader.cc:13
bool crc32c_hw_test()
Definition: crc32c.cc:354
unsigned int offset_
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
static const uint32_t length
Definition: FEDTrailer.h:57
tbb::concurrent_queue< unsigned int > workerPool_
void setAllowAnything()
allow any parameter label/value pairs
unsigned int get(const unsigned char *, bool)
void maybeOpenNewLumiSection(const uint32_t lumiSection)
static const uint32_t length
Definition: FEDHeader.h:54
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
friend struct InputFile
Open Root file and provide MEs ############.
void rewindChunk(const size_t size)
volatile std::atomic< bool > shutdown_flag
int init
Definition: HydjetWrapper.h:64
def ls
Definition: eostools.py:349
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
list status
Definition: mps_update.py:107
uint16_t sourceID() const
Identifier of the FED.
Definition: FEDHeader.cc:19
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:457
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
void createProcessingNotificationMaybe() const
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &rawData, bool &tcdsInRange)
std::pair< InputFile *, InputChunk * > ReaderInfo
uint16_t rawHeaderSize_
std::unique_ptr< std::thread > readSupervisorThread_
ProductProvenance const & dummyProvenance() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
Log< level::Error, false > LogError
int timeout
Definition: mps_check.py:53
assert(be >=bs)
std::vector< std::condition_variable * > cvReader_
uint32_t fragmentLength() const
The length of the event fragment counted in 64-bit words including header and trailer.
Definition: FEDTrailer.cc:13
FedRawDataInputSource * parent_
bool useFileBroker() const
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:195
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, bool isRealData, const edm::EventAuxiliary::ExperimentType &, const std::string &processGUID, bool verifyLumiSection, bool suppressWarning)
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
U second(std::pair< T, U > const &p)
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
static Timestamp beginOfTime()
Definition: Timestamp.h:84
void setComment(std::string const &value)
const std::vector< unsigned int > testTCDSFEDRange_
uint32_t bufferPosition_
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
void resize(size_t newsize)
Definition: FEDRawData.cc:28
bool advance(unsigned char *&dataPosition, const size_t size)
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:337
std::vector< int > streamFileTracker_
void moveToPreviousChunk(const size_t size, const size_t offset)
unsigned char * buf_
if(conf_.getParameter< bool >("UseStripCablingDB"))
def move
Definition: eostools.py:511
def load
Definition: svgfig.py:547
string inf
Definition: EcalCondDB.py:96
StreamID streamID() const
BranchDescription const & branchDescription() const
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
void setMonStateSup(evf::FastMonState::InputState state)
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
std::map< unsigned int, unsigned int > sourceEventsReport_
edm::ProcessHistoryID processHistoryID_
T min(T a, T b)
Definition: MathUtil.h:58
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
bool waitForChunk(unsigned int chunkid)
const edm::DaqProvenanceHelper daqProvenanceHelper_
unsigned int fileIndex_
list lumi
Definition: dqmdumpme.py:53
unsigned long long TimeValue_t
Definition: Timestamp.h:28
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
bool evm_board_sense(const unsigned char *p, size_t size)
void setMonState(evf::FastMonState::InputState state)
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:362
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
void setInputSource(FedRawDataInputSource *inputSource)
Log< level::Info, false > LogInfo
std::atomic< bool > readComplete_
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:462
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
unsigned int getLumisectionToStart() const
void setInStateSup(FastMonState::InputState inputState)
unsigned long long uint64_t
Definition: Time.h:13
unsigned int currentChunk_
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
double b
Definition: hdecay.h:118
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void setProcessHistoryID(ProcessHistoryID const &phid)
std::condition_variable startupCv_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:230
void stoppedLookingForFile(unsigned int lumi)
tbb::concurrent_vector< InputChunk * > chunks_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:345
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:331
std::atomic< bool > threadInit_
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:333
tuple events
Definition: patZpeak.py:20
void readWorker(unsigned int tid)
double a
Definition: hdecay.h:119
static std::atomic< unsigned int > counter
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:24
string end
Definition: dataset.py:937
std::vector< unsigned int > thread_quit_signal
std::atomic< unsigned int > readingFilesCount_
unsigned int getStartLumisectionFromEnv() const
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
evf::FastMonitoringService * fms_
unsigned int gtpe_get(const unsigned char *)
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
std::string getEoLSFilePathOnFU(const unsigned int ls) const
constexpr std::array< uint32, FRDHeaderMaxVersion+1 > FRDHeaderVersionSize
tuple last
Definition: dqmdumpme.py:56
tbb::concurrent_queue< InputChunk * > freeChunks_
Log< level::Warning, false > LogWarning
evf::EvFDaqDirector::FileStatus getNextEvent()
evf::EvFDaqDirector::FileStatus nextEvent()
void readNextChunkIntoBuffer(InputFile *file)
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:233
void setFMS(evf::FastMonitoringService *fms)
std::unique_ptr< InputFile > currentFile_
unsigned int getgpslow(const unsigned char *)
tuple size
Write out results.
Power< A, B >::type pow(const A &a, const B &b)
Definition: Power.h:29
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::string getEoRFilePathOnFU() const
#define LogDebug(id)
void setInState(FastMonState::InputState inputState)