CMS 3D CMS Logo

FedRawDataInputSource.cc
Go to the documentation of this file.
1 #include <fcntl.h>
2 #include <iomanip>
3 #include <iostream>
4 #include <memory>
5 #include <sstream>
6 #include <sys/types.h>
7 #include <sys/file.h>
8 #include <sys/time.h>
9 #include <unistd.h>
10 #include <vector>
11 #include <fstream>
12 #include <zlib.h>
13 #include <stdio.h>
14 #include <chrono>
15 
16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
18 
19 
22 
29 
33 
35 
39 
41 
46 
47 //JSON file reader
49 
50 #include <boost/lexical_cast.hpp>
51 
52 using namespace jsoncollector;
53 
55  edm::InputSourceDescription const& desc) :
56  edm::RawInputSource(pset, desc),
57  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
58  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",32)*1048576),
59  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",32)*1048576),
60  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",2)),
61  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> ("maxBufferedFiles",2)),
62  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
63  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
64  verifyChecksum_(pset.getUntrackedParameter<bool> ("verifyChecksum", true)),
65  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
66  fileNames_(pset.getUntrackedParameter<std::vector<std::string>> ("fileNames",std::vector<std::string>())),
67  fileListMode_(pset.getUntrackedParameter<bool> ("fileListMode", false)),
68  fileListLoopMode_(pset.getUntrackedParameter<bool> ("fileListLoopMode", false)),
69  runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
70  fuOutputDir_(std::string()),
71  daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
72  eventID_(),
73  processHistoryID_(),
74  currentLumiSection_(0),
75  tcds_pointer_(0),
76  eventsThisLumi_(0),
77  dpd_(nullptr)
78 {
79  char thishost[256];
80  gethostname(thishost, 255);
81  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
82  << std::endl << (eventChunkSize_/1048576)
83  << " MB on host " << thishost;
84 
85  long autoRunNumber = -1;
86  if (fileListMode_) {
87  autoRunNumber = initFileList();
88  if (!fileListLoopMode_) {
89  if (autoRunNumber<0)
90  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
91  //override run number
92  runNumber_ = (edm::RunNumber_t)autoRunNumber;
93  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
94  }
95  }
96 
98  setNewRun();
99  //todo:autodetect from file name (assert if names differ)
102 
103  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
104  defPath_ = std::string(getenv("CMSSW_BASE")) + "/" + defPathSuffix;
105  struct stat statbuf;
106  if (stat(defPath_.c_str(), &statbuf)) {
107  defPath_ = std::string(getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
108  if (stat(defPath_.c_str(), &statbuf)) {
109  defPath_ = defPathSuffix;
110  }
111  }
112 
113  dpd_ = new DataPointDefinition();
114  std::string defLabel = "data";
115  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
116 
117  //make sure that chunk size is N * block size
122 
123  if (!numBuffers_)
124  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
125  "no reading enabled with numBuffers parameter 0";
126 
130 
131  if (!crc32c_hw_test())
132  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
133 
134  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
135  if (fileListMode_) {
136  try {
138  } catch(cms::Exception e) {
139  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
140  }
141  }
142  else {
144  if (!fms_) {
145  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
146  }
147  }
148 
150  if (!daqDirector_)
151  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
152 
153  //set DaqDirector to delete files in preGlobalEndLumi callback
155  if (fms_) {
157  fms_->setInputSource(this);
160  }
161  //should delete chunks when run stops
162  for (unsigned int i=0;i<numBuffers_;i++) {
164  }
165 
166  quit_threads_ = false;
167 
168  for (unsigned int i=0;i<numConcurrentReads_;i++)
169  {
170  std::unique_lock<std::mutex> lk(startupLock_);
171  //issue a memory fence here and in threads (constructor was segfaulting without this)
172  thread_quit_signal.push_back(false);
173  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
174  cvReader_.push_back(new std::condition_variable);
175  tid_active_.push_back(0);
176  threadInit_.store(false,std::memory_order_release);
177  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
178  startupCv_.wait(lk);
179  }
180 
181  runAuxiliary()->setProcessHistoryID(processHistoryID_);
182 }
183 
185 {
186  quit_threads_=true;
187 
188  //delete any remaining open files
189  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
190  deleteFile(it->second->fileName_);
191  delete it->second;
192  }
194  readSupervisorThread_->join();
195  }
196  else {
197  //join aux threads in case the supervisor thread was not started
198  for (unsigned int i=0;i<workerThreads_.size();i++) {
199  std::unique_lock<std::mutex> lk(mReader_);
200  thread_quit_signal[i]=true;
201  cvReader_[i]->notify_one();
202  lk.unlock();
203  workerThreads_[i]->join();
204  delete workerThreads_[i];
205  }
206  }
207  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
208  /*
209  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
210  InputChunk *ch;
211  while (!freeChunks_.try_pop(ch)) {}
212  delete ch;
213  }
214  */
215 }
216 
218 {
220  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
221  desc.addUntracked<unsigned int> ("eventChunkSize",32)->setComment("Input buffer (chunk) size");
222  desc.addUntracked<unsigned int> ("eventChunkBlock",32)->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
223  desc.addUntracked<unsigned int> ("numBuffers",2)->setComment("Number of buffers used for reading input");
224  desc.addUntracked<unsigned int> ("maxBufferedFiles",2)->setComment("Maximum number of simultaneously buffered raw files");
225  desc.addUntracked<bool> ("verifyAdler32", true)->setComment("Verify event Adler32 checksum with FRDv3 or v4");
226  desc.addUntracked<bool> ("verifyChecksum", true)->setComment("Verify event CRC-32C checksum of FRDv5 or higher");
227  desc.addUntracked<bool> ("useL1EventID", false)->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
228  desc.addUntracked<bool> ("fileListMode", false)->setComment("Use fileNames parameter to directly specify raw files to open");
229  desc.addUntracked<std::vector<std::string>> ("fileNames", std::vector<std::string>())->setComment("file list used when fileListMode is enabled");
230  desc.setAllowAnything();
231  descriptions.add("source", desc);
232 }
233 
235 {
237  {
238  //late init of directory variable
240 
241  //this thread opens new files and dispatches reading to worker readers
242  //threadInit_.store(false,std::memory_order_release);
243  std::unique_lock<std::mutex> lk(startupLock_);
244  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
246  startupCv_.wait(lk);
247  }
248  //signal hltd to start event accounting
252  switch (nextEvent() ) {
254  //maybe create EoL file in working directory before ending run
255  struct stat buf;
256  if ( currentLumiSection_ > 0) {
257  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
258  if (eolFound) {
260  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
261  if ( !found ) {
263  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
264  close(eol_fd);
266  }
267  }
268  }
269  //also create EoR file in FU data directory
270  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
271  if (!eorFound) {
272  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
273  close(eor_fd);
274  }
276  eventsThisLumi_=0;
278  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
279  return false;
280  }
282  //this is not reachable
283  return true;
284  }
286  //std::cout << "--------------NEW LUMI---------------" << std::endl;
287  return true;
288  }
289  default: {
290  if (!getLSFromFilename_) {
291  //get new lumi from file header
292  if (event_->lumi() > currentLumiSection_) {
294  eventsThisLumi_=0;
295  maybeOpenNewLumiSection( event_->lumi() );
296  }
297  }
300  else
301  eventRunNumber_=event_->run();
302  L1EventID_ = event_->event();
303 
304  setEventCached();
305 
306  return true;
307  }
308  }
309 }
310 
311 void FedRawDataInputSource::createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
312 {
313  //used for backpressure mechanisms and monitoring
314  const std::string fuBoLS = daqDirector_->getBoLSFilePathOnFU(lumiSection);
315  struct stat buf;
316  if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
317  int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
318  close(bol_fd);
319  }
320 }
321 
322 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection)
323 {
325  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
326 
327  if ( currentLumiSection_ > 0) {
328  const std::string fuEoLS =
330  struct stat buf;
331  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
332  if ( !found ) {
334  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
335  close(eol_fd);
336  createBoLSFile(lumiSection,false);
338  }
339  }
340  else createBoLSFile(lumiSection,true);//needed for initial lumisection
341 
342  currentLumiSection_ = lumiSection;
343 
345 
346  timeval tv;
347  gettimeofday(&tv, 0);
348  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
349 
350  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
352  runAuxiliary()->run(),
353  lumiSection, lsopentime,
355 
356  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
357  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
358 
359  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
360  }
361 }
362 
364 {
366  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
367  {
368  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
369  }
370  return status;
371 }
372 
374 {
375 
377  if (!currentFile_)
378  {
381  if (!fileQueue_.try_pop(currentFile_))
382  {
383  //sleep until wakeup (only in single-buffer mode) or timeout
384  std::unique_lock<std::mutex> lkw(mWakeup_);
385  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
387  }
388  status = currentFile_->status_;
389  if ( status == evf::EvFDaqDirector::runEnded)
390  {
392  delete currentFile_;
393  currentFile_=nullptr;
394  return status;
395  }
396  else if ( status == evf::EvFDaqDirector::runAbort)
397  {
398  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
399  }
400  else if (status == evf::EvFDaqDirector::newLumi)
401  {
403  if (getLSFromFilename_) {
406  eventsThisLumi_=0;
408  }
409  }
410  else {//let this be picked up from next event
412  }
413 
414  delete currentFile_;
415  currentFile_=nullptr;
416  return status;
417  }
418  else if (status == evf::EvFDaqDirector::newFile) {
420  }
421  else
422  assert(0);
423  }
425 
426  //file is empty
427  if (!currentFile_->fileSize_) {
429  //try to open new lumi
430  assert(currentFile_->nChunks_==0);
431  if (getLSFromFilename_)
434  eventsThisLumi_=0;
436  }
437  //immediately delete empty file
439  delete currentFile_;
440  currentFile_=nullptr;
442  }
443 
444  //file is finished
447  //release last chunk (it is never released elsewhere)
450  {
451  throw cms::Exception("FedRawDataInputSource::getNextEvent")
452  << "Fully processed " << currentFile_->nProcessed_
453  << " from the file " << currentFile_->fileName_
454  << " but according to BU JSON there should be "
455  << currentFile_->nEvents_ << " events";
456  }
457  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
458  if (singleBufferMode_) {
459  std::unique_lock<std::mutex> lkw(mWakeup_);
460  cvWakeup_.notify_one();
461  }
464  //put the file in pending delete list;
465  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
466  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
467  }
468  else {
469  //in single-thread and stream jobs, events are already processed
471  delete currentFile_;
472  }
473  currentFile_=nullptr;
475  }
476 
477 
478  //file is too short
480  {
481  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
482  "Premature end of input file while reading event header";
483  }
484  if (singleBufferMode_) {
485 
486  //should already be there
489  usleep(10000);
491  }
493 
494  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
495 
496  //conditions when read amount is not sufficient for the header to fit
497  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]
499  {
501 
502  if (detectedFRDversion_==0) {
503  detectedFRDversion_=*((uint32*)dataPosition);
504  if (detectedFRDversion_>5)
505  throw cms::Exception("FedRawDataInputSource::getNextEvent")
506  << "Unknown FRD version -: " << detectedFRDversion_;
507  assert(detectedFRDversion_>=1);
508  }
509 
510  //recalculate chunk position
511  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
512  if ( bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_])
513  {
514  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
515  "Premature end of input file while reading event header";
516  }
517  }
518 
519  event_.reset( new FRDEventMsgView(dataPosition) );
520  if (event_->size()>eventChunkSize_) {
521  throw cms::Exception("FedRawDataInputSource::getNextEvent")
522  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
523  << " run:" << event_->run() << " of size:" << event_->size()
524  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
525  }
526 
527  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
528 
530  {
531  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
532  "Premature end of input file while reading event data";
533  }
534  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
536  //recalculate chunk position
537  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
538  event_.reset( new FRDEventMsgView(dataPosition) );
539  }
540  currentFile_->bufferPosition_ += event_->size();
541  currentFile_->chunkPosition_ += event_->size();
542  //last chunk is released when this function is invoked next time
543 
544  }
545  //multibuffer mode:
546  else
547  {
548  //wait for the current chunk to become added to the vector
551  usleep(10000);
553  }
555 
556  //check if header is at the boundary of two chunks
557  chunkIsFree_ = false;
558  unsigned char *dataPosition;
559 
560  //read header, copy it to a single chunk if necessary
561  bool chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]);
562 
563  event_.reset( new FRDEventMsgView(dataPosition) );
564  if (event_->size()>eventChunkSize_) {
565  throw cms::Exception("FedRawDataInputSource::getNextEvent")
566  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
567  << " run:" << event_->run() << " of size:" << event_->size()
568  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
569  }
570 
571  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
572 
574  {
575  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
576  "Premature end of input file while reading event data";
577  }
578 
579  if (chunkEnd) {
580  //header was at the chunk boundary, we will have to move payload as well
581  currentFile_->moveToPreviousChunk(msgSize,FRDHeaderVersionSize[detectedFRDversion_]);
582  chunkIsFree_ = true;
583  }
584  else {
585  //header was contiguous, but check if payload fits the chunk
586  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
587  //rewind to header start position
588  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
589  //copy event to a chunk start and move pointers
590  chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]+msgSize);
591  assert(chunkEnd);
592  chunkIsFree_=true;
593  //header is moved
594  event_.reset( new FRDEventMsgView(dataPosition) );
595  }
596  else {
597  //everything is in a single chunk, only move pointers forward
598  chunkEnd = currentFile_->advance(dataPosition,msgSize);
599  assert(!chunkEnd);
600  chunkIsFree_=false;
601  }
602  }
603  }//end multibuffer mode
605 
606  if (verifyChecksum_ && event_->version() >= 5)
607  {
608  uint32_t crc=0;
609  crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
610  if ( crc != event_->crc32c() ) {
612  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
613  "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
614  " but calculated 0x" << crc;
615  }
616  }
617  else if ( verifyAdler32_ && event_->version() >= 3)
618  {
619  uint32_t adler = adler32(0L,Z_NULL,0);
620  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
621 
622  if ( adler != event_->adler32() ) {
624  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
625  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
626  " but calculated 0x" << adler;
627  }
628  }
630 
632 
634 }
635 
637 {
638  //no deletion in file list mode
639  if (fileListMode_) return;
640 
641  const boost::filesystem::path filePath(fileName);
642  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
643  try {
644  //sometimes this fails but file gets deleted
645  boost::filesystem::remove(filePath);
646  }
647  catch (const boost::filesystem::filesystem_error& ex)
648  {
649  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
650  << ". Trying again.";
651  usleep(100000);
652  try {
653  boost::filesystem::remove(filePath);
654  }
655  catch (const boost::filesystem::filesystem_error&) {/*file gets deleted first time but exception is still thrown*/}
656  }
657  catch (std::exception& ex)
658  {
659  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
660  << ". Trying again.";
661  usleep(100000);
662  try {
663  boost::filesystem::remove(filePath);
664  } catch (std::exception&) {/*file gets deleted first time but exception is still thrown*/}
665  }
666 }
667 
668 
670 {
672  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
673  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
674 
675  if (useL1EventID_){
677  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
680  makeEvent(eventPrincipal, aux);
681  }
682  else if(tcds_pointer_==0){
683  assert(GTPEventID_);
685  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
688  makeEvent(eventPrincipal, aux);
689  }
690  else{
691  evf::evtn::TCDSRecord record((unsigned char *)(tcds_pointer_));
696  makeEvent(eventPrincipal, aux);
697  }
698 
699 
700 
701  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
702 
703  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
705 
706  eventsThisLumi_++;
708 
709  //resize vector if needed
710  while (streamFileTracker_.size() <= eventPrincipal.streamID())
711  streamFileTracker_.push_back(-1);
712 
714 
715  //this old file check runs no more often than every 10 events
716  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
717  //delete files that are not in processing
718  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
719  auto it = filesToDelete_.begin();
720  while (it!=filesToDelete_.end()) {
721  bool fileIsBeingProcessed = false;
722  for (unsigned int i=0;i<nStreams_;i++) {
723  if (it->first == streamFileTracker_.at(i)) {
724  fileIsBeingProcessed = true;
725  break;
726  }
727  }
728  if (!fileIsBeingProcessed) {
729  deleteFile(it->second->fileName_);
730  delete it->second;
731  it = filesToDelete_.erase(it);
732  }
733  else it++;
734  }
735 
736  }
738  chunkIsFree_=false;
740  return;
741 }
742 
744 {
746  timeval stv;
747  gettimeofday(&stv,0);
748  time = stv.tv_sec;
749  time = (time << 32) + stv.tv_usec;
750  edm::Timestamp tstamp(time);
751 
752  uint32_t eventSize = event_->eventSize();
753  char* event = (char*)event_->payload();
754  GTPEventID_=0;
755  tcds_pointer_ = 0;
756  while (eventSize > 0) {
757  assert(eventSize>=sizeof(fedt_t));
758  eventSize -= sizeof(fedt_t);
759  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
760  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
761  assert(eventSize>=fedSize - sizeof(fedt_t));
762  eventSize -= (fedSize - sizeof(fedt_t));
763  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
764  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
765  if(fedId>FEDNumbering::MAXFEDID)
766  {
767  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
768  }
769  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
770  tcds_pointer_ = (unsigned char *)(event + eventSize );
771  }
772  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
773  if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
774  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
775  else
776  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
777  //evf::evtn::evm_board_setformat(fedSize);
778  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
779  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
780  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
781  }
782  //take event ID from GTPE FED
783  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
784  if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
785  GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
786  }
787  }
788  FEDRawData& fedData = rawData.FEDData(fedId);
789  fedData.resize(fedSize);
790  memcpy(fedData.data(), event + eventSize, fedSize);
791  }
792  assert(eventSize == 0);
793 
794  return tstamp;
795 }
796 
798 {
800  try {
801  // assemble json destination path
803 
804  //TODO:should be ported to use fffnaming
805  std::ostringstream fileNameWithPID;
806  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
807  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
808  jsonDestPath /= fileNameWithPID.str();
809 
810  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
811  << jsonDestPath;
812  try {
813  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
814  }
815  catch (const boost::filesystem::filesystem_error& ex)
816  {
817  // Input dir gone?
818  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
819  // << " Maybe the file is not yet visible by FU. Trying again in one second";
820  sleep(1);
821  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
822  }
824 
825  try {
826  //sometimes this fails but file gets deleted
827  boost::filesystem::remove(jsonSourcePath);
828  }
829  catch (const boost::filesystem::filesystem_error& ex)
830  {
831  // Input dir gone?
832  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
833  }
834  catch (std::exception& ex)
835  {
836  // Input dir gone?
837  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
838  }
839 
840  boost::filesystem::ifstream ij(jsonDestPath);
841  Json::Value deserializeRoot;
843 
844  std::stringstream ss;
845  ss << ij.rdbuf();
846  if (!reader.parse(ss.str(), deserializeRoot)) {
847  edm::LogError("FedRawDataInputSource") << "Failed to deserialize JSON file -: " << jsonDestPath
848  << "\nERROR:\n" << reader.getFormatedErrorMessages()
849  << "CONTENT:\n" << ss.str()<<".";
850  throw std::runtime_error("Cannot deserialize input JSON file");
851  }
852 
853  //read BU JSON
855  DataPoint dp;
856  dp.deserialize(deserializeRoot);
857  bool success = false;
858  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
859  if (dpd_->getNames().at(i)=="NEvents")
860  if (i<dp.getData().size()) {
861  data = dp.getData()[i];
862  success=true;
863  }
864  }
865  if (!success) {
866  if (dp.getData().size())
867  data = dp.getData()[0];
868  else
869  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
870  " error reading number of events from BU JSON -: No input value " << data;
871  }
872  return boost::lexical_cast<int>(data);
873 
874  }
875  catch (const boost::filesystem::filesystem_error& ex)
876  {
877  // Input dir gone?
879  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
880  }
881  catch (std::runtime_error e)
882  {
883  // Another process grabbed the file and NFS did not register this
885  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
886  }
887 
888  catch( boost::bad_lexical_cast const& ) {
889  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
890  << "Input value is -: " << data;
891  }
892 
893  catch (std::exception e)
894  {
895  // BU run directory disappeared?
897  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
898  }
899 
900  return -1;
901 }
902 
904 {}
905 
906 
908 {
909  bool stop=false;
910  unsigned int currentLumiSection = 0;
911  //threadInit_.exchange(true,std::memory_order_acquire);
912 
913  {
914  std::unique_lock<std::mutex> lk(startupLock_);
915  startupCv_.notify_one();
916  }
917 
918  uint32_t ls=0;
919  uint32_t monLS=1;
920  uint32_t lockCount=0;
921  uint64_t sumLockWaitTimeUs=0.;
922 
923  while (!stop) {
924 
925  //wait for at least one free thread and chunk
926  int counter=0;
928  {
929  //report state to monitoring
930  if (fms_) {
931  bool copy_active=false;
932  for (auto j : tid_active_) if (j) copy_active=true;
934  else if (freeChunks_.empty()) {
935  if (copy_active)
937  else
939  }
940  else {
941  if (copy_active)
943  else
945  }
946  }
947  std::unique_lock<std::mutex> lkw(mWakeup_);
948  //sleep until woken up by condition or a timeout
949  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
950  counter++;
951  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
952  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
953  }
954  else {
955  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
956  }
957  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
958  }
959 
960  if (stop) break;
961 
962  //look for a new file
963  std::string nextFile;
964  uint32_t fileSize;
965 
966  if (fms_) {
970  }
971 
973 
974  while (status == evf::EvFDaqDirector::noFile) {
975  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
976  stop=true;
977  break;
978  }
979 
980  uint64_t thisLockWaitTimeUs=0.;
981  if (fileListMode_) {
982  //return LS if LS not set, otherwise return file
983  status = getFile(ls,nextFile,fileSize,thisLockWaitTimeUs);
984  }
985  else
986  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
987 
989 
990  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
991 
992  //monitoring of lock wait time
993  if (thisLockWaitTimeUs>0.)
994  sumLockWaitTimeUs+=thisLockWaitTimeUs;
995  lockCount++;
996  if (ls>monLS) {
997  monLS=ls;
998  if (lockCount)
999  if (fms_) fms_->reportLockWait(monLS,sumLockWaitTimeUs,lockCount);
1000  lockCount=0;
1001  sumLockWaitTimeUs=0;
1002  }
1003 
1004  //check again for any remaining index/EoLS files after EoR file is seen
1005  if ( status == evf::EvFDaqDirector::runEnded && !fileListMode_) {
1007  usleep(100000);
1008  //now all files should have appeared in ramdisk, check again if any raw files were left behind
1009  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
1010  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
1011  }
1012 
1013  if ( status == evf::EvFDaqDirector::runEnded) {
1015  stop=true;
1016  break;
1017  }
1018 
1019  //queue new lumisection
1020  if( getLSFromFilename_ && ls > currentLumiSection) {
1021  //fms_->setInStateSup(evf::FastMonitoringThread::inSupNewLumi);
1022  currentLumiSection = ls;
1023  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
1024  }
1025 
1026  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
1027  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
1029  stop=true;
1030  break;
1031  }
1032 
1033  int dbgcount=0;
1034  if (status == evf::EvFDaqDirector::noFile) {
1036  dbgcount++;
1037  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1038  usleep(100000);
1039  }
1040  }
1041  if ( status == evf::EvFDaqDirector::newFile ) {
1043  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1044 
1045 
1046  boost::filesystem::path rawFilePath(nextFile);
1047  std::string rawFile = rawFilePath.replace_extension(".raw").string();
1048 
1049  struct stat st;
1050  int stat_res = stat(rawFile.c_str(),&st);
1051  if (stat_res==-1) {
1052  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-"<< rawFile << std::endl;
1053  setExceptionState_=true;
1054  break;
1055  }
1056  fileSize=st.st_size;
1057 
1058  if (fms_) {
1062  }
1063  int eventsInNewFile;
1064  if (fileListMode_) {
1065  if (fileSize==0) eventsInNewFile=0;
1066  else eventsInNewFile=-1;
1067  }
1068  else {
1069  eventsInNewFile = grabNextJsonFile(nextFile);
1070  assert( eventsInNewFile>=0 );
1071  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
1072  }
1073 
1074  if (!singleBufferMode_) {
1075  //calculate number of needed chunks
1076  unsigned int neededChunks = fileSize/eventChunkSize_;
1077  if (fileSize%eventChunkSize_) neededChunks++;
1078 
1079  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
1081  fileQueue_.push(newInputFile);
1082 
1083  for (unsigned int i=0;i<neededChunks;i++) {
1084 
1085  if (fms_) {
1086  bool copy_active=false;
1087  for (auto j : tid_active_) if (j) copy_active=true;
1088  if (copy_active)
1090  else
1092  }
1093  //get thread
1094  unsigned int newTid = 0xffffffff;
1095  while (!workerPool_.try_pop(newTid)) {
1096  usleep(100000);
1097  }
1098 
1099  if (fms_) {
1100  bool copy_active=false;
1101  for (auto j : tid_active_) if (j) copy_active=true;
1102  if (copy_active)
1104  else
1106  }
1107  InputChunk * newChunk = nullptr;
1108  while (!freeChunks_.try_pop(newChunk)) {
1109  usleep(100000);
1110  if (quit_threads_.load(std::memory_order_relaxed)) break;
1111  }
1112 
1113  if (newChunk == nullptr) {
1114  //return unused tid if we received shutdown (nullptr chunk)
1115  if (newTid!=0xffffffff) workerPool_.push(newTid);
1116  stop = true;
1117  break;
1118  }
1120 
1121  std::unique_lock<std::mutex> lk(mReader_);
1122 
1123  unsigned int toRead = eventChunkSize_;
1124  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1125  newChunk->reset(i*eventChunkSize_,toRead,i);
1126 
1127  workerJob_[newTid].first=newInputFile;
1128  workerJob_[newTid].second=newChunk;
1129 
1130  //wake up the worker thread
1131  cvReader_[newTid]->notify_one();
1132  }
1133  }
1134  else {
1135  if (!eventsInNewFile) {
1136  //still queue file for lumi update
1137  std::unique_lock<std::mutex> lkw(mWakeup_);
1138  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
1140  fileQueue_.push(newInputFile);
1141  cvWakeup_.notify_one();
1142  break;
1143  }
1144  //in single-buffer mode put single chunk in the file and let the main thread read the file
1145  InputChunk * newChunk;
1146  //should be available immediately
1147  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
1148 
1149  std::unique_lock<std::mutex> lkw(mWakeup_);
1150 
1151  unsigned int toRead = eventChunkSize_;
1152  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1153  newChunk->reset(0,toRead,0);
1154  newChunk->readComplete_=true;
1155 
1156  //push file and wakeup main thread
1157  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1158  newInputFile->chunks_[0]=newChunk;
1160  fileQueue_.push(newInputFile);
1161  cvWakeup_.notify_one();
1162  }
1163  }
1164  }
1166  //make sure threads finish reading
1167  unsigned numFinishedThreads = 0;
1168  while (numFinishedThreads < workerThreads_.size()) {
1169  unsigned int tid;
1170  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1171  std::unique_lock<std::mutex> lk(mReader_);
1172  thread_quit_signal[tid]=true;
1173  cvReader_[tid]->notify_one();
1174  numFinishedThreads++;
1175  }
1176  for (unsigned int i=0;i<workerThreads_.size();i++) {
1177  workerThreads_[i]->join();
1178  delete workerThreads_[i];
1179  }
1180 }
1181 
1183 {
1184  bool init = true;
1185  threadInit_.exchange(true,std::memory_order_acquire);
1186 
1187  while (1) {
1188 
1189  tid_active_[tid]=false;
1190  std::unique_lock<std::mutex> lk(mReader_);
1191  workerJob_[tid].first=nullptr;
1192  workerJob_[tid].first=nullptr;
1193 
1194  assert(!thread_quit_signal[tid]);//should never get it here
1195  workerPool_.push(tid);
1196 
1197  if (init) {
1198  std::unique_lock<std::mutex> lk(startupLock_);
1199  init = false;
1200  startupCv_.notify_one();
1201  }
1202  cvReader_[tid]->wait(lk);
1203 
1204  if (thread_quit_signal[tid]) return;
1205  tid_active_[tid]=true;
1206 
1207  InputFile * file;
1208  InputChunk * chunk;
1209 
1210  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1211 
1212  file = workerJob_[tid].first;
1213  chunk = workerJob_[tid].second;
1214 
1215  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1216  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1217 
1218 
1219  if (fileDescriptor>=0)
1220  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1221  else
1222  {
1223  edm::LogError("FedRawDataInputSource") <<
1224  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1225  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
1226  setExceptionState_=true;
1227  return;
1228 
1229  }
1230 
1231  unsigned int bufferLeft = 0;
1233  for (unsigned int i=0;i<readBlocks_;i++)
1234  {
1235  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1236  if ( last > 0 )
1237  bufferLeft+=last;
1238  if (last < eventChunkBlock_) {
1239  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1240  break;
1241  }
1242  }
1244  auto diff = end-start;
1245  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1246  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1247  close(fileDescriptor);
1248 
1249  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1250  assert(detectedFRDversion_<=5);
1251  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1252  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1253 
1254  }
1255 }
1256 
1258 {
1259  quit_threads_=true;
1260  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1261 
1262 }
1263 
1264 
1265 inline bool InputFile::advance(unsigned char* & dataPosition, const size_t size)
1266 {
1267  //wait for chunk
1268  while (!waitForChunk(currentChunk_)) {
1269  usleep(100000);
1270  if (parent_->exceptionState()) parent_->threadError();
1271  }
1272 
1273  dataPosition = chunks_[currentChunk_]->buf_+ chunkPosition_;
1274  size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1275 
1276  if (currentLeft < size) {
1277 
1278  //we need next chunk
1279  while (!waitForChunk(currentChunk_+1)) {
1280  usleep(100000);
1281  if (parent_->exceptionState()) parent_->threadError();
1282  }
1283  //copy everything to beginning of the first chunk
1284  dataPosition-=chunkPosition_;
1285  assert(dataPosition==chunks_[currentChunk_]->buf_);
1286  memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_+chunkPosition_, currentLeft);
1287  memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_+1]->buf_, size - currentLeft);
1288  //set pointers at the end of the old data position
1289  bufferPosition_+=size;
1290  chunkPosition_=size-currentLeft;
1291  currentChunk_++;
1292  return true;
1293  }
1294  else {
1295  chunkPosition_+=size;
1296  bufferPosition_+=size;
1297  return false;
1298  }
1299 }
1300 
1301 inline void InputFile::moveToPreviousChunk(const size_t size, const size_t offset)
1302 {
1303  //this will fail in case of events that are too large
1304  assert(size < chunks_[currentChunk_]->size_ - chunkPosition_);
1305  assert(size - offset < chunks_[currentChunk_]->size_);
1306  memcpy(chunks_[currentChunk_-1]->buf_+offset,chunks_[currentChunk_]->buf_+chunkPosition_,size);
1307  chunkPosition_+=size;
1308  bufferPosition_+=size;
1309 }
1310 
1311 inline void InputFile::rewindChunk(const size_t size) {
1312  chunkPosition_-=size;
1313  bufferPosition_-=size;
1314 }
1315 
1316 //single-buffer mode file reading
1318 {
1319 
1320  if (fileDescriptor_<0) {
1321  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1322  bufferInputRead_ = 0;
1323  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1324  if (fileDescriptor_>=0)
1325  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1326  else
1327  {
1328  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1329  << file->fileName_ << " fd:" << fileDescriptor_;
1330  }
1331  }
1332 
1333  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1334  uint32_t existingSize = 0;
1335  for (unsigned int i=0;i<readBlocks_;i++)
1336  {
1337  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1339  existingSize+=last;
1340  }
1341  }
1342  else {
1343  const uint32_t chunksize = file->chunkPosition_;
1344  const uint32_t blockcount=chunksize/eventChunkBlock_;
1345  const uint32_t leftsize = chunksize%eventChunkBlock_;
1346  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1347  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1348 
1349  for (uint32_t i=0;i<blockcount;i++) {
1350  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1352  existingSize+=last;
1353  }
1354  if (leftsize) {
1355  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1357  }
1358  file->chunkPosition_=0;//data was moved to beginning of the chunk
1359  }
1360  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1361  if (fileDescriptor_!=-1)
1362  {
1363  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1364  close(fileDescriptor_);
1365  fileDescriptor_=-1;
1366  }
1367  }
1368 }
1369 
1370 
1372 {
1373 
1374  std::lock_guard<std::mutex> lock(monlock_);
1375  auto itr = sourceEventsReport_.find(lumi);
1376  if (itr!=sourceEventsReport_.end())
1377  itr->second+=events;
1378  else
1379  sourceEventsReport_[lumi]=events;
1380 }
1381 
1382 std::pair<bool,unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase)
1383 {
1384  std::lock_guard<std::mutex> lock(monlock_);
1385  auto itr = sourceEventsReport_.find(lumi);
1386  if (itr!=sourceEventsReport_.end()) {
1387  auto && ret = std::pair<bool,unsigned int>(true,itr->second);
1388  if (erase)
1389  sourceEventsReport_.erase(itr);
1390  return ret;
1391  }
1392  else
1393  return std::pair<bool,unsigned int>(false,0);
1394 }
1395 
1397 {
1398  std::sort(fileNames_.begin(),fileNames_.end(),
1399  [](std::string a, std::string b) {
1400  if (a.rfind("/")!=std::string::npos) a=a.substr(a.rfind("/"));
1401  if (b.rfind("/")!=std::string::npos) b=b.substr(b.rfind("/"));
1402  return b > a;});
1403 
1404  if (fileNames_.size()) {
1405  //get run number from first file in the vector
1407  std::string fileStem = fileName.stem().string();
1408  auto end = fileStem.find("_");
1409  if (fileStem.find("run")==0) {
1410  std::string runStr = fileStem.substr(3,end-3);
1411  try {
1412  //get long to support test run numbers < 2^32
1413  long rval = boost::lexical_cast<long>(runStr);
1414  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: "<< rval;
1415  return rval;
1416  }
1417  catch( boost::bad_lexical_cast const& ) {
1418  edm::LogWarning("FedRawDataInputSource") << "Unable to autodetect run number in fileListMode from file -: "<< fileName;
1419  }
1420  }
1421  }
1422  return -1;
1423 }
1424 
1425 evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile(unsigned int& ls, std::string& nextFile, uint32_t& fsize, uint64_t& lockWaitTime)
1426 {
1427  if (fileListIndex_ < fileNames_.size()) {
1428  nextFile = fileNames_[fileListIndex_];
1429  if (nextFile.find("file://")==0) nextFile=nextFile.substr(7);
1430  else if (nextFile.find("file:")==0) nextFile=nextFile.substr(5);
1431  boost::filesystem::path fileName = nextFile;
1432  std::string fileStem = fileName.stem().string();
1433  if (fileStem.find("ls")) fileStem = fileStem.substr(fileStem.find("ls")+2);
1434  if (fileStem.find("_")) fileStem = fileStem.substr(0,fileStem.find("_"));
1435 
1436  if (!fileListLoopMode_)
1437  ls = boost::lexical_cast<unsigned int>(fileStem);
1438  else //always starting from LS 1 in loop mode
1439  ls = 1 + loopModeIterationInc_;
1440 
1441  //fsize = 0;
1442  //lockWaitTime = 0;
1443  fileListIndex_++;
1445  }
1446  else {
1447  if (!fileListLoopMode_)
1449  else {
1450  //loop through files until interrupted
1452  fileListIndex_=0;
1453  return getFile(ls,nextFile,fsize,lockWaitTime);
1454  }
1455  }
1456 }
#define LogDebug(id)
size
Write out results.
static const char runNumber_[]
Definition: start.py:1
void setComment(std::string const &value)
std::vector< std::string > & getData()
Definition: DataPoint.h:58
unsigned int lumi_
std::vector< std::string > fileNames_
unsigned int getgpshigh(const unsigned char *)
uint32_t chunkPosition_
std::condition_variable cvWakeup_
virtual void read(edm::EventPrincipal &eventPrincipal) override
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:344
bool gtpe_board_sense(const unsigned char *p)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
virtual void rewind_() override
bool crc32c_hw_test()
Definition: crc32c.cc:354
unsigned int offset_
jsoncollector::DataPointDefinition * dpd_
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
tbb::concurrent_queue< unsigned int > workerPool_
JetCorrectorParameters::Record record
Definition: classes.h:7
void setAllowAnything()
allow any parameter label/value pairs
unsigned int get(const unsigned char *, bool)
void maybeOpenNewLumiSection(const uint32_t lumiSection)
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void setInState(FastMonitoringThread::InputState inputState)
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
void rewindChunk(const size_t size)
const uint32 FRDHeaderVersionSize[6]
volatile std::atomic< bool > shutdown_flag
int init
Definition: HydjetWrapper.h:67
evf::EvFDaqDirector::FileStatus status_
void setInStateSup(FastMonitoringThread::InputState inputState)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:534
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
void createProcessingNotificationMaybe() const
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
std::unique_ptr< std::thread > readSupervisorThread_
ProductProvenance const & dummyProvenance() const
Represents a JSON value.
Definition: value.h:111
std::string getEoLSFilePathOnBU(const unsigned int ls) const
#define nullptr
int timeout
Definition: mps_check.py:48
std::vector< std::condition_variable * > cvReader_
FedRawDataInputSource * parent_
unsigned int sourceid
Definition: fed_header.h:32
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:211
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
U second(std::pair< T, U > const &p)
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
static Timestamp beginOfTime()
Definition: Timestamp.h:103
void setComment(std::string const &value)
uint32_t bufferPosition_
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void resize(size_t newsize)
Definition: FEDRawData.cc:32
bool advance(unsigned char *&dataPosition, const size_t size)
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:351
std::vector< int > streamFileTracker_
void moveToPreviousChunk(const size_t size, const size_t offset)
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID, bool verifyLumiSection)
unsigned char * buf_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
struct fedt_struct fedt_t
int grabNextJsonFile(boost::filesystem::path const &)
StreamID streamID() const
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
#define end
Definition: vmac.h:37
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
std::map< unsigned int, unsigned int > sourceEventsReport_
bool waitForChunk(unsigned int chunkid)
const edm::DaqProvenanceHelper daqProvenanceHelper_
unsigned int fileIndex_
unsigned long long TimeValue_t
Definition: Timestamp.h:28
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:56
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned int uint32
Definition: MsgTools.h:13
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:376
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
void setInputSource(FedRawDataInputSource *inputSource)
std::atomic< bool > readComplete_
virtual bool checkNextEvent() override
unsigned long long int rval
Definition: vlib.h:22
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:540
def ls(path, rec=False)
Definition: eostools.py:348
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
def getRunNumber(filename)
unsigned long long uint64_t
Definition: Time.h:15
unsigned int currentChunk_
auto dp
Definition: deltaR.h:22
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
def load(fileName)
Definition: svgfig.py:546
std::string fileName_
double b
Definition: hdecay.h:120
void add(std::string const &label, ParameterSetDescription const &psetDescription)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:209
void setProcessHistoryID(ProcessHistoryID const &phid)
std::condition_variable startupCv_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:252
void stoppedLookingForFile(unsigned int lumi)
tbb::concurrent_vector< InputChunk * > chunks_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:359
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:345
unsigned int eventsize
Definition: fed_trailer.h:33
std::atomic< bool > threadInit_
std::pair< InputFile *, InputChunk * > ReaderInfo
std::string getBoLSFilePathOnFU(const unsigned int ls) const
HLT enums.
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:347
void readWorker(unsigned int tid)
double a
Definition: hdecay.h:121
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< unsigned int > thread_quit_signal
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
bool emptyLumisectionMode() const
unsigned int gtpe_get(const unsigned char *)
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
tbb::concurrent_queue< InputChunk * > freeChunks_
evf::EvFDaqDirector::FileStatus getNextEvent()
evf::EvFDaqDirector::FileStatus nextEvent()
void readNextChunkIntoBuffer(InputFile *file)
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:255
void setFMS(evf::FastMonitoringService *fms)
unsigned int getgpslow(const unsigned char *)
std::string getFormatedErrorMessages() const
Returns a user friendly string that list errors in the parsed document.
def move(src, dest)
Definition: eostools.py:510
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
Definition: event.py:1
std::string getEoRFilePathOnFU() const