CMS 3D CMS Logo

FedRawDataInputSource.cc
Go to the documentation of this file.
1 #include <fcntl.h>
2 #include <iomanip>
3 #include <iostream>
4 #include <memory>
5 #include <sstream>
6 #include <sys/types.h>
7 #include <sys/file.h>
8 #include <sys/time.h>
9 #include <unistd.h>
10 #include <vector>
11 #include <fstream>
12 #include <zlib.h>
13 #include <cstdio>
14 #include <chrono>
15 
16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
18 
19 
22 
24 
31 
35 
37 
41 
43 
48 
49 //JSON file reader
51 
52 #include <boost/lexical_cast.hpp>
53 
54 using namespace jsoncollector;
55 
57  edm::InputSourceDescription const& desc) :
58  edm::RawInputSource(pset, desc),
59  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
60  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",32)*1048576),
61  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",32)*1048576),
62  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",2)),
63  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> ("maxBufferedFiles",2)),
64  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
65  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
66  verifyChecksum_(pset.getUntrackedParameter<bool> ("verifyChecksum", true)),
67  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
68  fileNames_(pset.getUntrackedParameter<std::vector<std::string>> ("fileNames",std::vector<std::string>())),
69  fileListMode_(pset.getUntrackedParameter<bool> ("fileListMode", false)),
70  fileListLoopMode_(pset.getUntrackedParameter<bool> ("fileListLoopMode", false)),
71  runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
72  fuOutputDir_(std::string()),
73  daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
74  eventID_(),
75  processHistoryID_(),
76  currentLumiSection_(0),
77  tcds_pointer_(nullptr),
78  eventsThisLumi_(0),
79  dpd_(nullptr)
80 {
81  char thishost[256];
82  gethostname(thishost, 255);
83  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
84  << std::endl << (eventChunkSize_/1048576)
85  << " MB on host " << thishost;
86 
87  long autoRunNumber = -1;
88  if (fileListMode_) {
89  autoRunNumber = initFileList();
90  if (!fileListLoopMode_) {
91  if (autoRunNumber<0)
92  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
93  //override run number
94  runNumber_ = (edm::RunNumber_t)autoRunNumber;
95  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
96  }
97  }
98 
100  setNewRun();
101  //todo:autodetect from file name (assert if names differ)
104 
105  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
106  defPath_ = std::string(getenv("CMSSW_BASE")) + "/" + defPathSuffix;
107  struct stat statbuf;
108  if (stat(defPath_.c_str(), &statbuf)) {
109  defPath_ = std::string(getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
110  if (stat(defPath_.c_str(), &statbuf)) {
111  defPath_ = defPathSuffix;
112  }
113  }
114 
115  dpd_ = new DataPointDefinition();
116  std::string defLabel = "data";
117  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
118 
119  //make sure that chunk size is N * block size
124 
125  if (!numBuffers_)
126  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
127  "no reading enabled with numBuffers parameter 0";
128 
132 
133  if (!crc32c_hw_test())
134  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
135 
136  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
137  if (fileListMode_) {
138  try {
140  } catch(cms::Exception e) {
141  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
142  }
143  }
144  else {
146  if (!fms_) {
147  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
148  }
149  }
150 
152  if (!daqDirector_)
153  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
154 
155  //set DaqDirector to delete files in preGlobalEndLumi callback
157  if (fms_) {
159  fms_->setInputSource(this);
162  }
163  //should delete chunks when run stops
164  for (unsigned int i=0;i<numBuffers_;i++) {
166  }
167 
168  quit_threads_ = false;
169 
170  for (unsigned int i=0;i<numConcurrentReads_;i++)
171  {
172  std::unique_lock<std::mutex> lk(startupLock_);
173  //issue a memory fence here and in threads (constructor was segfaulting without this)
174  thread_quit_signal.push_back(false);
175  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
176  cvReader_.push_back(new std::condition_variable);
177  tid_active_.push_back(0);
178  threadInit_.store(false,std::memory_order_release);
179  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
180  startupCv_.wait(lk);
181  }
182 
183  runAuxiliary()->setProcessHistoryID(processHistoryID_);
184 }
185 
187 {
188  quit_threads_=true;
189 
190  //delete any remaining open files
191  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
192  deleteFile(it->second->fileName_);
193  delete it->second;
194  }
196  readSupervisorThread_->join();
197  }
198  else {
199  //join aux threads in case the supervisor thread was not started
200  for (unsigned int i=0;i<workerThreads_.size();i++) {
201  std::unique_lock<std::mutex> lk(mReader_);
202  thread_quit_signal[i]=true;
203  cvReader_[i]->notify_one();
204  lk.unlock();
205  workerThreads_[i]->join();
206  delete workerThreads_[i];
207  }
208  }
209  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
210  /*
211  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
212  InputChunk *ch;
213  while (!freeChunks_.try_pop(ch)) {}
214  delete ch;
215  }
216  */
217 }
218 
220 {
222  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
223  desc.addUntracked<unsigned int> ("eventChunkSize",32)->setComment("Input buffer (chunk) size");
224  desc.addUntracked<unsigned int> ("eventChunkBlock",32)->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
225  desc.addUntracked<unsigned int> ("numBuffers",2)->setComment("Number of buffers used for reading input");
226  desc.addUntracked<unsigned int> ("maxBufferedFiles",2)->setComment("Maximum number of simultaneously buffered raw files");
227  desc.addUntracked<bool> ("verifyAdler32", true)->setComment("Verify event Adler32 checksum with FRDv3 or v4");
228  desc.addUntracked<bool> ("verifyChecksum", true)->setComment("Verify event CRC-32C checksum of FRDv5 or higher");
229  desc.addUntracked<bool> ("useL1EventID", false)->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
230  desc.addUntracked<bool> ("fileListMode", false)->setComment("Use fileNames parameter to directly specify raw files to open");
231  desc.addUntracked<std::vector<std::string>> ("fileNames", std::vector<std::string>())->setComment("file list used when fileListMode is enabled");
232  desc.setAllowAnything();
233  descriptions.add("source", desc);
234 }
235 
237 {
239  {
240  //late init of directory variable
242 
243  //this thread opens new files and dispatches reading to worker readers
244  //threadInit_.store(false,std::memory_order_release);
245  std::unique_lock<std::mutex> lk(startupLock_);
246  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
248  startupCv_.wait(lk);
249  }
250  //signal hltd to start event accounting
254  switch (nextEvent() ) {
256  //maybe create EoL file in working directory before ending run
257  struct stat buf;
258  if ( currentLumiSection_ > 0) {
259  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
260  if (eolFound) {
262  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
263  if ( !found ) {
265  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
266  close(eol_fd);
268  }
269  }
270  }
271  //also create EoR file in FU data directory
272  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
273  if (!eorFound) {
274  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
275  close(eor_fd);
276  }
278  eventsThisLumi_=0;
280  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
281  return false;
282  }
284  //this is not reachable
285  return true;
286  }
288  //std::cout << "--------------NEW LUMI---------------" << std::endl;
289  return true;
290  }
291  default: {
292  if (!getLSFromFilename_) {
293  //get new lumi from file header
294  if (event_->lumi() > currentLumiSection_) {
296  eventsThisLumi_=0;
297  maybeOpenNewLumiSection( event_->lumi() );
298  }
299  }
302  else
303  eventRunNumber_=event_->run();
304  L1EventID_ = event_->event();
305 
306  setEventCached();
307 
308  return true;
309  }
310  }
311 }
312 
313 void FedRawDataInputSource::createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
314 {
315  //used for backpressure mechanisms and monitoring
316  const std::string fuBoLS = daqDirector_->getBoLSFilePathOnFU(lumiSection);
317  struct stat buf;
318  if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
319  int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
320  close(bol_fd);
321  }
322 }
323 
324 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection)
325 {
327  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
328 
329  if ( currentLumiSection_ > 0) {
330  const std::string fuEoLS =
332  struct stat buf;
333  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
334  if ( !found ) {
336  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
337  close(eol_fd);
338  createBoLSFile(lumiSection,false);
340  }
341  }
342  else createBoLSFile(lumiSection,true);//needed for initial lumisection
343 
344  currentLumiSection_ = lumiSection;
345 
347 
348  timeval tv;
349  gettimeofday(&tv, nullptr);
350  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
351 
352  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
354  runAuxiliary()->run(),
355  lumiSection, lsopentime,
357 
358  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
359  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
360 
361  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
362  }
363 }
364 
366 {
368  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
369  {
370  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
371  }
372  return status;
373 }
374 
376 {
377 
379  if (!currentFile_)
380  {
383  if (!fileQueue_.try_pop(currentFile_))
384  {
385  //sleep until wakeup (only in single-buffer mode) or timeout
386  std::unique_lock<std::mutex> lkw(mWakeup_);
387  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
389  }
390  status = currentFile_->status_;
391  if ( status == evf::EvFDaqDirector::runEnded)
392  {
394  delete currentFile_;
395  currentFile_=nullptr;
396  return status;
397  }
398  else if ( status == evf::EvFDaqDirector::runAbort)
399  {
400  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
401  }
402  else if (status == evf::EvFDaqDirector::newLumi)
403  {
405  if (getLSFromFilename_) {
408  eventsThisLumi_=0;
410  }
411  }
412  else {//let this be picked up from next event
414  }
415 
416  delete currentFile_;
417  currentFile_=nullptr;
418  return status;
419  }
420  else if (status == evf::EvFDaqDirector::newFile) {
422  }
423  else
424  assert(0);
425  }
427 
428  //file is empty
429  if (!currentFile_->fileSize_) {
431  //try to open new lumi
432  assert(currentFile_->nChunks_==0);
433  if (getLSFromFilename_)
436  eventsThisLumi_=0;
438  }
439  //immediately delete empty file
441  delete currentFile_;
442  currentFile_=nullptr;
444  }
445 
446  //file is finished
449  //release last chunk (it is never released elsewhere)
452  {
453  throw cms::Exception("FedRawDataInputSource::getNextEvent")
454  << "Fully processed " << currentFile_->nProcessed_
455  << " from the file " << currentFile_->fileName_
456  << " but according to BU JSON there should be "
457  << currentFile_->nEvents_ << " events";
458  }
459  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
460  if (singleBufferMode_) {
461  std::unique_lock<std::mutex> lkw(mWakeup_);
462  cvWakeup_.notify_one();
463  }
466  //put the file in pending delete list;
467  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
468  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
469  }
470  else {
471  //in single-thread and stream jobs, events are already processed
473  delete currentFile_;
474  }
475  currentFile_=nullptr;
477  }
478 
479 
480  //file is too short
482  {
483  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
484  "Premature end of input file while reading event header";
485  }
486  if (singleBufferMode_) {
487 
488  //should already be there
491  usleep(10000);
493  }
495 
496  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
497 
498  //conditions when read amount is not sufficient for the header to fit
499  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]
501  {
503 
504  if (detectedFRDversion_==0) {
505  detectedFRDversion_=*((uint32*)dataPosition);
506  if (detectedFRDversion_>5)
507  throw cms::Exception("FedRawDataInputSource::getNextEvent")
508  << "Unknown FRD version -: " << detectedFRDversion_;
509  assert(detectedFRDversion_>=1);
510  }
511 
512  //recalculate chunk position
513  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
514  if ( bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_])
515  {
516  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
517  "Premature end of input file while reading event header";
518  }
519  }
520 
521  event_.reset( new FRDEventMsgView(dataPosition) );
522  if (event_->size()>eventChunkSize_) {
523  throw cms::Exception("FedRawDataInputSource::getNextEvent")
524  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
525  << " run:" << event_->run() << " of size:" << event_->size()
526  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
527  }
528 
529  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
530 
532  {
533  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
534  "Premature end of input file while reading event data";
535  }
536  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
538  //recalculate chunk position
539  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
540  event_.reset( new FRDEventMsgView(dataPosition) );
541  }
542  currentFile_->bufferPosition_ += event_->size();
543  currentFile_->chunkPosition_ += event_->size();
544  //last chunk is released when this function is invoked next time
545 
546  }
547  //multibuffer mode:
548  else
549  {
550  //wait for the current chunk to become added to the vector
553  usleep(10000);
555  }
557 
558  //check if header is at the boundary of two chunks
559  chunkIsFree_ = false;
560  unsigned char *dataPosition;
561 
562  //read header, copy it to a single chunk if necessary
563  bool chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]);
564 
565  event_.reset( new FRDEventMsgView(dataPosition) );
566  if (event_->size()>eventChunkSize_) {
567  throw cms::Exception("FedRawDataInputSource::getNextEvent")
568  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
569  << " run:" << event_->run() << " of size:" << event_->size()
570  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
571  }
572 
573  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
574 
576  {
577  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
578  "Premature end of input file while reading event data";
579  }
580 
581  if (chunkEnd) {
582  //header was at the chunk boundary, we will have to move payload as well
583  currentFile_->moveToPreviousChunk(msgSize,FRDHeaderVersionSize[detectedFRDversion_]);
584  chunkIsFree_ = true;
585  }
586  else {
587  //header was contiguous, but check if payload fits the chunk
588  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
589  //rewind to header start position
590  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
591  //copy event to a chunk start and move pointers
592  chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]+msgSize);
593  assert(chunkEnd);
594  chunkIsFree_=true;
595  //header is moved
596  event_.reset( new FRDEventMsgView(dataPosition) );
597  }
598  else {
599  //everything is in a single chunk, only move pointers forward
600  chunkEnd = currentFile_->advance(dataPosition,msgSize);
601  assert(!chunkEnd);
602  chunkIsFree_=false;
603  }
604  }
605  }//end multibuffer mode
607 
608  if (verifyChecksum_ && event_->version() >= 5)
609  {
610  uint32_t crc=0;
611  crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
612  if ( crc != event_->crc32c() ) {
614  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
615  "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
616  " but calculated 0x" << crc;
617  }
618  }
619  else if ( verifyAdler32_ && event_->version() >= 3)
620  {
621  uint32_t adler = adler32(0L,Z_NULL,0);
622  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
623 
624  if ( adler != event_->adler32() ) {
626  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
627  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
628  " but calculated 0x" << adler;
629  }
630  }
632 
634 
636 }
637 
639 {
640  //no deletion in file list mode
641  if (fileListMode_) return;
642 
643  const boost::filesystem::path filePath(fileName);
644  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
645  try {
646  //sometimes this fails but file gets deleted
647  boost::filesystem::remove(filePath);
648  }
649  catch (const boost::filesystem::filesystem_error& ex)
650  {
651  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
652  << ". Trying again.";
653  usleep(100000);
654  try {
655  boost::filesystem::remove(filePath);
656  }
657  catch (const boost::filesystem::filesystem_error&) {/*file gets deleted first time but exception is still thrown*/}
658  }
659  catch (std::exception& ex)
660  {
661  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
662  << ". Trying again.";
663  usleep(100000);
664  try {
665  boost::filesystem::remove(filePath);
666  } catch (std::exception&) {/*file gets deleted first time but exception is still thrown*/}
667  }
668 }
669 
670 
672 {
674  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
675  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
676 
677  if (useL1EventID_){
679  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
682  makeEvent(eventPrincipal, aux);
683  }
684  else if(tcds_pointer_==nullptr){
685  assert(GTPEventID_);
687  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
690  makeEvent(eventPrincipal, aux);
691  }
692  else{
693  tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_);
698  makeEvent(eventPrincipal, aux);
699  }
700 
701 
702 
703  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
704 
705  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
707 
708  eventsThisLumi_++;
710 
711  //resize vector if needed
712  while (streamFileTracker_.size() <= eventPrincipal.streamID())
713  streamFileTracker_.push_back(-1);
714 
716 
717  //this old file check runs no more often than every 10 events
718  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
719  //delete files that are not in processing
720  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
721  auto it = filesToDelete_.begin();
722  while (it!=filesToDelete_.end()) {
723  bool fileIsBeingProcessed = false;
724  for (unsigned int i=0;i<nStreams_;i++) {
725  if (it->first == streamFileTracker_.at(i)) {
726  fileIsBeingProcessed = true;
727  break;
728  }
729  }
730  if (!fileIsBeingProcessed) {
731  deleteFile(it->second->fileName_);
732  delete it->second;
733  it = filesToDelete_.erase(it);
734  }
735  else it++;
736  }
737 
738  }
740  chunkIsFree_=false;
742  return;
743 }
744 
746 {
748  timeval stv;
749  gettimeofday(&stv,nullptr);
750  time = stv.tv_sec;
751  time = (time << 32) + stv.tv_usec;
752  edm::Timestamp tstamp(time);
753 
754  uint32_t eventSize = event_->eventSize();
755  char* event = (char*)event_->payload();
756  GTPEventID_=0;
757  tcds_pointer_ = nullptr;
758  while (eventSize > 0) {
759  assert(eventSize>=sizeof(fedt_t));
760  eventSize -= sizeof(fedt_t);
761  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
762  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
763  assert(eventSize>=fedSize - sizeof(fedt_t));
764  eventSize -= (fedSize - sizeof(fedt_t));
765  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
766  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
767  if(fedId>FEDNumbering::MAXFEDID)
768  {
769  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
770  }
771  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
772  tcds_pointer_ = (unsigned char *)(event + eventSize );
773  }
774  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
775  if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
776  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
777  else
778  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
779  //evf::evtn::evm_board_setformat(fedSize);
780  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
781  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
782  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
783  }
784  //take event ID from GTPE FED
785  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
786  if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
787  GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
788  }
789  }
790  FEDRawData& fedData = rawData.FEDData(fedId);
791  fedData.resize(fedSize);
792  memcpy(fedData.data(), event + eventSize, fedSize);
793  }
794  assert(eventSize == 0);
795 
796  return tstamp;
797 }
798 
800 {
802  try {
803  // assemble json destination path
805 
806  //TODO:should be ported to use fffnaming
807  std::ostringstream fileNameWithPID;
808  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
809  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
810  jsonDestPath /= fileNameWithPID.str();
811 
812  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
813  << jsonDestPath;
814  try {
815  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
816  }
817  catch (const boost::filesystem::filesystem_error& ex)
818  {
819  // Input dir gone?
820  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
821  // << " Maybe the file is not yet visible by FU. Trying again in one second";
822  sleep(1);
823  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
824  }
826 
827  try {
828  //sometimes this fails but file gets deleted
829  boost::filesystem::remove(jsonSourcePath);
830  }
831  catch (const boost::filesystem::filesystem_error& ex)
832  {
833  // Input dir gone?
834  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
835  }
836  catch (std::exception& ex)
837  {
838  // Input dir gone?
839  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
840  }
841 
842  boost::filesystem::ifstream ij(jsonDestPath);
843  Json::Value deserializeRoot;
845 
846  std::stringstream ss;
847  ss << ij.rdbuf();
848  if (!reader.parse(ss.str(), deserializeRoot)) {
849  edm::LogError("FedRawDataInputSource") << "Failed to deserialize JSON file -: " << jsonDestPath
850  << "\nERROR:\n" << reader.getFormatedErrorMessages()
851  << "CONTENT:\n" << ss.str()<<".";
852  throw std::runtime_error("Cannot deserialize input JSON file");
853  }
854 
855  //read BU JSON
857  DataPoint dp;
858  dp.deserialize(deserializeRoot);
859  bool success = false;
860  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
861  if (dpd_->getNames().at(i)=="NEvents")
862  if (i<dp.getData().size()) {
863  data = dp.getData()[i];
864  success=true;
865  }
866  }
867  if (!success) {
868  if (!dp.getData().empty())
869  data = dp.getData()[0];
870  else
871  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
872  " error reading number of events from BU JSON -: No input value " << data;
873  }
874  return boost::lexical_cast<int>(data);
875 
876  }
877  catch (const boost::filesystem::filesystem_error& ex)
878  {
879  // Input dir gone?
881  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
882  }
883  catch (std::runtime_error e)
884  {
885  // Another process grabbed the file and NFS did not register this
887  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
888  }
889 
890  catch( boost::bad_lexical_cast const& ) {
891  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
892  << "Input value is -: " << data;
893  }
894 
895  catch (std::exception e)
896  {
897  // BU run directory disappeared?
899  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
900  }
901 
902  return -1;
903 }
904 
906 {}
907 
908 
910 {
911  bool stop=false;
912  unsigned int currentLumiSection = 0;
913  //threadInit_.exchange(true,std::memory_order_acquire);
914 
915  {
916  std::unique_lock<std::mutex> lk(startupLock_);
917  startupCv_.notify_one();
918  }
919 
920  uint32_t ls=0;
921  uint32_t monLS=1;
922  uint32_t lockCount=0;
923  uint64_t sumLockWaitTimeUs=0.;
924 
925  while (!stop) {
926 
927  //wait for at least one free thread and chunk
928  int counter=0;
930  {
931  //report state to monitoring
932  if (fms_) {
933  bool copy_active=false;
934  for (auto j : tid_active_) if (j) copy_active=true;
936  else if (freeChunks_.empty()) {
937  if (copy_active)
939  else
941  }
942  else {
943  if (copy_active)
945  else
947  }
948  }
949  std::unique_lock<std::mutex> lkw(mWakeup_);
950  //sleep until woken up by condition or a timeout
951  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
952  counter++;
953  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
954  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
955  }
956  else {
957  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
958  }
959  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
960  }
961 
962  if (stop) break;
963 
964  //look for a new file
965  std::string nextFile;
966  uint32_t fileSize;
967 
968  if (fms_) {
972  }
973 
975 
976  while (status == evf::EvFDaqDirector::noFile) {
977  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
978  stop=true;
979  break;
980  }
981 
982  uint64_t thisLockWaitTimeUs=0.;
983  if (fileListMode_) {
984  //return LS if LS not set, otherwise return file
985  status = getFile(ls,nextFile,fileSize,thisLockWaitTimeUs);
986  }
987  else
988  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
989 
991 
992  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
993 
994  //monitoring of lock wait time
995  if (thisLockWaitTimeUs>0.)
996  sumLockWaitTimeUs+=thisLockWaitTimeUs;
997  lockCount++;
998  if (ls>monLS) {
999  monLS=ls;
1000  if (lockCount)
1001  if (fms_) fms_->reportLockWait(monLS,sumLockWaitTimeUs,lockCount);
1002  lockCount=0;
1003  sumLockWaitTimeUs=0;
1004  }
1005 
1006  //check again for any remaining index/EoLS files after EoR file is seen
1007  if ( status == evf::EvFDaqDirector::runEnded && !fileListMode_) {
1009  usleep(100000);
1010  //now all files should have appeared in ramdisk, check again if any raw files were left behind
1011  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
1012  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
1013  }
1014 
1015  if ( status == evf::EvFDaqDirector::runEnded) {
1017  stop=true;
1018  break;
1019  }
1020 
1021  //queue new lumisection
1022  if( getLSFromFilename_ && ls > currentLumiSection) {
1023  //fms_->setInStateSup(evf::FastMonitoringThread::inSupNewLumi);
1024  currentLumiSection = ls;
1025  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
1026  }
1027 
1028  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
1029  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
1031  stop=true;
1032  break;
1033  }
1034 
1035  int dbgcount=0;
1036  if (status == evf::EvFDaqDirector::noFile) {
1038  dbgcount++;
1039  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1040  usleep(100000);
1041  }
1042  }
1043  if ( status == evf::EvFDaqDirector::newFile ) {
1045  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1046 
1047 
1048  boost::filesystem::path rawFilePath(nextFile);
1049  std::string rawFile = rawFilePath.replace_extension(".raw").string();
1050 
1051  struct stat st;
1052  int stat_res = stat(rawFile.c_str(),&st);
1053  if (stat_res==-1) {
1054  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-"<< rawFile << std::endl;
1055  setExceptionState_=true;
1056  break;
1057  }
1058  fileSize=st.st_size;
1059 
1060  if (fms_) {
1064  }
1065  int eventsInNewFile;
1066  if (fileListMode_) {
1067  if (fileSize==0) eventsInNewFile=0;
1068  else eventsInNewFile=-1;
1069  }
1070  else {
1071  eventsInNewFile = grabNextJsonFile(nextFile);
1072  assert( eventsInNewFile>=0 );
1073  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
1074  }
1075 
1076  if (!singleBufferMode_) {
1077  //calculate number of needed chunks
1078  unsigned int neededChunks = fileSize/eventChunkSize_;
1079  if (fileSize%eventChunkSize_) neededChunks++;
1080 
1081  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
1083  fileQueue_.push(newInputFile);
1084 
1085  for (unsigned int i=0;i<neededChunks;i++) {
1086 
1087  if (fms_) {
1088  bool copy_active=false;
1089  for (auto j : tid_active_) if (j) copy_active=true;
1090  if (copy_active)
1092  else
1094  }
1095  //get thread
1096  unsigned int newTid = 0xffffffff;
1097  while (!workerPool_.try_pop(newTid)) {
1098  usleep(100000);
1099  }
1100 
1101  if (fms_) {
1102  bool copy_active=false;
1103  for (auto j : tid_active_) if (j) copy_active=true;
1104  if (copy_active)
1106  else
1108  }
1109  InputChunk * newChunk = nullptr;
1110  while (!freeChunks_.try_pop(newChunk)) {
1111  usleep(100000);
1112  if (quit_threads_.load(std::memory_order_relaxed)) break;
1113  }
1114 
1115  if (newChunk == nullptr) {
1116  //return unused tid if we received shutdown (nullptr chunk)
1117  if (newTid!=0xffffffff) workerPool_.push(newTid);
1118  stop = true;
1119  break;
1120  }
1122 
1123  std::unique_lock<std::mutex> lk(mReader_);
1124 
1125  unsigned int toRead = eventChunkSize_;
1126  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1127  newChunk->reset(i*eventChunkSize_,toRead,i);
1128 
1129  workerJob_[newTid].first=newInputFile;
1130  workerJob_[newTid].second=newChunk;
1131 
1132  //wake up the worker thread
1133  cvReader_[newTid]->notify_one();
1134  }
1135  }
1136  else {
1137  if (!eventsInNewFile) {
1138  //still queue file for lumi update
1139  std::unique_lock<std::mutex> lkw(mWakeup_);
1140  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
1142  fileQueue_.push(newInputFile);
1143  cvWakeup_.notify_one();
1144  break;
1145  }
1146  //in single-buffer mode put single chunk in the file and let the main thread read the file
1147  InputChunk * newChunk;
1148  //should be available immediately
1149  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
1150 
1151  std::unique_lock<std::mutex> lkw(mWakeup_);
1152 
1153  unsigned int toRead = eventChunkSize_;
1154  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1155  newChunk->reset(0,toRead,0);
1156  newChunk->readComplete_=true;
1157 
1158  //push file and wakeup main thread
1159  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1160  newInputFile->chunks_[0]=newChunk;
1162  fileQueue_.push(newInputFile);
1163  cvWakeup_.notify_one();
1164  }
1165  }
1166  }
1168  //make sure threads finish reading
1169  unsigned numFinishedThreads = 0;
1170  while (numFinishedThreads < workerThreads_.size()) {
1171  unsigned int tid;
1172  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1173  std::unique_lock<std::mutex> lk(mReader_);
1174  thread_quit_signal[tid]=true;
1175  cvReader_[tid]->notify_one();
1176  numFinishedThreads++;
1177  }
1178  for (unsigned int i=0;i<workerThreads_.size();i++) {
1179  workerThreads_[i]->join();
1180  delete workerThreads_[i];
1181  }
1182 }
1183 
1185 {
1186  bool init = true;
1187  threadInit_.exchange(true,std::memory_order_acquire);
1188 
1189  while (true) {
1190 
1191  tid_active_[tid]=false;
1192  std::unique_lock<std::mutex> lk(mReader_);
1193  workerJob_[tid].first=nullptr;
1194  workerJob_[tid].first=nullptr;
1195 
1196  assert(!thread_quit_signal[tid]);//should never get it here
1197  workerPool_.push(tid);
1198 
1199  if (init) {
1200  std::unique_lock<std::mutex> lk(startupLock_);
1201  init = false;
1202  startupCv_.notify_one();
1203  }
1204  cvReader_[tid]->wait(lk);
1205 
1206  if (thread_quit_signal[tid]) return;
1207  tid_active_[tid]=true;
1208 
1209  InputFile * file;
1210  InputChunk * chunk;
1211 
1212  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1213 
1214  file = workerJob_[tid].first;
1215  chunk = workerJob_[tid].second;
1216 
1217  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1218  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1219 
1220 
1221  if (fileDescriptor>=0)
1222  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1223  else
1224  {
1225  edm::LogError("FedRawDataInputSource") <<
1226  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1227  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
1228  setExceptionState_=true;
1229  return;
1230 
1231  }
1232 
1233  unsigned int bufferLeft = 0;
1235  for (unsigned int i=0;i<readBlocks_;i++)
1236  {
1237  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1238  if ( last > 0 )
1239  bufferLeft+=last;
1240  if (last < eventChunkBlock_) {
1241  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1242  break;
1243  }
1244  }
1246  auto diff = end-start;
1247  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1248  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1249  close(fileDescriptor);
1250 
1251  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1252  assert(detectedFRDversion_<=5);
1253  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1254  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1255 
1256  }
1257 }
1258 
1260 {
1261  quit_threads_=true;
1262  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1263 
1264 }
1265 
1266 
1267 inline bool InputFile::advance(unsigned char* & dataPosition, const size_t size)
1268 {
1269  //wait for chunk
1270  while (!waitForChunk(currentChunk_)) {
1271  usleep(100000);
1272  if (parent_->exceptionState()) parent_->threadError();
1273  }
1274 
1275  dataPosition = chunks_[currentChunk_]->buf_+ chunkPosition_;
1276  size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1277 
1278  if (currentLeft < size) {
1279 
1280  //we need next chunk
1281  while (!waitForChunk(currentChunk_+1)) {
1282  usleep(100000);
1283  if (parent_->exceptionState()) parent_->threadError();
1284  }
1285  //copy everything to beginning of the first chunk
1286  dataPosition-=chunkPosition_;
1287  assert(dataPosition==chunks_[currentChunk_]->buf_);
1288  memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_+chunkPosition_, currentLeft);
1289  memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_+1]->buf_, size - currentLeft);
1290  //set pointers at the end of the old data position
1291  bufferPosition_+=size;
1292  chunkPosition_=size-currentLeft;
1293  currentChunk_++;
1294  return true;
1295  }
1296  else {
1297  chunkPosition_+=size;
1298  bufferPosition_+=size;
1299  return false;
1300  }
1301 }
1302 
1303 inline void InputFile::moveToPreviousChunk(const size_t size, const size_t offset)
1304 {
1305  //this will fail in case of events that are too large
1306  assert(size < chunks_[currentChunk_]->size_ - chunkPosition_);
1307  assert(size - offset < chunks_[currentChunk_]->size_);
1308  memcpy(chunks_[currentChunk_-1]->buf_+offset,chunks_[currentChunk_]->buf_+chunkPosition_,size);
1309  chunkPosition_+=size;
1310  bufferPosition_+=size;
1311 }
1312 
1313 inline void InputFile::rewindChunk(const size_t size) {
1314  chunkPosition_-=size;
1315  bufferPosition_-=size;
1316 }
1317 
1318 //single-buffer mode file reading
1320 {
1321 
1322  if (fileDescriptor_<0) {
1323  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1324  bufferInputRead_ = 0;
1325  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1326  if (fileDescriptor_>=0)
1327  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1328  else
1329  {
1330  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1331  << file->fileName_ << " fd:" << fileDescriptor_;
1332  }
1333  }
1334 
1335  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1336  uint32_t existingSize = 0;
1337  for (unsigned int i=0;i<readBlocks_;i++)
1338  {
1339  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1341  existingSize+=last;
1342  }
1343  }
1344  else {
1345  const uint32_t chunksize = file->chunkPosition_;
1346  const uint32_t blockcount=chunksize/eventChunkBlock_;
1347  const uint32_t leftsize = chunksize%eventChunkBlock_;
1348  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1349  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1350 
1351  for (uint32_t i=0;i<blockcount;i++) {
1352  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1354  existingSize+=last;
1355  }
1356  if (leftsize) {
1357  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1359  }
1360  file->chunkPosition_=0;//data was moved to beginning of the chunk
1361  }
1362  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1363  if (fileDescriptor_!=-1)
1364  {
1365  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1366  close(fileDescriptor_);
1367  fileDescriptor_=-1;
1368  }
1369  }
1370 }
1371 
1372 
1374 {
1375 
1376  std::lock_guard<std::mutex> lock(monlock_);
1377  auto itr = sourceEventsReport_.find(lumi);
1378  if (itr!=sourceEventsReport_.end())
1379  itr->second+=events;
1380  else
1381  sourceEventsReport_[lumi]=events;
1382 }
1383 
1384 std::pair<bool,unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase)
1385 {
1386  std::lock_guard<std::mutex> lock(monlock_);
1387  auto itr = sourceEventsReport_.find(lumi);
1388  if (itr!=sourceEventsReport_.end()) {
1389  auto && ret = std::pair<bool,unsigned int>(true,itr->second);
1390  if (erase)
1391  sourceEventsReport_.erase(itr);
1392  return ret;
1393  }
1394  else
1395  return std::pair<bool,unsigned int>(false,0);
1396 }
1397 
1399 {
1400  std::sort(fileNames_.begin(),fileNames_.end(),
1401  [](std::string a, std::string b) {
1402  if (a.rfind("/")!=std::string::npos) a=a.substr(a.rfind("/"));
1403  if (b.rfind("/")!=std::string::npos) b=b.substr(b.rfind("/"));
1404  return b > a;});
1405 
1406  if (!fileNames_.empty()) {
1407  //get run number from first file in the vector
1409  std::string fileStem = fileName.stem().string();
1410  auto end = fileStem.find("_");
1411  if (fileStem.find("run")==0) {
1412  std::string runStr = fileStem.substr(3,end-3);
1413  try {
1414  //get long to support test run numbers < 2^32
1415  long rval = boost::lexical_cast<long>(runStr);
1416  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: "<< rval;
1417  return rval;
1418  }
1419  catch( boost::bad_lexical_cast const& ) {
1420  edm::LogWarning("FedRawDataInputSource") << "Unable to autodetect run number in fileListMode from file -: "<< fileName;
1421  }
1422  }
1423  }
1424  return -1;
1425 }
1426 
1427 evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile(unsigned int& ls, std::string& nextFile, uint32_t& fsize, uint64_t& lockWaitTime)
1428 {
1429  if (fileListIndex_ < fileNames_.size()) {
1430  nextFile = fileNames_[fileListIndex_];
1431  if (nextFile.find("file://")==0) nextFile=nextFile.substr(7);
1432  else if (nextFile.find("file:")==0) nextFile=nextFile.substr(5);
1433  boost::filesystem::path fileName = nextFile;
1434  std::string fileStem = fileName.stem().string();
1435  if (fileStem.find("ls")) fileStem = fileStem.substr(fileStem.find("ls")+2);
1436  if (fileStem.find("_")) fileStem = fileStem.substr(0,fileStem.find("_"));
1437 
1438  if (!fileListLoopMode_)
1439  ls = boost::lexical_cast<unsigned int>(fileStem);
1440  else //always starting from LS 1 in loop mode
1441  ls = 1 + loopModeIterationInc_;
1442 
1443  //fsize = 0;
1444  //lockWaitTime = 0;
1445  fileListIndex_++;
1447  }
1448  else {
1449  if (!fileListLoopMode_)
1451  else {
1452  //loop through files until interrupted
1454  fileListIndex_=0;
1455  return getFile(ls,nextFile,fsize,lockWaitTime);
1456  }
1457  }
1458 }
#define LogDebug(id)
size
Write out results.
static const char runNumber_[]
Definition: start.py:1
void setComment(std::string const &value)
std::vector< std::string > & getData()
Definition: DataPoint.h:58
unsigned int lumi_
std::vector< std::string > fileNames_
unsigned int getgpshigh(const unsigned char *)
uint32_t chunkPosition_
std::condition_variable cvWakeup_
void read(edm::EventPrincipal &eventPrincipal) override
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:339
bool gtpe_board_sense(const unsigned char *p)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
bool crc32c_hw_test()
Definition: crc32c.cc:354
unsigned int offset_
jsoncollector::DataPointDefinition * dpd_
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
tbb::concurrent_queue< unsigned int > workerPool_
void setAllowAnything()
allow any parameter label/value pairs
unsigned int get(const unsigned char *, bool)
void maybeOpenNewLumiSection(const uint32_t lumiSection)
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void setInState(FastMonitoringThread::InputState inputState)
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
void rewindChunk(const size_t size)
const uint32 FRDHeaderVersionSize[6]
volatile std::atomic< bool > shutdown_flag
int init
Definition: HydjetWrapper.h:67
evf::EvFDaqDirector::FileStatus status_
void setInStateSup(FastMonitoringThread::InputState inputState)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:504
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
void createProcessingNotificationMaybe() const
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
std::unique_ptr< std::thread > readSupervisorThread_
ProductProvenance const & dummyProvenance() const
Represents a JSON value.
Definition: value.h:111
std::string getEoLSFilePathOnBU(const unsigned int ls) const
#define nullptr
int timeout
Definition: mps_check.py:51
std::vector< std::condition_variable * > cvReader_
FedRawDataInputSource * parent_
unsigned int sourceid
Definition: fed_header.h:32
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:209
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
U second(std::pair< T, U > const &p)
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
static Timestamp beginOfTime()
Definition: Timestamp.h:103
void setComment(std::string const &value)
uint32_t bufferPosition_
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
Definition: TCDSRaw.h:18
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void resize(size_t newsize)
Definition: FEDRawData.cc:32
bool advance(unsigned char *&dataPosition, const size_t size)
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:346
std::vector< int > streamFileTracker_
void moveToPreviousChunk(const size_t size, const size_t offset)
unsigned char * buf_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
struct fedt_struct fedt_t
int grabNextJsonFile(boost::filesystem::path const &)
StreamID streamID() const
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
#define end
Definition: vmac.h:37
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
std::map< unsigned int, unsigned int > sourceEventsReport_
bool waitForChunk(unsigned int chunkid)
const edm::DaqProvenanceHelper daqProvenanceHelper_
unsigned int fileIndex_
unsigned long long TimeValue_t
Definition: Timestamp.h:28
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned int uint32
Definition: MsgTools.h:13
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:371
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
void setInputSource(FedRawDataInputSource *inputSource)
std::atomic< bool > readComplete_
unsigned long long int rval
Definition: vlib.h:22
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:510
def ls(path, rec=False)
Definition: eostools.py:348
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
def getRunNumber(filename)
unsigned long long uint64_t
Definition: Time.h:15
unsigned int currentChunk_
auto dp
Definition: deltaR.h:22
void deserialize(Json::Value &root) override
Definition: DataPoint.cc:56
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
def load(fileName)
Definition: svgfig.py:546
std::string fileName_
double b
Definition: hdecay.h:120
void add(std::string const &label, ParameterSetDescription const &psetDescription)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:209
void setProcessHistoryID(ProcessHistoryID const &phid)
std::condition_variable startupCv_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:250
void stoppedLookingForFile(unsigned int lumi)
tbb::concurrent_vector< InputChunk * > chunks_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:354
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:340
unsigned int eventsize
Definition: fed_trailer.h:33
std::atomic< bool > threadInit_
std::pair< InputFile *, InputChunk * > ReaderInfo
std::string getBoLSFilePathOnFU(const unsigned int ls) const
HLT enums.
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:342
void readWorker(unsigned int tid)
double a
Definition: hdecay.h:121
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< unsigned int > thread_quit_signal
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
bool emptyLumisectionMode() const
unsigned int gtpe_get(const unsigned char *)
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
tbb::concurrent_queue< InputChunk * > freeChunks_
evf::EvFDaqDirector::FileStatus getNextEvent()
evf::EvFDaqDirector::FileStatus nextEvent()
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID, bool verifyLumiSection)
void readNextChunkIntoBuffer(InputFile *file)
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:253
void setFMS(evf::FastMonitoringService *fms)
unsigned int getgpslow(const unsigned char *)
std::string getFormatedErrorMessages() const
Returns a user friendly string that list errors in the parsed document.
def move(src, dest)
Definition: eostools.py:510
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
Definition: event.py:1
std::string getEoRFilePathOnFU() const