CMS 3D CMS Logo

FedRawDataInputSource.cc
Go to the documentation of this file.
1 #include <fcntl.h>
2 #include <iomanip>
3 #include <iostream>
4 #include <memory>
5 #include <sstream>
6 #include <sys/types.h>
7 #include <sys/file.h>
8 #include <sys/time.h>
9 #include <unistd.h>
10 #include <vector>
11 #include <fstream>
12 #include <zlib.h>
13 #include <cstdio>
14 #include <chrono>
15 
16 #include <boost/algorithm/string.hpp>
17 
21 
23 
30 
32 
34 
38 
40 
45 
46 //JSON file reader
48 
49 using namespace evf::FastMonState;
50 using namespace edm::streamer;
51 
53  : edm::RawInputSource(pset, desc),
54  defPath_(pset.getUntrackedParameter<std::string>("buDefPath", "")),
55  eventChunkSize_(pset.getUntrackedParameter<unsigned int>("eventChunkSize", 32) * 1048576),
56  eventChunkBlock_(pset.getUntrackedParameter<unsigned int>("eventChunkBlock", 32) * 1048576),
57  numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers", 2)),
58  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles", 2)),
59  getLSFromFilename_(pset.getUntrackedParameter<bool>("getLSFromFilename", true)),
60  alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
61  verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum", true)),
62  useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID", false)),
63  testTCDSFEDRange_(
64  pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())),
65  fileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>())),
66  fileListMode_(pset.getUntrackedParameter<bool>("fileListMode", false)),
67  fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
68  runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
69  daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
70  eventID_(),
71  processHistoryID_(),
72  currentLumiSection_(0),
73  tcds_pointer_(nullptr),
74  eventsThisLumi_(0) {
75  char thishost[256];
76  gethostname(thishost, 255);
77  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: " << std::endl
78  << (eventChunkSize_ / 1048576) << " MB on host " << thishost;
79 
80  if (!testTCDSFEDRange_.empty()) {
81  if (testTCDSFEDRange_.size() != 2) {
82  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
83  << "Invalid TCDS Test FED range parameter";
84  }
87  }
88 
89  long autoRunNumber = -1;
90  if (fileListMode_) {
91  autoRunNumber = initFileList();
92  if (!fileListLoopMode_) {
93  if (autoRunNumber < 0)
94  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
95  //override run number
96  runNumber_ = (edm::RunNumber_t)autoRunNumber;
97  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
98  }
99  }
100 
102  setNewRun();
103  //todo:autodetect from file name (assert if names differ)
105 
106  //make sure that chunk size is N * block size
111 
112  if (!numBuffers_)
113  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource")
114  << "no reading enabled with numBuffers parameter 0";
115 
118  readingFilesCount_ = 0;
119 
120  if (!crc32c_hw_test())
121  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
122 
123  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
124  if (fileListMode_) {
125  try {
127  } catch (cms::Exception const&) {
128  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
129  }
130  } else {
132  if (!fms_) {
133  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
134  }
135  }
136 
138  if (!daqDirector_)
139  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
140 
142  if (useFileBroker_)
143  edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
144  //set DaqDirector to delete files in preGlobalEndLumi callback
146  if (fms_) {
148  fms_->setInputSource(this);
151  }
152  //should delete chunks when run stops
153  for (unsigned int i = 0; i < numBuffers_; i++) {
155  }
156 
157  quit_threads_ = false;
158 
159  //prepare data shared by threads
160  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
161  thread_quit_signal.push_back(false);
162  workerJob_.push_back(ReaderInfo(nullptr, nullptr));
163  cvReader_.push_back(std::make_unique<std::condition_variable>());
164  tid_active_.push_back(0);
165  }
166 
167  //start threads
168  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
169  //wait for each thread to complete initialization
170  std::unique_lock<std::mutex> lk(startupLock_);
171  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
172  startupCv_.wait(lk);
173  }
174 
175  runAuxiliary()->setProcessHistoryID(processHistoryID_);
176 }
177 
179  quit_threads_ = true;
180 
181  //delete any remaining open files
182  if (!fms_ || !fms_->exceptionDetected()) {
183  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
184  it->second.reset();
185  } else {
186  //skip deleting files with exception
187  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
188  //it->second->unsetDeleteFile();
189  if (fms_->isExceptionOnData(it->second->lumi_))
190  it->second->unsetDeleteFile();
191  else
192  it->second.reset();
193  }
194  //disable deleting current file with exception
195  if (currentFile_.get())
196  if (fms_->isExceptionOnData(currentFile_->lumi_))
197  currentFile_->unsetDeleteFile();
198  }
199 
201  readSupervisorThread_->join();
202  } else {
203  //join aux threads in case the supervisor thread was not started
204  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
205  std::unique_lock<std::mutex> lk(mReader_);
206  thread_quit_signal[i] = true;
207  cvReader_[i]->notify_one();
208  lk.unlock();
209  workerThreads_[i]->join();
210  delete workerThreads_[i];
211  }
212  }
213 }
214 
217  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
218  desc.addUntracked<unsigned int>("eventChunkSize", 32)->setComment("Input buffer (chunk) size");
219  desc.addUntracked<unsigned int>("eventChunkBlock", 32)
220  ->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
221  desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
222  desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
223  ->setComment("Maximum number of simultaneously buffered raw files");
224  desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
225  ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
226  desc.addUntracked<bool>("verifyChecksum", true)
227  ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
228  desc.addUntracked<bool>("useL1EventID", false)
229  ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
230  desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
231  ->setComment("[min, max] range to search for TCDS FED ID in test setup");
232  desc.addUntracked<bool>("fileListMode", false)
233  ->setComment("Use fileNames parameter to directly specify raw files to open");
234  desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
235  ->setComment("file list used when fileListMode is enabled");
236  desc.setAllowAnything();
237  descriptions.add("source", desc);
238 }
239 
242  //this thread opens new files and dispatches reading to worker readers
243  std::unique_lock<std::mutex> lk(startupLock_);
244  readSupervisorThread_ = std::make_unique<std::thread>(&FedRawDataInputSource::readSupervisor, this);
246  startupCv_.wait(lk);
247  }
248  //signal hltd to start event accounting
249  if (!currentLumiSection_)
252  switch (nextEvent()) {
254  //maybe create EoL file in working directory before ending run
255  struct stat buf;
256  if (!useFileBroker_ && currentLumiSection_ > 0) {
257  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
258  if (eolFound) {
260  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
261  if (!found) {
263  int eol_fd =
264  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
265  close(eol_fd);
267  }
268  }
269  }
270  //also create EoR file in FU data directory
271  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
272  if (!eorFound) {
273  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
274  O_RDWR | O_CREAT,
275  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
276  close(eor_fd);
277  }
279  eventsThisLumi_ = 0;
281  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
282  return Next::kStop;
283  }
285  //this is not reachable
286  return Next::kEvent;
287  }
289  //std::cout << "--------------NEW LUMI---------------" << std::endl;
290  return Next::kEvent;
291  }
292  default: {
293  if (!getLSFromFilename_) {
294  //get new lumi from file header
295  if (event_->lumi() > currentLumiSection_) {
297  eventsThisLumi_ = 0;
299  }
300  }
303  else
304  eventRunNumber_ = event_->run();
305  L1EventID_ = event_->event();
306 
307  setEventCached();
308 
309  return Next::kEvent;
310  }
311  }
312 }
313 
314 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection) {
315  if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
316  if (!useFileBroker_) {
317  if (currentLumiSection_ > 0) {
319  struct stat buf;
320  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
321  if (!found) {
323  int eol_fd =
324  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
325  close(eol_fd);
326  daqDirector_->createBoLSFile(lumiSection, false);
328  }
329  } else
330  daqDirector_->createBoLSFile(lumiSection, true); //needed for initial lumisection
331  }
332 
333  currentLumiSection_ = lumiSection;
334 
336 
337  timeval tv;
338  gettimeofday(&tv, nullptr);
339  const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
340 
342  runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
343 
344  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
345  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
346 
347  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: " << lumiSection;
348  }
349 }
350 
354  if (edm::shutdown_flag.load(std::memory_order_relaxed))
355  break;
356  }
357  return status;
358 }
359 
361  if (setExceptionState_)
362  threadError();
363  if (!currentFile_.get()) {
366  {
367  IdleSourceSentry ids(fms_);
368  if (!fileQueue_.try_pop(currentFile_)) {
369  //sleep until wakeup (only in single-buffer mode) or timeout
370  std::unique_lock<std::mutex> lkw(mWakeup_);
371  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
373  }
374  }
375  status = currentFile_->status_;
378  currentFile_.reset();
379  return status;
380  } else if (status == evf::EvFDaqDirector::runAbort) {
381  throw cms::Exception("FedRawDataInputSource::getNextEvent")
382  << "Run has been aborted by the input source reader thread";
383  } else if (status == evf::EvFDaqDirector::newLumi) {
385  if (getLSFromFilename_) {
386  if (currentFile_->lumi_ > currentLumiSection_) {
388  eventsThisLumi_ = 0;
390  }
391  } else { //let this be picked up from next event
393  }
394  currentFile_.reset();
395  return status;
396  } else if (status == evf::EvFDaqDirector::newFile) {
398  } else
399  assert(false);
400  }
402 
403  //file is empty
404  if (!currentFile_->fileSize_) {
406  //try to open new lumi
407  assert(currentFile_->nChunks_ == 0);
408  if (getLSFromFilename_)
409  if (currentFile_->lumi_ > currentLumiSection_) {
411  eventsThisLumi_ = 0;
413  }
414  //immediately delete empty file
415  currentFile_.reset();
417  }
418 
419  //file is finished
420  if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
422  //release last chunk (it is never released elsewhere)
423  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
424  if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
425  throw cms::Exception("FedRawDataInputSource::getNextEvent")
426  << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
427  << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
428  }
429  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
430  if (singleBufferMode_) {
431  std::unique_lock<std::mutex> lkw(mWakeup_);
432  cvWakeup_.notify_one();
433  }
434  bufferInputRead_ = 0;
436  //put the file in pending delete list;
437  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
438  filesToDelete_.push_back(std::pair<int, std::unique_ptr<InputFile>>(currentFileIndex_, std::move(currentFile_)));
439  } else {
440  //in single-thread and stream jobs, events are already processed
441  currentFile_.reset();
442  }
444  }
445 
446  //handle RAW file header
447  if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
448  if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
449  if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
450  throw cms::Exception("FedRawDataInputSource::getNextEvent")
451  << "Premature end of input file while reading file header";
452 
453  edm::LogWarning("FedRawDataInputSource")
454  << "File with only raw header and no events received in LS " << currentFile_->lumi_;
455  if (getLSFromFilename_)
456  if (currentFile_->lumi_ > currentLumiSection_) {
458  eventsThisLumi_ = 0;
460  }
461  }
462 
463  //advance buffer position to skip file header (chunk will be acquired later)
464  currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
465  currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
466  }
467 
468  //file is too short
469  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
470  throw cms::Exception("FedRawDataInputSource::getNextEvent")
471  << "Premature end of input file while reading event header";
472  }
473  if (singleBufferMode_) {
474  //should already be there
476  {
477  IdleSourceSentry ids(fms_);
478  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
479  usleep(10000);
480  if (currentFile_->parent_->exceptionState() || setExceptionState_)
481  currentFile_->parent_->threadError();
482  }
483  }
485 
486  unsigned char* dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
487 
488  //conditions when read amount is not sufficient for the header to fit
492 
493  if (detectedFRDversion_ == 0) {
494  detectedFRDversion_ = *((uint16_t*)dataPosition);
496  throw cms::Exception("FedRawDataInputSource::getNextEvent")
497  << "Unknown FRD version -: " << detectedFRDversion_;
499  }
500 
501  //recalculate chunk position
502  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
504  throw cms::Exception("FedRawDataInputSource::getNextEvent")
505  << "Premature end of input file while reading event header";
506  }
507  }
508 
509  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
510  if (event_->size() > eventChunkSize_) {
511  throw cms::Exception("FedRawDataInputSource::getNextEvent")
512  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
513  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
514  << " bytes";
515  }
516 
517  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
518 
519  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
520  throw cms::Exception("FedRawDataInputSource::getNextEvent")
521  << "Premature end of input file while reading event data";
522  }
523  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
525  //recalculate chunk position
526  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
527  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
528  }
529  currentFile_->bufferPosition_ += event_->size();
530  currentFile_->chunkPosition_ += event_->size();
531  //last chunk is released when this function is invoked next time
532 
533  }
534  //multibuffer mode:
535  else {
536  //wait for the current chunk to become added to the vector
538  {
539  IdleSourceSentry ids(fms_);
540  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
541  usleep(10000);
542  if (setExceptionState_)
543  threadError();
544  }
545  }
547 
548  //check if header is at the boundary of two chunks
549  chunkIsFree_ = false;
550  unsigned char* dataPosition;
551 
552  //read header, copy it to a single chunk if necessary
554  throw cms::Exception("FedRawDataInputSource::getNextEvent")
555  << "Premature end of input file (missing:"
557  << ") while reading event data for next event header";
558  bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
559 
560  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
561  if (event_->size() > eventChunkSize_) {
562  throw cms::Exception("FedRawDataInputSource::getNextEvent")
563  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
564  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
565  << " bytes";
566  }
567 
568  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
569 
570  if (currentFile_->fileSizeLeft() < msgSize) {
571  throw cms::Exception("FedRawDataInputSource::getNextEvent")
572  << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
573  << ") while reading event data for event " << event_->event() << " lumi:" << event_->lumi();
574  }
575 
576  if (chunkEnd) {
577  //header was at the chunk boundary, we will have to move payload as well
578  currentFile_->moveToPreviousChunk(msgSize, FRDHeaderVersionSize[detectedFRDversion_]);
579  chunkIsFree_ = true;
580  } else {
581  //header was contiguous, but check if payload fits the chunk
582  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
583  //rewind to header start position
585  //copy event to a chunk start and move pointers
586 
588  {
589  IdleSourceSentry ids(fms_);
590  chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_] + msgSize);
591  }
593 
594  assert(chunkEnd);
595  chunkIsFree_ = true;
596  //header is moved
597  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
598  } else {
599  //everything is in a single chunk, only move pointers forward
600  chunkEnd = currentFile_->advance(dataPosition, msgSize);
601  assert(!chunkEnd);
602  chunkIsFree_ = false;
603  }
604  }
605  //sanity-check check that the buffer position has not exceeded file size after preparing event
606  if (currentFile_->fileSize_ < currentFile_->bufferPosition_) {
607  throw cms::Exception("FedRawDataInputSource::getNextEvent")
608  << "Exceeded file size by " << currentFile_->bufferPosition_ - currentFile_->fileSize_
609  << " after reading last event declared size of " << event_->size() << " bytes";
610  }
611  } //end multibuffer mode
613 
614  if (verifyChecksum_ && event_->version() >= 5) {
615  uint32_t crc = 0;
616  crc = crc32c(crc, (const unsigned char*)event_->payload(), event_->eventSize());
617  if (crc != event_->crc32c()) {
618  if (fms_)
620  throw cms::Exception("FedRawDataInputSource::getNextEvent")
621  << "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() << " but calculated 0x"
622  << crc;
623  }
624  } else if (verifyChecksum_ && event_->version() >= 3) {
625  uint32_t adler = adler32(0L, Z_NULL, 0);
626  adler = adler32(adler, (Bytef*)event_->payload(), event_->eventSize());
627 
628  if (adler != event_->adler32()) {
629  if (fms_)
631  throw cms::Exception("FedRawDataInputSource::getNextEvent")
632  << "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() << " but calculated 0x"
633  << adler;
634  }
635  }
637 
638  currentFile_->nProcessed_++;
639 
641 }
642 
645  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
646  bool tcdsInRange;
647  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData, tcdsInRange);
648 
649  if (useL1EventID_) {
652  aux.setProcessHistoryID(processHistoryID_);
653  makeEvent(eventPrincipal, aux);
654  } else if (tcds_pointer_ == nullptr) {
655  if (!GTPEventID_) {
656  throw cms::Exception("FedRawDataInputSource::read")
657  << "No TCDS or GTP FED in event with FEDHeader EID -: " << L1EventID_;
658  }
661  aux.setProcessHistoryID(processHistoryID_);
662  makeEvent(eventPrincipal, aux);
663  } else {
664  const FEDHeader fedHeader(tcds_pointer_);
665  tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
670  event_->isRealData(),
671  static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
672  processGUID(),
674  !tcdsInRange);
675  aux.setProcessHistoryID(processHistoryID_);
676  makeEvent(eventPrincipal, aux);
677  }
678 
679  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
680 
682 
683  eventsThisLumi_++;
685 
686  //resize vector if needed
687  while (streamFileTracker_.size() <= eventPrincipal.streamID())
688  streamFileTracker_.push_back(-1);
689 
690  streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
691 
692  //this old file check runs no more often than every 10 events
693  if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
694  //delete files that are not in processing
695  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
696  auto it = filesToDelete_.begin();
697  while (it != filesToDelete_.end()) {
698  bool fileIsBeingProcessed = false;
699  for (unsigned int i = 0; i < streamFileTracker_.size(); i++) {
700  if (it->first == streamFileTracker_.at(i)) {
701  fileIsBeingProcessed = true;
702  break;
703  }
704  }
705  if (!fileIsBeingProcessed && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
706  std::string fileToDelete = it->second->fileName_;
707  it = filesToDelete_.erase(it);
708  } else
709  it++;
710  }
711  }
712  if (chunkIsFree_)
713  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
714  chunkIsFree_ = false;
716  return;
717 }
718 
721  timeval stv;
722  gettimeofday(&stv, nullptr);
723  time = stv.tv_sec;
724  time = (time << 32) + stv.tv_usec;
725  edm::Timestamp tstamp(time);
726 
727  uint32_t eventSize = event_->eventSize();
728  unsigned char* event = (unsigned char*)event_->payload();
729  GTPEventID_ = 0;
730  tcds_pointer_ = nullptr;
731  tcdsInRange = false;
732  uint16_t selectedTCDSFed = 0;
733  while (eventSize > 0) {
734  assert(eventSize >= FEDTrailer::length);
735  eventSize -= FEDTrailer::length;
736  const FEDTrailer fedTrailer(event + eventSize);
737  const uint32_t fedSize = fedTrailer.fragmentLength() << 3; //trailer length counts in 8 bytes
738  assert(eventSize >= fedSize - FEDHeader::length);
739  eventSize -= (fedSize - FEDHeader::length);
740  const FEDHeader fedHeader(event + eventSize);
741  const uint16_t fedId = fedHeader.sourceID();
743  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
744  } else if (fedId >= MINTCDSuTCAFEDID_ && fedId <= MAXTCDSuTCAFEDID_) {
745  if (!selectedTCDSFed) {
746  selectedTCDSFed = fedId;
747  tcds_pointer_ = event + eventSize;
749  tcdsInRange = true;
750  }
751  } else
752  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
753  << "Second TCDS FED ID " << fedId << " found. First ID: " << selectedTCDSFed;
754  }
756  if (evf::evtn::evm_board_sense(event + eventSize, fedSize))
757  GTPEventID_ = evf::evtn::get(event + eventSize, true);
758  else
759  GTPEventID_ = evf::evtn::get(event + eventSize, false);
760  //evf::evtn::evm_board_setformat(fedSize);
761  const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
762  const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
763  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
764  }
765  //take event ID from GTPE FED
767  if (evf::evtn::gtpe_board_sense(event + eventSize)) {
768  GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
769  }
770  }
771  FEDRawData& fedData = rawData.FEDData(fedId);
772  fedData.resize(fedSize);
773  memcpy(fedData.data(), event + eventSize, fedSize);
774  }
775  assert(eventSize == 0);
776 
777  return tstamp;
778 }
779 
781 
783  bool stop = false;
784  unsigned int currentLumiSection = 0;
785 
786  {
787  std::unique_lock<std::mutex> lk(startupLock_);
788  startupCv_.notify_one();
789  }
790 
791  uint32_t ls = 0;
792  uint32_t monLS = 1;
793  uint32_t lockCount = 0;
794  uint64_t sumLockWaitTimeUs = 0.;
795 
796  while (!stop) {
797  //wait for at least one free thread and chunk
798  int counter = 0;
799 
800  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty() ||
802  //report state to monitoring
803  if (fms_) {
804  bool copy_active = false;
805  for (auto j : tid_active_)
806  if (j)
807  copy_active = true;
810  else if (freeChunks_.empty()) {
811  if (copy_active)
813  else
815  } else {
816  if (copy_active)
818  else
820  }
821  }
822  std::unique_lock<std::mutex> lkw(mWakeup_);
823  //sleep until woken up by condition or a timeout
824  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
825  counter++;
826  if (!(counter % 6000)) {
827  edm::LogWarning("FedRawDataInputSource")
828  << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
829  << ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
830  << " / " << maxBufferedFiles_;
831  }
832  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
833  } else {
834  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
835  }
836  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
837  stop = true;
838  break;
839  }
840  }
841  //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
842 
843  if (stop)
844  break;
845 
846  //look for a new file
847  std::string nextFile;
848  uint32_t fileSizeIndex;
849  int64_t fileSizeFromMetadata;
850 
851  if (fms_) {
854  }
855 
857  uint16_t rawHeaderSize = 0;
858  uint32_t lsFromRaw = 0;
859  int32_t serverEventsInNewFile = -1;
860  int rawFd = -1;
861 
862  int backoff_exp = 0;
863 
864  //entering loop which tries to grab new file from ramdisk
866  //check if hltd has signalled to throttle input
867  counter = 0;
868  while (daqDirector_->inputThrottled()) {
869  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
870  break;
871 
872  unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
873  unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
874  unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
875  bool hasDiscardedLumi = false;
876  for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
878  edm::LogWarning("FedRawDataInputSource") << "Source detected that the lumisection is discarded -: " << i;
879  hasDiscardedLumi = true;
880  break;
881  }
882  }
883  if (hasDiscardedLumi)
884  break;
885 
887 
888  if (!(counter % 50))
889  edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused...";
890  usleep(100000);
891  counter++;
892  }
893 
894  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
895  stop = true;
896  break;
897  }
898 
899  assert(rawFd == -1);
900  uint64_t thisLockWaitTimeUs = 0.;
902  if (fileListMode_) {
903  //return LS if LS not set, otherwise return file
904  status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
906  uint16_t rawDataType;
908  rawFd,
909  rawHeaderSize,
910  rawDataType,
911  lsFromRaw,
912  serverEventsInNewFile,
913  fileSizeFromMetadata,
914  false,
915  false,
916  false) != 0) {
917  //error
918  setExceptionState_ = true;
919  stop = true;
920  break;
921  }
922  if (!getLSFromFilename_)
923  ls = lsFromRaw;
924  }
925  } else if (!useFileBroker_)
927  ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
928  else {
929  status = daqDirector_->getNextFromFileBroker(currentLumiSection,
930  ls,
931  nextFile,
932  rawFd,
933  rawHeaderSize,
934  serverEventsInNewFile,
935  fileSizeFromMetadata,
936  thisLockWaitTimeUs);
937  }
938 
940 
941  //cycle through all remaining LS even if no files get assigned
942  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
944 
945  //monitoring of lock wait time
946  if (thisLockWaitTimeUs > 0.)
947  sumLockWaitTimeUs += thisLockWaitTimeUs;
948  lockCount++;
949  if (ls > monLS) {
950  monLS = ls;
951  if (lockCount)
952  if (fms_)
953  fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
954  lockCount = 0;
955  sumLockWaitTimeUs = 0;
956  }
957 
958  //check again for any remaining index/EoLS files after EoR file is seen
961  usleep(100000);
962  //now all files should have appeared in ramdisk, check again if any raw files were left behind
964  ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
965  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
967  }
968 
970  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runEnded));
971  fileQueue_.push(std::move(inf));
972  stop = true;
973  break;
974  }
975 
976  //error from filelocking function
978  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
979  fileQueue_.push(std::move(inf));
980  stop = true;
981  break;
982  }
983  //queue new lumisection
984  if (getLSFromFilename_) {
985  if (ls > currentLumiSection) {
986  if (!useFileBroker_) {
987  //file locking
988  //setMonStateSup(inSupNewLumi);
989  currentLumiSection = ls;
990  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
991  fileQueue_.push(std::move(inf));
992  } else {
993  //new file service
994  if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
996  //start transitions from LS specified by env, continue if not reached
997  if (ls < daqDirector_->getStartLumisectionFromEnv()) {
998  //skip file if from earlier LS than specified by env
999  if (rawFd != -1) {
1000  close(rawFd);
1001  rawFd = -1;
1002  }
1004  continue;
1005  } else {
1006  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
1007  fileQueue_.push(std::move(inf));
1008  }
1009  } else if (ls < 100) {
1010  //look at last LS file on disk to start from that lumisection (only within first 100 LS)
1011  unsigned int lsToStart = daqDirector_->getLumisectionToStart();
1012 
1013  for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
1014  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1015  fileQueue_.push(std::move(inf));
1016  }
1017  } else {
1018  //start from current LS
1019  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
1020  fileQueue_.push(std::move(inf));
1021  }
1022  } else {
1023  //queue all lumisections after last one seen to avoid gaps
1024  for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
1025  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1026  fileQueue_.push(std::move(inf));
1027  }
1028  }
1029  currentLumiSection = ls;
1030  }
1031  }
1032  //else
1033  if (currentLumiSection > 0 && ls < currentLumiSection) {
1034  edm::LogError("FedRawDataInputSource")
1035  << "Got old LS (" << ls << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
1036  << ". Aborting execution." << std::endl;
1037  if (rawFd != -1)
1038  close(rawFd);
1039  rawFd = -1;
1040  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
1041  fileQueue_.push(std::move(inf));
1042  stop = true;
1043  break;
1044  }
1045  }
1046 
1047  int dbgcount = 0;
1050  dbgcount++;
1051  if (!(dbgcount % 20))
1052  LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1053  if (!useFileBroker_)
1054  usleep(100000);
1055  else {
1056  backoff_exp = std::min(4, backoff_exp); // max 1.6 seconds
1057  //backoff_exp=0; // disabled!
1058  int sleeptime = (int)(100000. * pow(2, backoff_exp));
1059  usleep(sleeptime);
1060  backoff_exp++;
1061  }
1062  } else
1063  backoff_exp = 0;
1064  }
1065  //end of file grab loop, parse result
1068  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1069 
1070  std::string rawFile;
1071  //file service will report raw extension
1072  if (useFileBroker_ || rawHeaderSize)
1073  rawFile = nextFile;
1074  else {
1075  std::filesystem::path rawFilePath(nextFile);
1076  rawFile = rawFilePath.replace_extension(".raw").string();
1077  }
1078 
1079  struct stat st;
1080  int stat_res = stat(rawFile.c_str(), &st);
1081  if (stat_res == -1) {
1082  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
1083  setExceptionState_ = true;
1084  break;
1085  }
1086  uint64_t fileSize = st.st_size;
1087 
1088  if (fms_) {
1092  }
1093  int eventsInNewFile;
1094  if (fileListMode_) {
1095  if (fileSize == 0)
1096  eventsInNewFile = 0;
1097  else
1098  eventsInNewFile = -1;
1099  } else {
1101  if (!useFileBroker_) {
1102  if (rawHeaderSize) {
1103  int rawFdEmpty = -1;
1104  uint16_t rawHeaderCheck;
1105  bool fileFound;
1106  eventsInNewFile = daqDirector_->grabNextJsonFromRaw(
1107  nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true);
1108  assert(fileFound && rawHeaderCheck == rawHeaderSize);
1110  } else
1111  eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1112  } else
1113  eventsInNewFile = serverEventsInNewFile;
1114  assert(eventsInNewFile >= 0);
1115  assert((eventsInNewFile > 0) ==
1116  (fileSize > rawHeaderSize)); //file without events must be empty or contain only header
1117  }
1118 
1119  if (!singleBufferMode_) {
1120  //calculate number of needed chunks
1121  unsigned int neededChunks = fileSize / eventChunkSize_;
1122  if (fileSize % eventChunkSize_)
1123  neededChunks++;
1124 
1125  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1126  ls,
1127  rawFile,
1128  !fileListMode_,
1129  rawFd,
1130  fileSize,
1131  rawHeaderSize,
1132  neededChunks,
1133  eventsInNewFile,
1134  this));
1136  auto newInputFilePtr = newInputFile.get();
1137  fileQueue_.push(std::move(newInputFile));
1138 
1139  for (unsigned int i = 0; i < neededChunks; i++) {
1140  if (fms_) {
1141  bool copy_active = false;
1142  for (auto j : tid_active_)
1143  if (j)
1144  copy_active = true;
1145  if (copy_active)
1147  else
1149  }
1150  //get thread
1151  unsigned int newTid = 0xffffffff;
1152  while (!workerPool_.try_pop(newTid)) {
1153  usleep(100000);
1154  if (quit_threads_.load(std::memory_order_relaxed)) {
1155  stop = true;
1156  break;
1157  }
1158  }
1159 
1160  if (fms_) {
1161  bool copy_active = false;
1162  for (auto j : tid_active_)
1163  if (j)
1164  copy_active = true;
1165  if (copy_active)
1167  else
1169  }
1170  InputChunk* newChunk = nullptr;
1171  while (!freeChunks_.try_pop(newChunk)) {
1172  usleep(100000);
1173  if (quit_threads_.load(std::memory_order_relaxed)) {
1174  stop = true;
1175  break;
1176  }
1177  }
1178 
1179  if (newChunk == nullptr) {
1180  //return unused tid if we received shutdown (nullptr chunk)
1181  if (newTid != 0xffffffff)
1182  workerPool_.push(newTid);
1183  stop = true;
1184  break;
1185  }
1186  if (stop)
1187  break;
1189 
1190  std::unique_lock<std::mutex> lk(mReader_);
1191 
1192  unsigned int toRead = eventChunkSize_;
1193  if (i == neededChunks - 1 && fileSize % eventChunkSize_)
1194  toRead = fileSize % eventChunkSize_;
1195  newChunk->reset(i * eventChunkSize_, toRead, i);
1196 
1197  workerJob_[newTid].first = newInputFilePtr;
1198  workerJob_[newTid].second = newChunk;
1199 
1200  //wake up the worker thread
1201  cvReader_[newTid]->notify_one();
1202  }
1203  } else {
1204  if (!eventsInNewFile) {
1205  if (rawFd) {
1206  close(rawFd);
1207  rawFd = -1;
1208  }
1209  //still queue file for lumi update
1210  std::unique_lock<std::mutex> lkw(mWakeup_);
1211  //TODO: also file with only file header fits in this edge case. Check if read correctly in single buffer mode
1212  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1213  ls,
1214  rawFile,
1215  !fileListMode_,
1216  rawFd,
1217  fileSize,
1218  rawHeaderSize,
1219  (rawHeaderSize > 0),
1220  0,
1221  this));
1223  fileQueue_.push(std::move(newInputFile));
1224  cvWakeup_.notify_one();
1225  break;
1226  }
1227  //in single-buffer mode put single chunk in the file and let the main thread read the file
1228  InputChunk* newChunk = nullptr;
1229  //should be available immediately
1230  while (!freeChunks_.try_pop(newChunk)) {
1231  usleep(100000);
1232  if (quit_threads_.load(std::memory_order_relaxed)) {
1233  stop = true;
1234  break;
1235  }
1236  }
1237 
1238  if (newChunk == nullptr) {
1239  stop = true;
1240  }
1241 
1242  if (stop)
1243  break;
1244 
1245  std::unique_lock<std::mutex> lkw(mWakeup_);
1246 
1247  unsigned int toRead = eventChunkSize_;
1248  if (fileSize % eventChunkSize_)
1249  toRead = fileSize % eventChunkSize_;
1250  newChunk->reset(0, toRead, 0);
1251  newChunk->readComplete_ = true;
1252 
1253  //push file and wakeup main thread
1254  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1255  ls,
1256  rawFile,
1257  !fileListMode_,
1258  rawFd,
1259  fileSize,
1260  rawHeaderSize,
1261  1,
1262  eventsInNewFile,
1263  this));
1264  newInputFile->chunks_[0] = newChunk;
1266  fileQueue_.push(std::move(newInputFile));
1267  cvWakeup_.notify_one();
1268  }
1269  }
1270  }
1272  //make sure threads finish reading
1273  unsigned numFinishedThreads = 0;
1274  while (numFinishedThreads < workerThreads_.size()) {
1275  unsigned tid = 0;
1276  while (!workerPool_.try_pop(tid)) {
1277  usleep(10000);
1278  }
1279  std::unique_lock<std::mutex> lk(mReader_);
1280  thread_quit_signal[tid] = true;
1281  cvReader_[tid]->notify_one();
1282  numFinishedThreads++;
1283  }
1284  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1285  workerThreads_[i]->join();
1286  delete workerThreads_[i];
1287  }
1288 }
1289 
1290 void FedRawDataInputSource::readWorker(unsigned int tid) {
1291  bool init = true;
1292 
1293  while (true) {
1294  tid_active_[tid] = false;
1295  std::unique_lock<std::mutex> lk(mReader_);
1296  workerJob_[tid].first = nullptr;
1297  workerJob_[tid].first = nullptr;
1298 
1299  assert(!thread_quit_signal[tid]); //should never get it here
1300  workerPool_.push(tid);
1301 
1302  if (init) {
1303  std::unique_lock<std::mutex> lks(startupLock_);
1304  init = false;
1305  startupCv_.notify_one();
1306  }
1307  cvWakeup_.notify_all();
1308  cvReader_[tid]->wait(lk);
1309  lk.unlock();
1310 
1311  if (thread_quit_signal[tid])
1312  return;
1313  tid_active_[tid] = true;
1314 
1315  InputFile* file;
1316  InputChunk* chunk;
1317 
1318  assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1319 
1320  file = workerJob_[tid].first;
1321  chunk = workerJob_[tid].second;
1322 
1323  //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1324  unsigned int bufferLeft = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1325 
1326  //if only one worker thread exists, use single fd for all operations
1327  //if more worker threads exist, use rawFd_ for only the first read operation and then close file
1328  int fileDescriptor;
1329  bool fileOpenedHere = false;
1330 
1331  if (numConcurrentReads_ == 1) {
1332  fileDescriptor = file->rawFd_;
1333  if (fileDescriptor == -1) {
1334  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1335  fileOpenedHere = true;
1336  file->rawFd_ = fileDescriptor;
1337  }
1338  } else {
1339  if (chunk->offset_ == 0) {
1340  fileDescriptor = file->rawFd_;
1341  file->rawFd_ = -1;
1342  if (fileDescriptor == -1) {
1343  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1344  fileOpenedHere = true;
1345  }
1346  } else {
1347  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1348  fileOpenedHere = true;
1349  }
1350  }
1351 
1352  if (fileDescriptor < 0) {
1353  edm::LogError("FedRawDataInputSource") << "readWorker failed to open file -: " << file->fileName_
1354  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1355  setExceptionState_ = true;
1356  continue;
1357  }
1358  if (fileOpenedHere) { //fast forward to this chunk position
1359  off_t pos = 0;
1360  pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1361  if (pos == -1) {
1362  edm::LogError("FedRawDataInputSource")
1363  << "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor << " to offset "
1364  << chunk->offset_ << " error: " << strerror(errno);
1365  setExceptionState_ = true;
1366  continue;
1367  }
1368  }
1369 
1370  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1371  << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1372 
1373  unsigned int skipped = bufferLeft;
1375  for (unsigned int i = 0; i < readBlocks_; i++) {
1376  ssize_t last;
1377 
1378  //protect against reading into next block
1379  last = ::read(fileDescriptor,
1380  (void*)(chunk->buf_ + bufferLeft),
1381  std::min(chunk->usedSize_ - bufferLeft, (uint64_t)eventChunkBlock_));
1382 
1383  if (last < 0) {
1384  edm::LogError("FedRawDataInputSource") << "readWorker failed to read file -: " << file->fileName_
1385  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1386  setExceptionState_ = true;
1387  break;
1388  }
1389  if (last > 0)
1390  bufferLeft += last;
1391  if (last < eventChunkBlock_) { //last read
1392  //check if this is last block, then total read size must match file size
1393  if (!(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (unsigned int)last)) {
1394  edm::LogError("FedRawDataInputSource")
1395  << "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last
1396  << " expectedChunkSize:" << chunk->usedSize_
1397  << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last) << " skipped:" << skipped
1398  << " block:" << (i + 1) << "/" << readBlocks_ << " error: " << strerror(errno);
1399  setExceptionState_ = true;
1400  }
1401  break;
1402  }
1403  }
1404  if (setExceptionState_)
1405  continue;
1406 
1408  auto diff = end - start;
1409  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1410  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1411  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1412  << " GB/s)";
1413 
1414  if (chunk->offset_ + bufferLeft == file->fileSize_) { //file reading finished using same fd
1415  close(fileDescriptor);
1416  fileDescriptor = -1;
1417  if (numConcurrentReads_ == 1)
1418  file->rawFd_ = -1;
1419  }
1420  if (numConcurrentReads_ > 1 && fileDescriptor != -1)
1421  close(fileDescriptor);
1422 
1423  //detect FRD event version. Skip file Header if it exists
1424  if (detectedFRDversion_ == 0 && chunk->offset_ == 0) {
1425  detectedFRDversion_ = *((uint16_t*)(chunk->buf_ + file->rawHeaderSize_));
1426  }
1428  chunk->readComplete_ =
1429  true; //this is atomic to secure the sequential buffer fill before becoming available for processing)
1430  file->chunks_[chunk->fileIndex_] = chunk; //put the completed chunk in the file chunk vector at predetermined index
1431  }
1432 }
1433 
1435  quit_threads_ = true;
1436  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1437 }
1438 
1440  if (fms_)
1441  fms_->setInState(state);
1442 }
1443 
1445  if (fms_)
1447 }
1448 
1449 inline bool InputFile::advance(unsigned char*& dataPosition, const size_t size) {
1450  //wait for chunk
1451 
1452  while (!waitForChunk(currentChunk_)) {
1454  usleep(100000);
1456  if (parent_->exceptionState())
1457  parent_->threadError();
1458  }
1459 
1460  dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_;
1461  size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1462 
1463  if (currentLeft < size) {
1464  //we need next chunk
1465  assert(chunks_.size() > currentChunk_ + 1);
1466  while (!waitForChunk(currentChunk_ + 1)) {
1468  usleep(100000);
1470  if (parent_->exceptionState())
1471  parent_->threadError();
1472  }
1473  //copy everything to beginning of the first chunk
1474  dataPosition -= chunkPosition_;
1475  assert(dataPosition == chunks_[currentChunk_]->buf_);
1476  memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_ + chunkPosition_, currentLeft);
1477  memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_ + 1]->buf_, size - currentLeft);
1478  //set pointers at the end of the old data position
1479  bufferPosition_ += size;
1480  chunkPosition_ = size - currentLeft;
1481  currentChunk_++;
1482  return true;
1483  } else {
1484  chunkPosition_ += size;
1485  bufferPosition_ += size;
1486  return false;
1487  }
1488 }
1489 
1490 void InputFile::moveToPreviousChunk(const size_t size, const size_t offset) {
1491  //this will fail in case of events that are too large
1493  assert(size - offset < chunks_[currentChunk_]->size_);
1494  memcpy(chunks_[currentChunk_ - 1]->buf_ + offset, chunks_[currentChunk_]->buf_ + chunkPosition_, size);
1495  chunkPosition_ += size;
1496  bufferPosition_ += size;
1497 }
1498 
1499 void InputFile::rewindChunk(const size_t size) {
1500  chunkPosition_ -= size;
1501  bufferPosition_ -= size;
1502 }
1503 
1505  if (rawFd_ != -1)
1506  close(rawFd_);
1507 
1508  if (deleteFile_) {
1509  for (auto& fileName : fileNames_) {
1510  if (!fileName.empty()) {
1512  try {
1513  //sometimes this fails but file gets deleted
1514  LogDebug("FedRawDataInputSource:InputFile") << "Deleting input file -:" << fileName;
1516  continue;
1517  } catch (const std::filesystem::filesystem_error& ex) {
1518  edm::LogError("FedRawDataInputSource:InputFile")
1519  << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what() << ". Trying again.";
1520  } catch (std::exception& ex) {
1521  edm::LogError("FedRawDataInputSource:InputFile")
1522  << " - deleteFile std::exception CAUGHT -: " << ex.what() << ". Trying again.";
1523  }
1525  }
1526  }
1527  }
1528 }
1529 
1530 //single-buffer mode file reading
1532  uint32_t existingSize = 0;
1533 
1534  if (fileDescriptor_ < 0) {
1535  bufferInputRead_ = 0;
1536  if (file->rawFd_ == -1) {
1537  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1538  if (file->rawHeaderSize_)
1539  lseek(fileDescriptor_, file->rawHeaderSize_, SEEK_SET);
1540  } else
1541  fileDescriptor_ = file->rawFd_;
1542 
1543  //skip header size in destination buffer (chunk position was already adjusted)
1544  bufferInputRead_ += file->rawHeaderSize_;
1545  existingSize += file->rawHeaderSize_;
1546 
1547  if (fileDescriptor_ >= 0)
1548  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1549  else {
1550  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer")
1551  << "failed to open file " << std::endl
1552  << file->fileName_ << " fd:" << fileDescriptor_;
1553  }
1554  //fill chunk (skipping file header if present)
1555  for (unsigned int i = 0; i < readBlocks_; i++) {
1556  const ssize_t last = ::read(fileDescriptor_,
1557  (void*)(file->chunks_[0]->buf_ + existingSize),
1558  eventChunkBlock_ - (i == readBlocks_ - 1 ? existingSize : 0));
1560  existingSize += last;
1561  }
1562 
1563  } else {
1564  //continue reading
1565  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1566  for (unsigned int i = 0; i < readBlocks_; i++) {
1567  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1569  existingSize += last;
1570  }
1571  } else {
1572  //event didn't fit in last chunk, so leftover must be moved to the beginning and completed
1573  uint32_t existingSizeLeft = eventChunkSize_ - file->chunkPosition_;
1574  memmove((void*)file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSizeLeft);
1575 
1576  //calculate amount of data that can be added
1577  const uint32_t blockcount = file->chunkPosition_ / eventChunkBlock_;
1578  const uint32_t leftsize = file->chunkPosition_ % eventChunkBlock_;
1579 
1580  for (uint32_t i = 0; i < blockcount; i++) {
1581  const ssize_t last =
1582  ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), eventChunkBlock_);
1584  existingSizeLeft += last;
1585  }
1586  if (leftsize) {
1587  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), leftsize);
1589  }
1590  file->chunkPosition_ = 0; //data was moved to beginning of the chunk
1591  }
1592  }
1593  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1594  if (fileDescriptor_ != -1) {
1595  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1596  close(fileDescriptor_);
1597  file->rawFd_ = fileDescriptor_ = -1;
1598  }
1599  }
1600 }
1601 
1603  std::lock_guard<std::mutex> lock(monlock_);
1604  auto itr = sourceEventsReport_.find(lumi);
1605  if (itr != sourceEventsReport_.end())
1606  itr->second += events;
1607  else
1609 }
1610 
1611 std::pair<bool, unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase) {
1612  std::lock_guard<std::mutex> lock(monlock_);
1613  auto itr = sourceEventsReport_.find(lumi);
1614  if (itr != sourceEventsReport_.end()) {
1615  std::pair<bool, unsigned int> ret(true, itr->second);
1616  if (erase)
1617  sourceEventsReport_.erase(itr);
1618  return ret;
1619  } else
1620  return std::pair<bool, unsigned int>(false, 0);
1621 }
1622 
1624  std::sort(fileNames_.begin(), fileNames_.end(), [](std::string a, std::string b) {
1625  if (a.rfind('/') != std::string::npos)
1626  a = a.substr(a.rfind('/'));
1627  if (b.rfind('/') != std::string::npos)
1628  b = b.substr(b.rfind('/'));
1629  return b > a;
1630  });
1631 
1632  if (!fileNames_.empty()) {
1633  //get run number from first file in the vector
1635  std::string fileStem = fileName.stem().string();
1636  if (fileStem.find("file://") == 0)
1637  fileStem = fileStem.substr(7);
1638  else if (fileStem.find("file:") == 0)
1639  fileStem = fileStem.substr(5);
1640  auto end = fileStem.find('_');
1641 
1642  if (fileStem.find("run") == 0) {
1643  std::string runStr = fileStem.substr(3, end - 3);
1644  try {
1645  //get long to support test run numbers < 2^32
1646  long rval = std::stol(runStr);
1647  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: " << rval;
1648  return rval;
1649  } catch (const std::exception&) {
1650  edm::LogWarning("FedRawDataInputSource")
1651  << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1652  }
1653  }
1654  }
1655  return -1;
1656 }
1657 
1659  std::string& nextFile,
1660  uint32_t& fsize,
1661  uint64_t& lockWaitTime) {
1662  if (fileListIndex_ < fileNames_.size()) {
1663  nextFile = fileNames_[fileListIndex_];
1664  if (nextFile.find("file://") == 0)
1665  nextFile = nextFile.substr(7);
1666  else if (nextFile.find("file:") == 0)
1667  nextFile = nextFile.substr(5);
1668  std::filesystem::path fileName = nextFile;
1669  std::string fileStem = fileName.stem().string();
1670  if (fileStem.find("ls"))
1671  fileStem = fileStem.substr(fileStem.find("ls") + 2);
1672  if (fileStem.find('_'))
1673  fileStem = fileStem.substr(0, fileStem.find('_'));
1674 
1675  if (!fileListLoopMode_)
1676  ls = std::stoul(fileStem);
1677  else //always starting from LS 1 in loop mode
1678  ls = 1 + loopModeIterationInc_;
1679 
1680  //fsize = 0;
1681  //lockWaitTime = 0;
1682  fileListIndex_++;
1684  } else {
1685  if (!fileListLoopMode_)
1687  else {
1688  //loop through files until interrupted
1690  fileListIndex_ = 0;
1691  return getFile(ls, nextFile, fsize, lockWaitTime);
1692  }
1693  }
1694 }
size
Write out results.
static const char runNumber_[]
uint8_t triggerType() const
Event Trigger type identifier.
Definition: FEDHeader.cc:13
Definition: start.py:1
Definition: fillJson.h:27
std::vector< std::string > fileNames_
unsigned int getgpshigh(const unsigned char *)
std::condition_variable cvWakeup_
void read(edm::EventPrincipal &eventPrincipal) override
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:261
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:359
bool gtpe_board_sense(const unsigned char *p)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
constexpr std::array< uint32, FRDHeaderMaxVersion+1 > FRDHeaderVersionSize
bool crc32c_hw_test()
Definition: crc32c.cc:354
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
static const uint32_t length
Definition: FEDTrailer.h:57
tbb::concurrent_queue< unsigned int > workerPool_
unsigned int get(const unsigned char *, bool)
bool useFileBroker() const
void maybeOpenNewLumiSection(const uint32_t lumiSection)
std::vector< std::string > fileNames_
static const uint32_t length
Definition: FEDHeader.h:54
ret
prodAgent to be discontinued
uint16_t sourceID() const
Identifier of the FED.
Definition: FEDHeader.cc:19
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
FedRawDataInputSource * parent_
volatile std::atomic< bool > shutdown_flag
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:458
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
constexpr size_t FRDHeaderMaxVersion
bool advance(unsigned char *&dataPosition, const size_t size)
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
bool lumisectionDiscarded(unsigned int ls)
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &rawData, bool &tcdsInRange)
std::pair< InputFile *, InputChunk * > ReaderInfo
std::unique_ptr< std::thread > readSupervisorThread_
Log< level::Error, false > LogError
unsigned int numConcurrentLumis() const
StreamID streamID() const
int timeout
Definition: mps_check.py:53
assert(be >=bs)
bool isExceptionOnData(unsigned int ls)
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
bool waitForChunk(unsigned int chunkid)
std::string getEoRFilePathOnFU() const
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, bool isRealData, const edm::EventAuxiliary::ExperimentType &, const std::string &processGUID, bool verifyLumiSection, bool suppressWarning)
uint32_t fragmentLength() const
The length of the event fragment counted in 64-bit words including header and trailer.
Definition: FEDTrailer.cc:13
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
U second(std::pair< T, U > const &p)
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
std::string getEoLSFilePathOnBU(const unsigned int ls) const
static Timestamp beginOfTime()
Definition: Timestamp.h:77
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
const std::vector< unsigned int > testTCDSFEDRange_
Definition: TCDSRaw.h:16
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:226
void rewindChunk(const size_t size)
void moveToPreviousChunk(const size_t size, const size_t offset)
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:366
std::vector< int > streamFileTracker_
void reset(uint64_t newOffset, uint64_t toRead, unsigned int fileIndex)
unsigned char * buf_
ProductProvenance const & dummyProvenance() const
void setMonStateSup(evf::FastMonState::InputState state)
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
std::map< unsigned int, unsigned int > sourceEventsReport_
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
unsigned int fileIndex_
unsigned long long TimeValue_t
Definition: Timestamp.h:21
std::vector< std::thread * > workerThreads_
bool evm_board_sense(const unsigned char *p, size_t size)
void setMonState(evf::FastMonState::InputState state)
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:391
void createProcessingNotificationMaybe() const
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
void setInputSource(FedRawDataInputSource *inputSource)
Log< level::Info, false > LogInfo
std::atomic< bool > readComplete_
Definition: init.py:1
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
def ls(path, rec=False)
Definition: eostools.py:349
BranchDescription const & branchDescription() const
tbb::concurrent_vector< InputChunk * > chunks_
unsigned int getStartLumisectionFromEnv() const
def getRunNumber(filename)
void setInStateSup(FastMonState::InputState inputState)
unsigned long long uint64_t
Definition: Time.h:13
def load(fileName)
Definition: svgfig.py:547
void resize(size_t newsize, size_t wordsize=8)
Definition: FEDRawData.cc:28
uint32_t chunkPosition_
double b
Definition: hdecay.h:120
void add(std::string const &label, ParameterSetDescription const &psetDescription)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:233
std::condition_variable startupCv_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:374
void stoppedLookingForFile(unsigned int lumi)
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:360
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:463
HLT enums.
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:362
ItemTypeInfo state() const
Definition: InputSource.h:361
void readWorker(unsigned int tid)
double a
Definition: hdecay.h:121
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:264
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
std::vector< unsigned int > thread_quit_signal
unsigned int getLumisectionToStart() const
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:24
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
evf::FastMonitoringService * fms_
unsigned int gtpe_get(const unsigned char *)
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
tbb::concurrent_queue< InputChunk * > freeChunks_
Log< level::Warning, false > LogWarning
evf::EvFDaqDirector::FileStatus getNextEvent()
unsigned int currentChunk_
evf::EvFDaqDirector::FileStatus nextEvent()
void readNextChunkIntoBuffer(InputFile *file)
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
if(threadIdxLocalY==0 &&threadIdxLocalX==0)
void setFMS(evf::FastMonitoringService *fms)
std::unique_ptr< InputFile > currentFile_
unsigned int getgpslow(const unsigned char *)
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)
int events
Power< A, B >::type pow(const A &a, const B &b)
Definition: Power.h:29
def move(src, dest)
Definition: eostools.py:511
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
std::unique_ptr< edm::streamer::FRDEventMsgView > event_
Definition: event.py:1
#define LogDebug(id)
void setInState(FastMonState::InputState inputState)
uint32_t bufferPosition_