CMS 3D CMS Logo

FedRawDataInputSource.cc
Go to the documentation of this file.
1 #include <fcntl.h>
2 #include <iomanip>
3 #include <iostream>
4 #include <memory>
5 #include <sstream>
6 #include <sys/types.h>
7 #include <sys/file.h>
8 #include <sys/time.h>
9 #include <unistd.h>
10 #include <vector>
11 #include <fstream>
12 #include <zlib.h>
13 #include <cstdio>
14 #include <chrono>
15 
16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
18 
19 
24 
26 
33 
35 
37 
41 
43 
48 
49 //JSON file reader
51 
52 #include <boost/lexical_cast.hpp>
53 
55  edm::InputSourceDescription const& desc) :
56  edm::RawInputSource(pset, desc),
57  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", "")),
58  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",32)*1048576),
59  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",32)*1048576),
60  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",2)),
61  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> ("maxBufferedFiles",2)),
62  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
63  alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool> ("alwaysStartFromFirstLS", false)),
64  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
65  verifyChecksum_(pset.getUntrackedParameter<bool> ("verifyChecksum", true)),
66  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
67  fileNames_(pset.getUntrackedParameter<std::vector<std::string>> ("fileNames",std::vector<std::string>())),
68  fileListMode_(pset.getUntrackedParameter<bool> ("fileListMode", false)),
69  fileListLoopMode_(pset.getUntrackedParameter<bool> ("fileListLoopMode", false)),
70  runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
71  daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
72  eventID_(),
73  processHistoryID_(),
74  currentLumiSection_(0),
75  tcds_pointer_(nullptr),
76  eventsThisLumi_(0)
77 {
78  char thishost[256];
79  gethostname(thishost, 255);
80  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
81  << std::endl << (eventChunkSize_/1048576)
82  << " MB on host " << thishost;
83 
84  long autoRunNumber = -1;
85  if (fileListMode_) {
86  autoRunNumber = initFileList();
87  if (!fileListLoopMode_) {
88  if (autoRunNumber<0)
89  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
90  //override run number
91  runNumber_ = (edm::RunNumber_t)autoRunNumber;
92  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
93  }
94  }
95 
97  setNewRun();
98  //todo:autodetect from file name (assert if names differ)
101 
102  //make sure that chunk size is N * block size
107 
108  if (!numBuffers_)
109  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
110  "no reading enabled with numBuffers parameter 0";
111 
115 
116  if (!crc32c_hw_test())
117  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
118 
119  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
120  if (fileListMode_) {
121  try {
123  } catch(cms::Exception const&) {
124  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
125  }
126  }
127  else {
129  if (!fms_) {
130  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
131  }
132  }
133 
135  if (!daqDirector_)
136  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
137 
139  if (useFileBroker_)
140  edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
141  //set DaqDirector to delete files in preGlobalEndLumi callback
143  if (fms_) {
145  fms_->setInputSource(this);
148  }
149  //should delete chunks when run stops
150  for (unsigned int i=0;i<numBuffers_;i++) {
152  }
153 
154  quit_threads_ = false;
155 
156  for (unsigned int i=0;i<numConcurrentReads_;i++)
157  {
158  std::unique_lock<std::mutex> lk(startupLock_);
159  //issue a memory fence here and in threads (constructor was segfaulting without this)
160  thread_quit_signal.push_back(false);
161  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
162  cvReader_.push_back(new std::condition_variable);
163  tid_active_.push_back(0);
164  threadInit_.store(false,std::memory_order_release);
165  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
166  startupCv_.wait(lk);
167  }
168 
169  runAuxiliary()->setProcessHistoryID(processHistoryID_);
170 }
171 
173 {
 // Destructor body (the signature line and listing line 181 are not part of this excerpt).
 // Raises quit_threads_, which the supervisor loop polls to leave its main loop.
174  quit_threads_=true;
175 
176  //delete any remaining open files
 // Each pending entry pairs an index with an owned InputFile*: the file is removed through
 // deleteFile() and the InputFile object is then freed.
177  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
178  deleteFile(it->second->fileName_);
179  delete it->second;
180  }
 // NOTE(review): the condition that opens this branch is missing from the excerpt; judging by the
 // comment in the else branch it presumably tests whether the supervisor thread was started.
182  readSupervisorThread_->join();
183  }
184  else {
185  //join aux threads in case the supervisor thread was not started
 // For every worker: set its quit flag and notify its condition variable while holding mReader_,
 // release the lock, then join and free the thread object.
186  for (unsigned int i=0;i<workerThreads_.size();i++) {
187  std::unique_lock<std::mutex> lk(mReader_);
188  thread_quit_signal[i]=true;
189  cvReader_[i]->notify_one();
190  lk.unlock();
191  workerThreads_[i]->join();
192  delete workerThreads_[i];
193  }
194  }
 // Frees the condition variables that the constructor allocated with new.
195  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
196  /*
197  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
198  InputChunk *ch;
199  while (!freeChunks_.try_pop(ch)) {}
200  delete ch;
201  }
202  */
203 }
204 
206 {
 // Declares the configuration parameters of this source and registers them under the label "source".
 // The signature line and the declaration of `desc` are not part of this listing excerpt.
 // setAllowAnything() means parameters not listed here are still accepted.
208  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
209  desc.addUntracked<unsigned int> ("eventChunkSize",32)->setComment("Input buffer (chunk) size");
210  desc.addUntracked<unsigned int> ("eventChunkBlock",32)->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
211  desc.addUntracked<unsigned int> ("numBuffers",2)->setComment("Number of buffers used for reading input");
212  desc.addUntracked<unsigned int> ("maxBufferedFiles",2)->setComment("Maximum number of simultaneously buffered raw files");
 // Name and type must match what the constructor reads: getUntrackedParameter<bool>("alwaysStartFromFirstLS", false).
 // The previous entry used a different spelling ("...FromfirstLS") and type (unsigned int), so it
 // described a parameter the source never consumed.
213  desc.addUntracked<bool> ("alwaysStartFromFirstLS", false)->setComment("Force source to start from LS 1 if server provides higher lumisection number");
214  desc.addUntracked<bool> ("verifyAdler32", true)->setComment("Verify event Adler32 checksum with FRDv3 or v4");
215  desc.addUntracked<bool> ("verifyChecksum", true)->setComment("Verify event CRC-32C checksum of FRDv5 or higher");
216  desc.addUntracked<bool> ("useL1EventID", false)->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
217  desc.addUntracked<bool> ("fileListMode", false)->setComment("Use fileNames parameter to directly specify raw files to open");
218  desc.addUntracked<std::vector<std::string>> ("fileNames", std::vector<std::string>())->setComment("file list used when fileListMode is enabled");
219  desc.setAllowAnything();
220  descriptions.add("source", desc);
221 }
222 
224 {
 // Body of the function that decides whether another event is available (its signature, the case
 // labels of the switch below and several statements are not part of this listing excerpt).
 // Returns false only on the branch that logs "RUN ENDED"; every other visible branch returns true.
 //
 // NOTE(review): the condition guarding the following scope is missing; the scope starts the
 // readSupervisor thread and waits on startupCv_ for it, so it is presumably executed once.
 // NOTE(review): startupCv_.wait(lk) has no predicate - a spurious wake-up is not filtered.
226  {
227  //this thread opens new files and dispatches reading to worker readers
228  //threadInit_.store(false,std::memory_order_release);
229  std::unique_lock<std::mutex> lk(startupLock_);
230  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
232  startupCv_.wait(lk);
233  }
234  //signal hltd to start event accounting
235  if (!currentLumiSection_)
 // Dispatch on the status returned by nextEvent(). NOTE(review): the case labels are missing here;
 // the branch bodies suggest run-end, an unreachable state, new-lumi and a default "event ready" path.
238  switch (nextEvent() ) {
240  //maybe create EoL file in working directory before ending run
241  struct stat buf;
 // Without the file broker, and once a lumisection has been opened: if the EoLS file for the
 // current lumisection exists on the BU side, make sure the path held in fuEoLS exists as well.
242  if (!useFileBroker_ && currentLumiSection_ > 0) {
243  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
244  if (eolFound) {
246  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
247  if ( !found ) {
 // Creates the file (read/write for user, group and others) and closes it again immediately.
 // NOTE(review): the descriptor returned by open() is not checked before close().
249  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
250  close(eol_fd);
252  }
253  }
254  }
255  //also create EoR file in FU data directory
256  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
257  if (!eorFound) {
258  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
259  close(eor_fd);
260  }
262  eventsThisLumi_=0;
264  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
265  return false;
266  }
268  //this is not reachable
269  return true;
270  }
272  //std::cout << "--------------NEW LUMI---------------" << std::endl;
273  return true;
274  }
275  default: {
 // An event is available. When the lumisection is not taken from the file name, a larger lumi
 // number in the event header opens a new lumisection.
276  if (!getLSFromFilename_) {
277  //get new lumi from file header
278  if (event_->lumi() > currentLumiSection_) {
280  eventsThisLumi_=0;
281  maybeOpenNewLumiSection( event_->lumi() );
282  }
283  }
 // NOTE(review): the `if` belonging to the following `else` is on a missing listing line.
286  else
287  eventRunNumber_=event_->run();
288  L1EventID_ = event_->event();
289 
290  setEventCached();
291 
292  return true;
293  }
294  }
295 }
296 
// Makes `lumiSection` the current luminosity section if it differs from the one already open.
// @param lumiSection  number of the lumisection to switch to.
// When the file broker is not used it first takes care of the end-of-lumisection file for the
// previous section (if one was open) and asks the DAQ director to create the BoLS file; then it
// stores the new number, builds a LuminosityBlockAuxiliary stamped with the current wall-clock
// time and installs it.
// NOTE(review): several listing lines are missing in this excerpt (including the first line of the
// outer condition and the expression that initialises fuEoLS), so only visible statements are described.
297 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection)
298 {
300  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
301 
302  if (!useFileBroker_) {
303  if ( currentLumiSection_ > 0) {
304  const std::string fuEoLS =
306  struct stat buf;
307  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
308  if ( !found ) {
 // The file is only created (read/write for user, group and others) and closed again.
 // NOTE(review): open()'s return value is not checked before close().
310  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
311  close(eol_fd);
312  daqDirector_->createBoLSFile(lumiSection,false);
314  }
315  }
316  else daqDirector_->createBoLSFile(lumiSection,true);//needed for initial lumisection
317  }
318 
319  currentLumiSection_ = lumiSection;
320 
322 
 // Opening time of the lumisection: microseconds since the epoch, from gettimeofday().
323  timeval tv;
324  gettimeofday(&tv, nullptr);
325  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
326 
327  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
329  runAuxiliary()->run(),
330  lumiSection, lsopentime,
332 
333  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
334  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
335 
336  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
337  }
338 }
339 
341 {
 // Calls getNextEvent() repeatedly for as long as it reports noFile and returns the first other
 // status. The loop is also left (still returning noFile) once edm::shutdown_flag is set.
 // NOTE(review): the signature and the declaration of `status` are on listing lines missing here.
343  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
344  {
345  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
346  }
347  return status;
348 }
349 
351 {
 // Body of getNextEvent. The signature and many interior listing lines are not part of this
 // excerpt, so some conditions below appear without their opening line and the final return is absent.
 // Visible flow:
 //  1. with no current file, take the next InputFile from fileQueue_ and react to its status;
 //  2. handle an empty file and a fully consumed file;
 //  3. locate the next event in the buffered chunk(s) and wrap it in an FRDEventMsgView (event_),
 //     separately for single-buffer and multi-buffer mode;
 //  4. verify the event checksum (CRC-32C for version >= 5, otherwise Adler32 for version >= 3).
 // Throws cms::Exception on an aborted run, an event-count mismatch, truncated input, an unknown
 // FRD version, an event larger than a chunk, or a checksum mismatch.
352 
354  if (!currentFile_)
355  {
358  if (!fileQueue_.try_pop(currentFile_))
359  {
360  //sleep until wakeup (only in single-buffer mode) or timeout
361  std::unique_lock<std::mutex> lkw(mWakeup_);
362  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
364  }
 // The queued InputFile may be a pure status marker (run end, abort, new lumi) rather than a raw file.
365  status = currentFile_->status_;
366  if ( status == evf::EvFDaqDirector::runEnded)
367  {
369  delete currentFile_;
370  currentFile_=nullptr;
371  return status;
372  }
373  else if ( status == evf::EvFDaqDirector::runAbort)
374  {
375  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
376  }
377  else if (status == evf::EvFDaqDirector::newLumi)
378  {
380  if (getLSFromFilename_) {
383  eventsThisLumi_=0;
385  }
386  }
387  else {//let this be picked up from next event
389  }
390 
 // The marker object has served its purpose: free it and report the status to the caller.
391  delete currentFile_;
392  currentFile_=nullptr;
393  return status;
394  }
395  else if (status == evf::EvFDaqDirector::newFile) {
397  }
398  else
399  assert(false);
400  }
402 
403  //file is empty
404  if (!currentFile_->fileSize_) {
406  //try to open new lumi
407  assert(currentFile_->nChunks_==0);
408  if (getLSFromFilename_)
411  eventsThisLumi_=0;
413  }
414  //immediately delete empty file
416  delete currentFile_;
417  currentFile_=nullptr;
419  }
420 
421  //file is finished
424  //release last chunk (it is never released elsewhere)
 // NOTE(review): the condition guarding the following throw is missing; the message shows it
 // compares the number of processed events with the count announced for the file.
427  {
428  throw cms::Exception("FedRawDataInputSource::getNextEvent")
429  << "Fully processed " << currentFile_->nProcessed_
430  << " from the file " << currentFile_->fileName_
431  << " but according to BU JSON there should be "
432  << currentFile_->nEvents_ << " events";
433  }
434  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
435  if (singleBufferMode_) {
436  std::unique_lock<std::mutex> lkw(mWakeup_);
437  cvWakeup_.notify_one();
438  }
441  //put the file in pending delete list;
 // The finished file is queued (under fileDeleteLock_) together with its index for later removal.
442  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
443  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
444  }
445  else {
446  //in single-thread and stream jobs, events are already processed
448  delete currentFile_;
449  }
450  currentFile_=nullptr;
452  }
453 
454 
455  //file is too short
457  {
458  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
459  "Premature end of input file while reading event header";
460  }
 // Single-buffer mode: the file has one chunk (chunks_[0]); positions are tracked in
 // chunkPosition_ (within the chunk) and bufferPosition_.
461  if (singleBufferMode_) {
462 
463  //should already be there
466  usleep(10000);
468  }
470 
471  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
472 
473  //conditions when read amount is not sufficient for the header to fit
474  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]
476  {
478 
 // The FRD version is detected once, from the first 32-bit word at the current position;
 // values above 5 are rejected and 0 trips the assert.
479  if (detectedFRDversion_==0) {
480  detectedFRDversion_=*((uint32*)dataPosition);
481  if (detectedFRDversion_>5)
482  throw cms::Exception("FedRawDataInputSource::getNextEvent")
483  << "Unknown FRD version -: " << detectedFRDversion_;
484  assert(detectedFRDversion_>=1);
485  }
486 
487  //recalculate chunk position
488  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
489  if ( bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_])
490  {
491  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
492  "Premature end of input file while reading event header";
493  }
494  }
495 
496  event_.reset( new FRDEventMsgView(dataPosition) );
497  if (event_->size()>eventChunkSize_) {
498  throw cms::Exception("FedRawDataInputSource::getNextEvent")
499  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
500  << " run:" << event_->run() << " of size:" << event_->size()
501  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
502  }
503 
 // msgSize is the event size without the version-dependent header.
504  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
505 
507  {
508  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
509  "Premature end of input file while reading event data";
510  }
 // If the payload does not fit in what is left of the chunk, the (missing) line 512 evidently
 // changes the chunk position, after which the event view is rebuilt at the new position.
511  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
513  //recalculate chunk position
514  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
515  event_.reset( new FRDEventMsgView(dataPosition) );
516  }
517  currentFile_->bufferPosition_ += event_->size();
518  currentFile_->chunkPosition_ += event_->size();
519  //last chunk is released when this function is invoked next time
520 
521  }
522  //multibuffer mode:
523  else
524  {
525  //wait for the current chunk to become added to the vector
528  usleep(10000);
530  }
532 
533  //check if header is at the boundary of two chunks
534  chunkIsFree_ = false;
535  unsigned char *dataPosition;
536 
537  //read header, copy it to a single chunk if necessary
 // advance() sets dataPosition; its boolean result is used below as "a chunk boundary was crossed".
538  bool chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]);
539 
540  event_.reset( new FRDEventMsgView(dataPosition) );
541  if (event_->size()>eventChunkSize_) {
542  throw cms::Exception("FedRawDataInputSource::getNextEvent")
543  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
544  << " run:" << event_->run() << " of size:" << event_->size()
545  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
546  }
547 
548  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
549 
551  {
552  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
553  "Premature end of input file while reading event data";
554  }
555 
556  if (chunkEnd) {
557  //header was at the chunk boundary, we will have to move payload as well
558  currentFile_->moveToPreviousChunk(msgSize,FRDHeaderVersionSize[detectedFRDversion_]);
559  chunkIsFree_ = true;
560  }
561  else {
562  //header was contiguous, but check if payload fits the chunk
563  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
564  //rewind to header start position
565  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
566  //copy event to a chunk start and move pointers
567  chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]+msgSize);
568  assert(chunkEnd);
569  chunkIsFree_=true;
570  //header is moved
571  event_.reset( new FRDEventMsgView(dataPosition) );
572  }
573  else {
574  //everything is in a single chunk, only move pointers forward
575  chunkEnd = currentFile_->advance(dataPosition,msgSize);
576  assert(!chunkEnd);
577  chunkIsFree_=false;
578  }
579  }
580  }//end multibuffer mode
582 
 // Checksum verification over the event payload. CRC-32C is checked for version >= 5 when
 // verifyChecksum_ is set; otherwise Adler32 is checked for version >= 3 when verifyAdler32_ is set.
 // Note the else-if: a version >= 5 event with verifyChecksum_ disabled falls through to the
 // Adler32 branch if verifyAdler32_ is enabled.
583  if (verifyChecksum_ && event_->version() >= 5)
584  {
585  uint32_t crc=0;
586  crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
587  if ( crc != event_->crc32c() ) {
589  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
590  "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
591  " but calculated 0x" << crc;
592  }
593  }
594  else if ( verifyAdler32_ && event_->version() >= 3)
595  {
596  uint32_t adler = adler32(0L,Z_NULL,0);
597  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
598 
599  if ( adler != event_->adler32() ) {
601  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
602  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
603  " but calculated 0x" << adler;
604  }
605  }
607 
609 
611 }
612 
614 {
 // Removes the file named by `fileName` from disk (the signature line is not part of this excerpt).
 // Does nothing in file-list mode. A failed removal is logged and retried once after 100 ms; a
 // failure of the retry is swallowed, so no exception of the caught types leaves this function.
615  //no deletion in file list mode
616  if (fileListMode_) return;
617 
618  const boost::filesystem::path filePath(fileName);
619  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
620  try {
621  //sometimes this fails but file gets deleted
622  boost::filesystem::remove(filePath);
623  }
 // Two handlers with the same retry logic: one for boost filesystem errors, one for any other
 // std::exception. They differ only in the log text and in the type caught on the retry.
624  catch (const boost::filesystem::filesystem_error& ex)
625  {
626  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
627  << ". Trying again.";
628  usleep(100000);
629  try {
630  boost::filesystem::remove(filePath);
631  }
632  catch (const boost::filesystem::filesystem_error&) {/*file gets deleted first time but exception is still thrown*/}
633  }
634  catch (std::exception& ex)
635  {
636  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
637  << ". Trying again.";
638  usleep(100000);
639  try {
640  boost::filesystem::remove(filePath);
641  } catch (std::exception&) {/*file gets deleted first time but exception is still thrown*/}
642  }
643 }
644 
645 
647 {
 // Turns the event currently held in event_ into a framework event on `eventPrincipal`.
 // The signature and several continuation lines of multi-line statements are not part of this
 // excerpt, so the EventAuxiliary constructor calls below appear truncated.
 // Steps: unpack the FED fragments into a FEDRawDataCollection, create the event with an
 // auxiliary chosen by one of three branches, store the collection in the principal, update the
 // per-lumi counter and periodically delete raw files that are no longer in use.
649  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
650  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
651 
 // Branch 1: useL1EventID_ is set. Branch 2: no TCDS fragment was found, in which case a non-zero
 // GTPEventID_ is required (assert). Branch 3: the TCDS fragment is available and is decoded from
 // the bytes following its FED header.
652  if (useL1EventID_){
654  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
657  makeEvent(eventPrincipal, aux);
658  }
659  else if(tcds_pointer_==nullptr){
660  assert(GTPEventID_);
662  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
665  makeEvent(eventPrincipal, aux);
666  }
667  else{
668  const FEDHeader fedHeader(tcds_pointer_);
669  tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
672  static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
675  makeEvent(eventPrincipal, aux);
676  }
677 
678 
679 
 // Ownership of the collection moves into the wrapper, and the wrapper into the principal.
680  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
681 
682  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
684 
685  eventsThisLumi_++;
687 
688  //resize vector if needed
 // New slots are filled with -1 so the vector can be indexed by this event's stream ID.
689  while (streamFileTracker_.size() <= eventPrincipal.streamID())
690  streamFileTracker_.push_back(-1);
691 
693 
694  //this old file check runs no more often than every 10 events
 // NOTE(review): the actual period is checkEvery_, whose value is not visible in this excerpt.
695  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
696  //delete files that are not in processing
 // Under fileDeleteLock_: a pending file is kept while any of the first nStreams_ tracker slots
 // holds its index; otherwise it is removed from disk, freed and erased from the list.
697  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
698  auto it = filesToDelete_.begin();
699  while (it!=filesToDelete_.end()) {
700  bool fileIsBeingProcessed = false;
701  for (unsigned int i=0;i<nStreams_;i++) {
702  if (it->first == streamFileTracker_.at(i)) {
703  fileIsBeingProcessed = true;
704  break;
705  }
706  }
707  if (!fileIsBeingProcessed) {
708  deleteFile(it->second->fileName_);
709  delete it->second;
710  it = filesToDelete_.erase(it);
711  }
712  else it++;
713  }
714 
715  }
717  chunkIsFree_=false;
719  return;
720 }
721 
723 {
 // Splits the payload of the current event (event_) into per-FED fragments and copies each into
 // `rawData`; returns the timestamp to use for the event.
 // The signature and the declaration of `time` are on listing lines missing from this excerpt.
 // Side effects: resets and possibly sets GTPEventID_ and tcds_pointer_.
 // Throws cms::Exception when a fragment carries a FED ID above FEDNumbering::MAXFEDID.
 //
 // Default timestamp from the wall clock: seconds in the upper 32 bits, microseconds in the lower.
725  timeval stv;
726  gettimeofday(&stv,nullptr);
727  time = stv.tv_sec;
728  time = (time << 32) + stv.tv_usec;
729  edm::Timestamp tstamp(time);
730 
731  uint32_t eventSize = event_->eventSize();
732  unsigned char* event = (unsigned char*)event_->payload();
733  GTPEventID_=0;
734  tcds_pointer_ = nullptr;
 // The payload is walked from the end towards the start: eventSize is the offset of the not yet
 // consumed part, each iteration reads the trailer of the last remaining fragment, then steps
 // back to that fragment's header.
735  while (eventSize > 0) {
736  assert(eventSize>=FEDTrailer::length);
737  eventSize -= FEDTrailer::length;
738  const FEDTrailer fedTrailer(event + eventSize);
739  const uint32_t fedSize = fedTrailer.fragmentLength() << 3; //trailer length counts in 8 bytes
 // NOTE(review): fedSize - FEDHeader::length is unsigned arithmetic; a fragment length smaller
 // than the header would wrap around and is only caught by this assert (inactive with NDEBUG).
740  assert(eventSize>=fedSize - FEDHeader::length);
741  eventSize -= (fedSize - FEDHeader::length);
742  const FEDHeader fedHeader(event + eventSize);
743  const uint16_t fedId = fedHeader.sourceID();
744  if(fedId>FEDNumbering::MAXFEDID)
745  {
746  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
747  }
 // Remember where the TCDS fragment starts.
748  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
749  tcds_pointer_ = event + eventSize;
750  }
 // GTP fragment: take the event ID from it and replace the wall-clock timestamp with the GPS
 // time stored in the fragment (high word shifted by 32 plus low word).
751  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
752  if (evf::evtn::evm_board_sense(event + eventSize,fedSize))
753  GTPEventID_ = evf::evtn::get(event + eventSize,true);
754  else
755  GTPEventID_ = evf::evtn::get(event + eventSize,false);
756  //evf::evtn::evm_board_setformat(fedSize);
757  const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
758  const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
759  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
760  }
761  //take event ID from GTPE FED
 // Only used while no event ID has been obtained yet (GTPEventID_ still 0).
762  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
763  if (evf::evtn::gtpe_board_sense(event + eventSize)) {
764  GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
765  }
766  }
 // Copy the whole fragment (header through trailer, fedSize bytes) into the collection slot.
767  FEDRawData& fedData = rawData.FEDData(fedId);
768  fedData.resize(fedSize);
769  memcpy(fedData.data(), event + eventSize, fedSize);
770  }
771  assert(eventSize == 0);
772 
773  return tstamp;
774 }
775 
777 {} // Empty body: this function does nothing. NOTE(review): its signature is on a listing line missing from this excerpt.
778 
779 
781 {
782  bool stop=false;
783  unsigned int currentLumiSection = 0;
784  //threadInit_.exchange(true,std::memory_order_acquire);
785 
786  {
787  std::unique_lock<std::mutex> lk(startupLock_);
788  startupCv_.notify_one();
789  }
790 
791  uint32_t ls=0;
792  uint32_t monLS=1;
793  uint32_t lockCount=0;
794  uint64_t sumLockWaitTimeUs=0.;
795 
796  while (!stop) {
797 
798  //wait for at least one free thread and chunk
799  int counter=0;
801  {
802  //report state to monitoring
803  if (fms_) {
804  bool copy_active=false;
805  for (auto j : tid_active_) if (j) copy_active=true;
807  else if (freeChunks_.empty()) {
808  if (copy_active)
810  else
812  }
813  else {
814  if (copy_active)
816  else
818  }
819  }
820  std::unique_lock<std::mutex> lkw(mWakeup_);
821  //sleep until woken up by condition or a timeout
822  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
823  counter++;
824  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
825  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
826  }
827  else {
828  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
829  }
830  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
831  }
832 
833  if (stop) break;
834 
835  //look for a new file
836  std::string nextFile;
837  uint32_t fileSizeIndex;
838  int64_t fileSizeFromJson;
839 
840  if (fms_) {
844  }
845 
847 
848  int serverEventsInNewFile_=-1;
849 
850  int backoff_exp=0;
851 
852  while (status == evf::EvFDaqDirector::noFile) {
853  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
854  stop=true;
855  break;
856  }
857 
858  uint64_t thisLockWaitTimeUs=0.;
859  if (fileListMode_) {
860  //return LS if LS not set, otherwise return file
861  status = getFile(ls,nextFile,fileSizeIndex,thisLockWaitTimeUs);
862  }
863  else if (!useFileBroker_)
864  status = daqDirector_->updateFuLock(ls,nextFile,fileSizeIndex,thisLockWaitTimeUs);
865  else {
866  status = daqDirector_->getNextFromFileBroker(currentLumiSection,ls,nextFile,serverEventsInNewFile_,fileSizeFromJson,thisLockWaitTimeUs);
867  }
868 
870 
871  //cycle through all remaining LS even if no files get assigned
872  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
873 
874  //monitoring of lock wait time
875  if (thisLockWaitTimeUs>0.)
876  sumLockWaitTimeUs+=thisLockWaitTimeUs;
877  lockCount++;
878  if (ls>monLS) {
879  monLS=ls;
880  if (lockCount)
881  if (fms_) fms_->reportLockWait(monLS,sumLockWaitTimeUs,lockCount);
882  lockCount=0;
883  sumLockWaitTimeUs=0;
884  }
885 
886  //check again for any remaining index/EoLS files after EoR file is seen
889  usleep(100000);
890  //now all files should have appeared in ramdisk, check again if any raw files were left behind
891  status = daqDirector_->updateFuLock(ls,nextFile,fileSizeIndex,thisLockWaitTimeUs);
892  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
893  }
894 
895  if ( status == evf::EvFDaqDirector::runEnded) {
897  stop=true;
898  break;
899  }
900 
901  //error from filelocking function
902  if (status == evf::EvFDaqDirector::runAbort) {
904  stop=true;
905  break;
906  }
907  //queue new lumisection
908  if( getLSFromFilename_) {
909  if (ls > currentLumiSection) {
910  if (!useFileBroker_) {
911  //file locking
912  //fms_->setInStateSup(evf::FastMonitoringThread::inSupNewLumi);
913  currentLumiSection = ls;
914  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
915  }
916  else {
917  //new file service
918  if (currentLumiSection==0 && !alwaysStartFromFirstLS_) {
919  if (ls < 100) {
920  //look at last LS file on disk to start from that lumisection (only within first 100 LS)
921  unsigned int lsToStart = daqDirector_->getLumisectionToStart();
922 
923  for (unsigned int nextLS=lsToStart;nextLS<=ls;nextLS++)
925  }
926  else {
927  //start from current LS
929  }
930  }
931  else {
932  //queue all lumisections after last one seen to avoid gaps
933  for (unsigned int nextLS=currentLumiSection+1;nextLS<=ls;nextLS++) {
935  }
936  }
937  currentLumiSection = ls;
938  }
939  }
940  //else
941  if( currentLumiSection>0 && ls < currentLumiSection) {
942  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
944  stop=true;
945  break;
946  }
947  }
948 
949  int dbgcount=0;
950  if (status == evf::EvFDaqDirector::noFile) {
952  dbgcount++;
953  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
954  if (!useFileBroker_) usleep(100000);
955  else {
956  backoff_exp = std::min(4,backoff_exp); // max 1.6 seconds
957  //backoff_exp=0; // disabled!
958  int sleeptime = (int) (100000. * pow(2,backoff_exp));
959  usleep(sleeptime);
960  backoff_exp++;
961  }
962  }
963  else backoff_exp=0;
964  }
965  if ( status == evf::EvFDaqDirector::newFile ) {
967  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
968 
969 
970  std::string rawFile;
971  //file service will report raw extension
972  if (useFileBroker_) rawFile = nextFile;
973  else {
974  boost::filesystem::path rawFilePath(nextFile);
975  rawFile = rawFilePath.replace_extension(".raw").string();
976  }
977 
978  struct stat st;
979  int stat_res = stat(rawFile.c_str(),&st);
980  if (stat_res==-1) {
981  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-"<< rawFile << std::endl;
982  setExceptionState_=true;
983  break;
984  }
985  uint64_t fileSize=st.st_size;
986 
987  if (fms_) {
991  }
992  int eventsInNewFile;
993  if (fileListMode_) {
994  if (fileSize==0) eventsInNewFile=0;
995  else eventsInNewFile=-1;
996  }
997  else {
999  if (!useFileBroker_)
1000  eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1001  else
1002  eventsInNewFile = serverEventsInNewFile_;
1003  assert( eventsInNewFile>=0 );
1004  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
1005  }
1006 
1007  if (!singleBufferMode_) {
1008  //calculate number of needed chunks
1009  unsigned int neededChunks = fileSize/eventChunkSize_;
1010  if (fileSize%eventChunkSize_) neededChunks++;
1011 
1012  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
1014  fileQueue_.push(newInputFile);
1015 
1016  for (unsigned int i=0;i<neededChunks;i++) {
1017 
1018  if (fms_) {
1019  bool copy_active=false;
1020  for (auto j : tid_active_) if (j) copy_active=true;
1021  if (copy_active)
1023  else
1025  }
1026  //get thread
1027  unsigned int newTid = 0xffffffff;
1028  while (!workerPool_.try_pop(newTid)) {
1029  usleep(100000);
1030  if (quit_threads_.load(std::memory_order_relaxed)) {stop=true;break;}
1031  }
1032 
1033  if (fms_) {
1034  bool copy_active=false;
1035  for (auto j : tid_active_) if (j) copy_active=true;
1036  if (copy_active)
1038  else
1040  }
1041  InputChunk * newChunk = nullptr;
1042  while (!freeChunks_.try_pop(newChunk)) {
1043  usleep(100000);
1044  if (quit_threads_.load(std::memory_order_relaxed)) {stop=true;break;}
1045  }
1046 
1047  if (newChunk == nullptr) {
1048  //return unused tid if we received shutdown (nullptr chunk)
1049  if (newTid!=0xffffffff) workerPool_.push(newTid);
1050  stop = true;
1051  break;
1052  }
1053  if (stop) break;
1055 
1056  std::unique_lock<std::mutex> lk(mReader_);
1057 
1058  unsigned int toRead = eventChunkSize_;
1059  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1060  newChunk->reset(i*eventChunkSize_,toRead,i);
1061 
1062  workerJob_[newTid].first=newInputFile;
1063  workerJob_[newTid].second=newChunk;
1064 
1065  //wake up the worker thread
1066  cvReader_[newTid]->notify_one();
1067  }
1068  }
1069  else {
1070  if (!eventsInNewFile) {
1071  //still queue file for lumi update
1072  std::unique_lock<std::mutex> lkw(mWakeup_);
1073  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
1075  fileQueue_.push(newInputFile);
1076  cvWakeup_.notify_one();
1077  break;
1078  }
1079  //in single-buffer mode put single chunk in the file and let the main thread read the file
1080  InputChunk * newChunk;
1081  //should be available immediately
1082  while(!freeChunks_.try_pop(newChunk)) {
1083  usleep(100000);
1084  if (quit_threads_.load(std::memory_order_relaxed)) break;
1085  }
1086 
1087  std::unique_lock<std::mutex> lkw(mWakeup_);
1088 
1089  unsigned int toRead = eventChunkSize_;
1090  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1091  newChunk->reset(0,toRead,0);
1092  newChunk->readComplete_=true;
1093 
1094  //push file and wakeup main thread
1095  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1096  newInputFile->chunks_[0]=newChunk;
1098  fileQueue_.push(newInputFile);
1099  cvWakeup_.notify_one();
1100  }
1101  }
1102  }
1104  //make sure threads finish reading
1105  unsigned numFinishedThreads = 0;
1106  while (numFinishedThreads < workerThreads_.size()) {
1107  unsigned int tid;
1108  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1109  std::unique_lock<std::mutex> lk(mReader_);
1110  thread_quit_signal[tid]=true;
1111  cvReader_[tid]->notify_one();
1112  numFinishedThreads++;
1113  }
1114  for (unsigned int i=0;i<workerThreads_.size();i++) {
1115  workerThreads_[i]->join();
1116  delete workerThreads_[i];
1117  }
1118 }
1119 
// Body of the reader worker loop. The signature line is absent from this
// listing; the log messages below identify the function as readWorker, and the
// body indexes all per-thread state by `tid`.
//
// Each iteration marks the thread idle, hands `tid` back to workerPool_, waits
// on cvReader_[tid] for a job, and then reads the assigned chunk of the
// assigned file into chunk->buf_ in blocks of eventChunkBlock_ bytes. The loop
// is left only through the `return` taken when thread_quit_signal[tid] is set
// after a wake-up.
1121 {
1122  bool init = true;
1123  threadInit_.exchange(true,std::memory_order_acquire);
1124 
1125  while (true) {
1126 
 // Mark this worker idle and clear its job slot before re-registering.
1127  tid_active_[tid]=false;
1128  std::unique_lock<std::mutex> lk(mReader_);
 // NOTE(review): `.first` is cleared twice and `.second` is never cleared;
 // the second statement presumably was meant to reset `.second` - confirm.
1129  workerJob_[tid].first=nullptr;
1130  workerJob_[tid].first=nullptr;
1131 
1132  assert(!thread_quit_signal[tid]);//should never get it here
 // Advertise this thread as available for a new job.
1133  workerPool_.push(tid);
1134 
 // First pass only: signal startupCv_ once. The inner `lk` shadows the outer
 // mReader_ lock for the duration of this scope.
1135  if (init) {
1136  std::unique_lock<std::mutex> lk(startupLock_);
1137  init = false;
1138  startupCv_.notify_one();
1139  }
 // NOTE(review): wait() is called without a predicate, so a spurious wake-up
 // would reach the job assertion below with an empty slot - confirm this is
 // acceptable.
1140  cvReader_[tid]->wait(lk);
1141 
1142  if (thread_quit_signal[tid]) return;
1143  tid_active_[tid]=true;
1144 
1145  InputFile * file;
1146  InputChunk * chunk;
1147 
1148  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1149 
1150  file = workerJob_[tid].first;
1151  chunk = workerJob_[tid].second;
1152 
1153  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1154  off_t pos = 0;
1155 
1156 
 // On any failure below the error is logged, setExceptionState_ is raised and
 // the loop restarts from the top.
1157  if (fileDescriptor<0) {
1158  edm::LogError("FedRawDataInputSource") <<
1159  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<" error: " << strerror(errno);
1160  setExceptionState_=true;
1161  continue;
1162  }
1163  pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
 // NOTE(review): this path (and the read-failure path further down) continues
 // without close(fileDescriptor), so the descriptor opened above is leaked.
1164  if (pos==-1) {
1165  edm::LogError("FedRawDataInputSource") <<
1166  "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1167  " to offset " << chunk->offset_ << " error: " << strerror(errno);
1168  setExceptionState_=true;
1169  continue;
1170  }
1171 
1172  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1173 
 // bufferLeft counts the bytes written into chunk->buf_ so far.
 // NOTE: listing line 1175 is absent here; `start`, used in the timing
 // computation below, is presumably declared on it.
1174  unsigned int bufferLeft = 0;
1176  for (unsigned int i=0;i<readBlocks_;i++)
1177  {
1178  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1179  if (last<0) {
1180  edm::LogError("FedRawDataInputSource") <<
1181  "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " error: " << strerror(errno);
1182  setExceptionState_=true;
1183  break;
1184  }
1185  if ( last > 0 )
1186  bufferLeft+=last;
 // A short read ends the loop; it is accepted only if the total read so far
 // matches the size expected for this chunk (chunk->usedSize_).
1187  if (last < eventChunkBlock_) {
1188  if (!(chunk->usedSize_==i*eventChunkBlock_+last)) {
1189  edm::LogError("FedRawDataInputSource") <<
1190  "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last <<
1191  " expectedChunkSize:" << chunk->usedSize_ << " readChunkSize:" << (i*eventChunkBlock_+last) << " error: " << strerror(errno);
1192  setExceptionState_=true;
1193  }
1194  break;
1195  }
1196  }
1197  if (setExceptionState_) continue;
1198 
 // NOTE: listing line 1199 is absent here; `end` is presumably declared on it.
 // The rate printed below is MB per millisecond, labelled GB/s.
1200  auto diff = end-start;
1201  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1202  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1203  close(fileDescriptor);
1204 
 // The first 32-bit word of the chunk at file offset 0 is taken as the FRD
 // version, but only while no version has been recorded yet.
1205  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1206  assert(detectedFRDversion_<=5);
1207  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1208  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1209 
1210  }
1211 }
1212 
// Body of the reader-thread error handler. The signature line is absent from
// this listing; InputFile::advance invokes it as parent_->threadError().
// It raises quit_threads_ and then always throws a cms::Exception, so it never
// returns normally.
1214 {
1215  quit_threads_=true;
1216  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1217 
1218 }
1219 
1220 
1221 inline bool InputFile::advance(unsigned char* & dataPosition, const size_t size)
1222 {
1223  //wait for chunk
1224  while (!waitForChunk(currentChunk_)) {
1225  usleep(100000);
1226  if (parent_->exceptionState()) parent_->threadError();
1227  }
1228 
1229  dataPosition = chunks_[currentChunk_]->buf_+ chunkPosition_;
1230  size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1231 
1232  if (currentLeft < size) {
1233 
1234  //we need next chunk
1235  while (!waitForChunk(currentChunk_+1)) {
1236  usleep(100000);
1237  if (parent_->exceptionState()) parent_->threadError();
1238  }
1239  //copy everything to beginning of the first chunk
1240  dataPosition-=chunkPosition_;
1241  assert(dataPosition==chunks_[currentChunk_]->buf_);
1242  memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_+chunkPosition_, currentLeft);
1243  memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_+1]->buf_, size - currentLeft);
1244  //set pointers at the end of the old data position
1245  bufferPosition_+=size;
1246  chunkPosition_=size-currentLeft;
1247  currentChunk_++;
1248  return true;
1249  }
1250  else {
1251  chunkPosition_+=size;
1252  bufferPosition_+=size;
1253  return false;
1254  }
1255 }
1256 
1257 inline void InputFile::moveToPreviousChunk(const size_t size, const size_t offset)
1258 {
1259  //this will fail in case of events that are too large
1260  assert(size < chunks_[currentChunk_]->size_ - chunkPosition_);
1261  assert(size - offset < chunks_[currentChunk_]->size_);
1262  memcpy(chunks_[currentChunk_-1]->buf_+offset,chunks_[currentChunk_]->buf_+chunkPosition_,size);
1263  chunkPosition_+=size;
1264  bufferPosition_+=size;
1265 }
1266 
1267 inline void InputFile::rewindChunk(const size_t size) {
1268  chunkPosition_-=size;
1269  bufferPosition_-=size;
1270 }
1271 
1272 //single-buffer mode file reading
// Body of the single-buffer refill routine. The signature line is absent from
// this listing; the exception text below names it readNextChunkIntoBuffer, and
// the body works on an InputFile pointer called `file`.
//
// It opens the file on first use, then tops up file->chunks_[0]->buf_ from
// fileDescriptor_, and closes the descriptor once bufferInputRead_ equals
// file->fileSize_.
//
// NOTE: listing lines 1294, 1307 and 1312 are absent. bufferInputRead_ is never
// advanced in the visible lines, so it is presumably updated on those - confirm
// against the real source.
// NOTE(review): in the visible lines the result of ::read() is added to
// existingSize without checking for -1 or a short read.
1274 {
1275 
 // Lazily open the input file; a failed open is reported by exception.
1276  if (fileDescriptor_<0) {
1277  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1278  bufferInputRead_ = 0;
1279  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1280  if (fileDescriptor_>=0)
1281  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1282  else
1283  {
1284  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1285  << file->fileName_ << " fd:" << fileDescriptor_;
1286  }
1287  }
1288 
 // Read position at the buffer start: refill from offset 0 with readBlocks_
 // reads of eventChunkBlock_ bytes each.
1289  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1290  uint32_t existingSize = 0;
1291  for (unsigned int i=0;i<readBlocks_;i++)
1292  {
1293  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1295  existingSize+=last;
1296  }
1297  }
1298  else {
 // Otherwise keep the unread tail: move it to the buffer start, then read
 // chunkPosition_ bytes behind it, as whole blocks plus a remainder.
1299  const uint32_t chunksize = file->chunkPosition_;
1300  const uint32_t blockcount=chunksize/eventChunkBlock_;
1301  const uint32_t leftsize = chunksize%eventChunkBlock_;
1302  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1303  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1304 
1305  for (uint32_t i=0;i<blockcount;i++) {
1306  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1308  existingSize+=last;
1309  }
1310  if (leftsize) {
1311  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1313  }
1314  file->chunkPosition_=0;//data was moved to beginning of the chunk
1315  }
1316  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1317  if (fileDescriptor_!=-1)
1318  {
1319  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1320  close(fileDescriptor_);
1321  fileDescriptor_=-1;
1322  }
1323  }
1324 }
1325 
1326 
// Body of the per-lumisection event counter. The signature line is absent from
// this listing; presumably reportEventsThisLumiInSource(lumi, events) - confirm.
// While holding monlock_, it adds `events` to the count stored for `lumi` in
// sourceEventsReport_, creating the entry if there is none yet.
1328 {
1329 
1330  std::lock_guard<std::mutex> lock(monlock_);
1331  auto itr = sourceEventsReport_.find(lumi);
1332  if (itr!=sourceEventsReport_.end())
1333  itr->second+=events;
1334  else
1335  sourceEventsReport_[lumi]=events;
1336 }
1337 
1338 std::pair<bool,unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase)
1339 {
1340  std::lock_guard<std::mutex> lock(monlock_);
1341  auto itr = sourceEventsReport_.find(lumi);
1342  if (itr!=sourceEventsReport_.end()) {
1343  auto && ret = std::pair<bool,unsigned int>(true,itr->second);
1344  if (erase)
1345  sourceEventsReport_.erase(itr);
1346  return ret;
1347  }
1348  else
1349  return std::pair<bool,unsigned int>(false,0);
1350 }
1351 
// Body of the file-list initialisation routine. The signature line is absent
// from this listing; presumably initFileList - confirm.
// It sorts fileNames_ and tries to extract a run number from a file name whose
// stem begins with "run". Returns the parsed number, or -1 when the list is
// empty, the stem does not start with "run", or the number cannot be parsed.
1353 {
 // Sort in ascending order of the name portion starting at the last "/", so
 // the directory part does not affect the ordering. The comparator takes its
 // arguments by value because it modifies them.
1354  std::sort(fileNames_.begin(),fileNames_.end(),
1355  [](std::string a, std::string b) {
1356  if (a.rfind("/")!=std::string::npos) a=a.substr(a.rfind("/"));
1357  if (b.rfind("/")!=std::string::npos) b=b.substr(b.rfind("/"));
1358  return b > a;});
1359 
1360  if (!fileNames_.empty()) {
1361  //get run number from first file in the vector
 // NOTE: listing line 1362 is absent here; `fileName` is presumably declared
 // on it from the first entry of fileNames_.
1363  std::string fileStem = fileName.stem().string();
 // The run number is the text between the leading "run" and the first "_".
 // If there is no "_", `end` is npos and substr() runs to the end of the stem.
1364  auto end = fileStem.find("_");
1365  if (fileStem.find("run")==0) {
1366  std::string runStr = fileStem.substr(3,end-3);
1367  try {
1368  //get long to support test run numbers < 2^32
1369  long rval = boost::lexical_cast<long>(runStr);
1370  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: "<< rval;
1371  return rval;
1372  }
1373  catch( boost::bad_lexical_cast const& ) {
1374  edm::LogWarning("FedRawDataInputSource") << "Unable to autodetect run number in fileListMode from file -: "<< fileName;
1375  }
1376  }
1377  }
1378  return -1;
1379 }
1380 
// Supplies the next input file from fileNames_.
//
// While entries remain, it takes fileNames_[fileListIndex_], strips a leading
// "file://" or "file:" and stores the result in `nextFile`. `ls` is set either
// to the number parsed from the file stem (the text after "ls" up to the next
// "_"), or to 1 + loopModeIterationInc_ when fileListLoopMode_ is set. It then
// advances fileListIndex_. Once the list is exhausted and fileListLoopMode_ is
// set, the index is reset and the function calls itself to start over.
//
// `fsize` and `lockWaitTime` are not written in the visible lines (the
// assignments are commented out); they are only forwarded to the recursive call.
//
// NOTE: listing lines 1400, 1404 and 1407 are absent, so the return statements
// of both non-recursive paths are not visible here.
// NOTE(review): with fileListLoopMode_ set and an empty fileNames_, the visible
// logic recurses without bound.
1381 evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile(unsigned int& ls, std::string& nextFile, uint32_t& fsize, uint64_t& lockWaitTime)
1382 {
1383  if (fileListIndex_ < fileNames_.size()) {
1384  nextFile = fileNames_[fileListIndex_];
1385  if (nextFile.find("file://")==0) nextFile=nextFile.substr(7);
1386  else if (nextFile.find("file:")==0) nextFile=nextFile.substr(5);
1387  boost::filesystem::path fileName = nextFile;
1388  std::string fileStem = fileName.stem().string();
 // NOTE(review): these conditions test the index returned by find() for being
 // non-zero, not for a match. When "ls" is missing, find() returns npos, the
 // condition is true, and substr(npos+2) silently drops the first character.
 // A comparison against std::string::npos was presumably intended - confirm.
1389  if (fileStem.find("ls")) fileStem = fileStem.substr(fileStem.find("ls")+2);
1390  if (fileStem.find("_")) fileStem = fileStem.substr(0,fileStem.find("_"));
1391 
 // boost::lexical_cast throws boost::bad_lexical_cast when the remaining text
 // is not an unsigned integer; that exception is not caught in this function.
1392  if (!fileListLoopMode_)
1393  ls = boost::lexical_cast<unsigned int>(fileStem);
1394  else //always starting from LS 1 in loop mode
1395  ls = 1 + loopModeIterationInc_;
1396 
1397  //fsize = 0;
1398  //lockWaitTime = 0;
1399  fileListIndex_++;
1401  }
1402  else {
1403  if (!fileListLoopMode_)
1405  else {
1406  //loop through files until interrupted
1408  fileListIndex_=0;
1409  return getFile(ls,nextFile,fsize,lockWaitTime);
1410  }
1411  }
1412 }
#define LogDebug(id)
size
Write out results.
static const char runNumber_[]
Definition: start.py:1
void setComment(std::string const &value)
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, const edm::EventAuxiliary::ExperimentType &, const std::string &processGUID, bool verifyLumiSection)
unsigned int lumi_
std::vector< std::string > fileNames_
unsigned int getgpshigh(const unsigned char *)
uint32_t chunkPosition_
std::condition_variable cvWakeup_
void read(edm::EventPrincipal &eventPrincipal) override
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:334
bool gtpe_board_sense(const unsigned char *p)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
uint8_t triggerType() const
Event Trigger type identifier.
Definition: FEDHeader.cc:17
bool crc32c_hw_test()
Definition: crc32c.cc:354
unsigned int offset_
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
static const uint32_t length
Definition: FEDTrailer.h:61
tbb::concurrent_queue< unsigned int > workerPool_
void setAllowAnything()
allow any parameter label/value pairs
unsigned int get(const unsigned char *, bool)
void maybeOpenNewLumiSection(const uint32_t lumiSection)
static const uint32_t length
Definition: FEDHeader.h:54
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void setInState(FastMonitoringThread::InputState inputState)
void rewindChunk(const size_t size)
const uint32 FRDHeaderVersionSize[6]
volatile std::atomic< bool > shutdown_flag
uint16_t sourceID() const
Identifier of the FED.
Definition: FEDHeader.cc:32
evf::EvFDaqDirector::FileStatus status_
void setInStateSup(FastMonitoringThread::InputState inputState)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:497
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
void createProcessingNotificationMaybe() const
std::unique_ptr< std::thread > readSupervisorThread_
ProductProvenance const & dummyProvenance() const
int grabNextJsonFileAndUnlock(boost::filesystem::path const &jsonSourcePath)
std::string getEoLSFilePathOnBU(const unsigned int ls) const
#define nullptr
int timeout
Definition: mps_check.py:52
std::vector< std::condition_variable * > cvReader_
uint32_t fragmentLength() const
The length of the event fragment counted in 64-bit words including header and trailer.
Definition: FEDTrailer.cc:17
FedRawDataInputSource * parent_
bool useFileBroker() const
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:211
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
U second(std::pair< T, U > const &p)
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
static Timestamp beginOfTime()
Definition: Timestamp.h:103
void setComment(std::string const &value)
uint32_t bufferPosition_
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
Definition: TCDSRaw.h:16
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void resize(size_t newsize)
Definition: FEDRawData.cc:32
bool advance(unsigned char *&dataPosition, const size_t size)
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:341
std::vector< int > streamFileTracker_
void moveToPreviousChunk(const size_t size, const size_t offset)
unsigned char * buf_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
StreamID streamID() const
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
#define end
Definition: vmac.h:39
T min(T a, T b)
Definition: MathUtil.h:58
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
std::map< unsigned int, unsigned int > sourceEventsReport_
bool waitForChunk(unsigned int chunkid)
const edm::DaqProvenanceHelper daqProvenanceHelper_
unsigned int fileIndex_
unsigned long long TimeValue_t
Definition: Timestamp.h:28
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned int uint32
Definition: MsgTools.h:13
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:366
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
void setInputSource(FedRawDataInputSource *inputSource)
std::atomic< bool > readComplete_
Definition: init.py:1
unsigned long long int rval
Definition: vlib.h:22
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:503
def ls(path, rec=False)
Definition: eostools.py:349
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
def getRunNumber(filename)
unsigned int getLumisectionToStart() const
unsigned long long uint64_t
Definition: Time.h:15
unsigned int currentChunk_
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
def load(fileName)
Definition: svgfig.py:546
std::string fileName_
double b
Definition: hdecay.h:120
void add(std::string const &label, ParameterSetDescription const &psetDescription)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:212
void setProcessHistoryID(ProcessHistoryID const &phid)
std::condition_variable startupCv_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:246
void stoppedLookingForFile(unsigned int lumi)
tbb::concurrent_vector< InputChunk * > chunks_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:349
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:335
std::atomic< bool > threadInit_
std::pair< InputFile *, InputChunk * > ReaderInfo
HLT enums.
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:337
void readWorker(unsigned int tid)
double a
Definition: hdecay.h:121
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
std::vector< unsigned int > thread_quit_signal
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
unsigned int gtpe_get(const unsigned char *)
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
std::string getEoLSFilePathOnFU(const unsigned int ls) const
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
tbb::concurrent_queue< InputChunk * > freeChunks_
evf::EvFDaqDirector::FileStatus getNextEvent()
evf::EvFDaqDirector::FileStatus nextEvent()
void readNextChunkIntoBuffer(InputFile *file)
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:249
void setFMS(evf::FastMonitoringService *fms)
unsigned int getgpslow(const unsigned char *)
Power< A, B >::type pow(const A &a, const B &b)
Definition: Power.h:40
def move(src, dest)
Definition: eostools.py:511
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
Definition: event.py:1
std::string getEoRFilePathOnFU() const