CMS 3D CMS Logo

FedRawDataInputSource.cc
Go to the documentation of this file.
1 #include <fcntl.h>
2 #include <iomanip>
3 #include <iostream>
4 #include <memory>
5 #include <sstream>
6 #include <sys/types.h>
7 #include <sys/file.h>
8 #include <sys/time.h>
9 #include <unistd.h>
10 #include <vector>
11 #include <fstream>
12 #include <zlib.h>
13 #include <stdio.h>
14 #include <chrono>
15 
16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
18 
19 
22 
29 
33 
35 
39 
41 
46 
47 //JSON file reader
49 
50 #include <boost/lexical_cast.hpp>
51 
52 using namespace jsoncollector;
53 
55  edm::InputSourceDescription const& desc) :
56  edm::RawInputSource(pset, desc),
57  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
58  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",32)*1048576),
59  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",32)*1048576),
60  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",2)),
61  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> ("maxBufferedFiles",2)),
62  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
63  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
64  verifyChecksum_(pset.getUntrackedParameter<bool> ("verifyChecksum", true)),
65  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
66  fileNames_(pset.getUntrackedParameter<std::vector<std::string>> ("fileNames",std::vector<std::string>())),
67  fileListMode_(pset.getUntrackedParameter<bool> ("fileListMode", false)),
68  fileListLoopMode_(pset.getUntrackedParameter<bool> ("fileListLoopMode", false)),
69  runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
70  fuOutputDir_(std::string()),
71  daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
72  eventID_(),
73  processHistoryID_(),
74  currentLumiSection_(0),
75  tcds_pointer_(0),
76  eventsThisLumi_(0),
77  dpd_(nullptr)
78 {
79  char thishost[256];
80  gethostname(thishost, 255);
81  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
82  << std::endl << (eventChunkSize_/1048576)
83  << " MB on host " << thishost;
84 
85  long autoRunNumber = -1;
86  if (fileListMode_) {
87  autoRunNumber = initFileList();
88  if (!fileListLoopMode_) {
89  if (autoRunNumber<0)
90  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
91  //override run number
92  runNumber_ = (edm::RunNumber_t)autoRunNumber;
93  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
94  }
95  }
96 
98  setNewRun();
99  //todo:autodetect from file name (assert if names differ)
102 
103  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
104  defPath_ = std::string(getenv("CMSSW_BASE")) + "/" + defPathSuffix;
105  struct stat statbuf;
106  if (stat(defPath_.c_str(), &statbuf)) {
107  defPath_ = std::string(getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
108  if (stat(defPath_.c_str(), &statbuf)) {
109  defPath_ = defPathSuffix;
110  }
111  }
112 
113  dpd_ = new DataPointDefinition();
114  std::string defLabel = "data";
115  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
116 
117  //make sure that chunk size is N * block size
122 
123  if (!numBuffers_)
124  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
125  "no reading enabled with numBuffers parameter 0";
126 
130 
131  if (!crc32c_hw_test())
132  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
133 
134  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
135  if (fileListMode_) {
136  try {
138  } catch(cms::Exception e) {
139  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
140  }
141  }
142  else {
144  if (!fms_) {
145  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
146  }
147  }
148 
150  if (!daqDirector_)
151  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
152 
153  //set DaqDirector to delete files in preGlobalEndLumi callback
155  if (fms_) {
157  fms_->setInputSource(this);
160  }
161  //should delete chunks when run stops
162  for (unsigned int i=0;i<numBuffers_;i++) {
164  }
165 
166  quit_threads_ = false;
167 
168  for (unsigned int i=0;i<numConcurrentReads_;i++)
169  {
170  std::unique_lock<std::mutex> lk(startupLock_);
171  //issue a memory fence here and in threads (constructor was segfaulting without this)
172  thread_quit_signal.push_back(false);
173  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
174  cvReader_.push_back(new std::condition_variable);
175  tid_active_.push_back(0);
176  threadInit_.store(false,std::memory_order_release);
177  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
178  startupCv_.wait(lk);
179  }
180 
181  runAuxiliary()->setProcessHistoryID(processHistoryID_);
182 }
183 
// Presumably the destructor body; the signature line and the `if` that opens
// the supervisor-thread branch are missing from this listing.
// Sets quit_threads_, deletes every file still listed in filesToDelete_, joins
// either the supervisor thread or (else branch) each worker thread, and frees
// the per-worker condition variables.
185 {
186  quit_threads_=true;
187
188  //delete any remaining open files
// NOTE(review): filesToDelete_ is walked here without taking fileDeleteLock_,
// although other code in this file locks it before touching the list - confirm
// no other thread can still access it at this point.
189  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
190  deleteFile(it->second->fileName_);
191  delete it->second;
192  }
194  readSupervisorThread_->join();
195  }
196  else {
197  //join aux threads in case the supervisor thread was not started
// For each worker: set its quit flag and notify it under mReader_, release the
// lock so the worker can proceed, then join and delete the thread object.
198  for (unsigned int i=0;i<workerThreads_.size();i++) {
199  std::unique_lock<std::mutex> lk(mReader_);
200  thread_quit_signal[i]=true;
201  cvReader_[i]->notify_one();
202  lk.unlock();
203  workerThreads_[i]->join();
204  delete workerThreads_[i];
205  }
206  }
// The condition variables were allocated with `new` in the constructor.
// NOTE(review): no matching delete for dpd_ is visible in this body.
207  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
208  /*
209  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
210  InputChunk *ch;
211  while (!freeChunks_.try_pop(ch)) {}
212  delete ch;
213  }
214  */
215 }
216 
// Presumably the body of fillDescriptions; the signature line and the
// declaration of `desc` are missing from this listing.
// Declares the untracked parameters with their defaults and comments, then
// registers the description under the label "source".
218 {
220  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
221  desc.addUntracked<unsigned int> ("eventChunkSize",32)->setComment("Input buffer (chunk) size");
222  desc.addUntracked<unsigned int> ("eventChunkBlock",32)->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
223  desc.addUntracked<unsigned int> ("numBuffers",2)->setComment("Number of buffers used for reading input");
224  desc.addUntracked<unsigned int> ("maxBufferedFiles",2)->setComment("Maximum number of simultaneously buffered raw files");
225  desc.addUntracked<bool> ("verifyAdler32", true)->setComment("Verify event Adler32 checksum with FRDv3 or v4");
226  desc.addUntracked<bool> ("verifyChecksum", true)->setComment("Verify event CRC-32C checksum of FRDv5 or higher");
227  desc.addUntracked<bool> ("useL1EventID", false)->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
228  desc.addUntracked<bool> ("fileListMode", false)->setComment("Use fileNames parameter to directly specify raw files to open");
229  desc.addUntracked<std::vector<std::string>> ("fileNames", std::vector<std::string>())->setComment("file list used when fileListMode is enabled");
// NOTE(review): the constructor also reads "buDefPath", "getLSFromFilename" and
// "fileListLoopMode", which are not declared above; setAllowAnything()
// presumably lets them through validation - confirm.
230  desc.setAllowAnything();
231  descriptions.add("source", desc);
232 }
233 
// Presumably the body of checkNextEvent; the signature, the `case` labels of
// the switch and several statements are missing from this listing.
// Starts the readSupervisor thread inside the first inner block (its guarding
// condition is not visible), then switches on the status returned by nextEvent().
// The branch that logs "RUN ENDED" returns false; every other visible branch
// returns true.
235 {
237  {
238  //late init of directory variable
240
241  //this thread opens new files and dispatches reading to worker readers
242  //threadInit_.store(false,std::memory_order_release);
// startupLock_ is taken before the thread is created and the wait releases it.
// NOTE(review): the wait has no predicate, so a spurious wake-up would also end it.
243  std::unique_lock<std::mutex> lk(startupLock_);
244  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
246  startupCv_.wait(lk);
247  }
248  //signal hltd to start event accounting
252  switch (nextEvent() ) {
254  //maybe create EoL file in working directory before ending run
255  struct stat buf;
256  if ( currentLumiSection_ > 0) {
257  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
258  if (eolFound) {
260  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
261  if ( !found ) {
// The file is only created (open with O_CREAT, then close); nothing is written.
// NOTE(review): the open() result is not checked before close().
263  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
264  close(eol_fd);
266  }
267  }
268  }
269  //also create EoR file in FU data directory
270  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
271  if (!eorFound) {
// NOTE(review): the open() result is not checked before close().
272  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
273  close(eor_fd);
274  }
276  eventsThisLumi_=0;
278  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
279  return false;
280  }
282  //this is not reachable
283  return true;
284  }
286  //std::cout << "--------------NEW LUMI---------------" << std::endl;
287  return true;
288  }
// Default branch: an event is available in event_.
289  default: {
290  if (!getLSFromFilename_) {
291  //get new lumi from file header
292  if (event_->lumi() > currentLumiSection_) {
294  eventsThisLumi_=0;
295  maybeOpenNewLumiSection( event_->lumi() );
296  }
297  }
// The `if` that pairs with this `else` is on a line missing from this listing.
300  else
301  eventRunNumber_=event_->run();
302  L1EventID_ = event_->event();
303
304  setEventCached();
305
306  return true;
307  }
308  }
309 }
310 
// Creates the BoLS marker file for `lumiSection` at the path given by
// daqDirector_->getBoLSFilePathOnFU(). The file is only created, never written to.
//   lumiSection   - lumisection number the marker belongs to.
//   checkIfExists - when true, the file is created only if stat() cannot find
//                   it; when false, open(O_CREAT) is attempted unconditionally.
// A failed open() is logged together with errno instead of being ignored, and
// close() is only called on a valid descriptor.
311 void FedRawDataInputSource::createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
312 {
313  //used for backpressure mechanisms and monitoring
314  const std::string fuBoLS = daqDirector_->getBoLSFilePathOnFU(lumiSection);
315  struct stat buf;
316  if (!checkIfExists || stat(fuBoLS.c_str(), &buf) != 0) {
317  int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
318  if (bol_fd < 0) { edm::LogError("FedRawDataInputSource") << "Can not create file (" << errno << "):-" << fuBoLS; } else { close(bol_fd); }
319  }
320 }
321 
// Opens a new lumisection when the current luminosity block auxiliary does not
// match `lumiSection` (the first half of the condition is on a line missing
// from this listing, as are several other statements).
// When a previous lumisection exists and its EoLS file is not found, that file
// is created and a BoLS file for the new lumisection is written; for the first
// lumisection only the BoLS file is created, if absent. Then
// currentLumiSection_ is updated and a new LuminosityBlockAuxiliary carrying
// the current wall-clock time is installed.
322 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection)
323 {
325  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
326
327  if ( currentLumiSection_ > 0) {
328  const std::string fuEoLS =
330  struct stat buf;
331  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
332  if ( !found ) {
// NOTE(review): the open() result is not checked before close().
334  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
335  close(eol_fd);
336  createBoLSFile(lumiSection,false);
338  }
339  }
340  else createBoLSFile(lumiSection,true);//needed for initial lumisection
341
342  currentLumiSection_ = lumiSection;
343
345
// The timestamp value is microseconds since the epoch (tv_sec * 1000000 + tv_usec).
// NOTE(review): fillFEDRawDataCollection in this file instead packs seconds into
// the upper 32 bits - confirm which layout edm::Timestamp expects.
346  timeval tv;
347  gettimeofday(&tv, 0);
348  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
349
350  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
352  runAuxiliary()->run(),
353  lumiSection, lsopentime,
355
356  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
357  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
358
359  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
360  }
361 }
362 
// Presumably the body of nextEvent; the signature and the declaration of
// `status` are missing from this listing.
// Calls getNextEvent() repeatedly while it reports noFile, stopping early when
// edm::shutdown_flag is set, and returns the last status obtained.
364 {
366  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
367  {
// On shutdown the loop exits with status still equal to noFile.
368  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
369  }
370  return status;
371 }
372 
// Presumably the body of getNextEvent (the name is taken from the exception
// categories below); the signature, several conditions and the final return
// are missing from this listing.
// When no file is current, pops the next entry from fileQueue_ and handles
// status-only entries (run ended, run abort, new lumi, new file). Then deals
// with empty and finished files, positions event_ on the next event in either
// single-buffer or multi-buffer mode, and verifies the CRC-32C or Adler32
// checksum depending on the event version and configuration.
// Throws cms::Exception on run abort, event-count mismatch, truncated input,
// unknown FRD version, an event larger than a chunk, or a checksum mismatch.
374 {
375
377  if (!currentFile_)
378  {
381  if (!fileQueue_.try_pop(currentFile_))
382  {
383  //sleep until wakeup (only in single-buffer mode) or timeout
384  std::unique_lock<std::mutex> lkw(mWakeup_);
385  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
387  }
388  status = currentFile_->status_;
389  if ( status == evf::EvFDaqDirector::runEnded)
390  {
392  delete currentFile_;
393  currentFile_=nullptr;
394  return status;
395  }
396  else if ( status == evf::EvFDaqDirector::runAbort)
397  {
398  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
399  }
400  else if (status == evf::EvFDaqDirector::newLumi)
401  {
403  if (getLSFromFilename_) {
406  eventsThisLumi_=0;
408  }
409  }
410  else {//let this be picked up from next event
412  }
413
// A newLumi entry carries no data: it is deleted and its status returned.
414  delete currentFile_;
415  currentFile_=nullptr;
416  return status;
417  }
418  else if (status == evf::EvFDaqDirector::newFile) {
420  }
421  else
422  assert(0);
423  }
425
426  //file is empty
427  if (!currentFile_->fileSize_) {
429  //try to open new lumi
430  assert(currentFile_->nChunks_==0);
431  if (getLSFromFilename_)
434  eventsThisLumi_=0;
436  }
437  //immediately delete empty file
439  delete currentFile_;
440  currentFile_=nullptr;
442  }
443
444  //file is finished
447  //release last chunk (it is never released elsewhere)
450  {
451  throw cms::Exception("FedRawDataInputSource::getNextEvent")
452  << "Fully processed " << currentFile_->nProcessed_
453  << " from the file " << currentFile_->fileName_
454  << " but according to BU JSON there should be "
455  << currentFile_->nEvents_ << " events";
456  }
457  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
458  if (singleBufferMode_) {
459  std::unique_lock<std::mutex> lkw(mWakeup_);
460  cvWakeup_.notify_one();
461  }
464  //put the file in pending delete list;
465  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
466  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
467  }
468  else {
469  //in single-thread and stream jobs, events are already processed
471  delete currentFile_;
472  }
473  currentFile_=nullptr;
475  }
476
477
478  //file is too short
480  {
481  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
482  "Premature end of input file while reading event header";
483  }
// Single-buffer mode: the event is read from chunks_[0] of the current file.
484  if (singleBufferMode_) {
485
486  //should already be there
489  usleep(10000);
491  }
493
494  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
495
496  //conditions when read amount is not sufficient for the header to fit
497  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]
499  {
501
// The FRD version is taken from the first 32-bit word at dataPosition the first
// time through (detectedFRDversion_ == 0). Versions above 5 are rejected with
// an exception; version 0 is only guarded by the assert.
// NOTE(review): the cast reads a uint32 directly from the byte buffer -
// presumably the buffer is suitably aligned; confirm.
502  if (detectedFRDversion_==0) {
503  detectedFRDversion_=*((uint32*)dataPosition);
504  if (detectedFRDversion_>5)
505  throw cms::Exception("FedRawDataInputSource::getNextEvent")
506  << "Unknown FRD version -: " << detectedFRDversion_;
507  assert(detectedFRDversion_>=1);
508  }
509
510  //recalculate chunk position
511  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
512  if ( bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_])
513  {
514  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
515  "Premature end of input file while reading event header";
516  }
517  }
518
519  event_.reset( new FRDEventMsgView(dataPosition) );
520  if (event_->size()>eventChunkSize_) {
521  throw cms::Exception("FedRawDataInputSource::getNextEvent")
522  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
523  << " run:" << event_->run() << " of size:" << event_->size()
524  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
525  }
526
// msgSize is the event size minus the header size for the detected version.
// NOTE(review): the subtraction is unsigned and would wrap if event_->size()
// were smaller than the header size.
527  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
528
530  {
531  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
532  "Premature end of input file while reading event data";
533  }
534  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
536  //recalculate chunk position
537  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
538  event_.reset( new FRDEventMsgView(dataPosition) );
539  }
540  currentFile_->bufferPosition_ += event_->size();
541  currentFile_->chunkPosition_ += event_->size();
542  //last chunk is released when this function is invoked next time
543
544  }
545  //multibuffer mode:
546  else
547  {
548  //wait for the current chunk to become added to the vector
551  usleep(10000);
553  }
555
556  //check if header is at the boundary of two chunks
557  chunkIsFree_ = false;
558  unsigned char *dataPosition;
559
560  //read header, copy it to a single chunk if necessary
561  bool chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]);
562
563  event_.reset( new FRDEventMsgView(dataPosition) );
564  if (event_->size()>eventChunkSize_) {
565  throw cms::Exception("FedRawDataInputSource::getNextEvent")
566  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
567  << " run:" << event_->run() << " of size:" << event_->size()
568  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
569  }
570
// NOTE(review): same unsigned subtraction as in the single-buffer branch.
571  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
572
574  {
575  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
576  "Premature end of input file while reading event data";
577  }
578
// Three cases follow: the header crossed a chunk boundary, the payload would
// cross it, or the whole event is contiguous. chunkIsFree_ is set to true in
// the first two cases.
579  if (chunkEnd) {
580  //header was at the chunk boundary, we will have to move payload as well
581  currentFile_->moveToPreviousChunk(msgSize,FRDHeaderVersionSize[detectedFRDversion_]);
582  chunkIsFree_ = true;
583  }
584  else {
585  //header was contiguous, but check if payload fits the chunk
586  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
587  //rewind to header start position
588  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
589  //copy event to a chunk start and move pointers
590  chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]+msgSize);
591  assert(chunkEnd);
592  chunkIsFree_=true;
593  //header is moved
594  event_.reset( new FRDEventMsgView(dataPosition) );
595  }
596  else {
597  //everything is in a single chunk, only move pointers forward
598  chunkEnd = currentFile_->advance(dataPosition,msgSize);
599  assert(!chunkEnd);
600  chunkIsFree_=false;
601  }
602  }
603  }//end multibuffer mode
605
// Checksum verification: CRC-32C for event version >= 5 when verifyChecksum_
// is set, otherwise Adler32 for version >= 3 when verifyAdler32_ is set.
// Because of the else-if, a version >= 5 event with verifyChecksum_ enabled is
// never Adler32-checked.
606  if (verifyChecksum_ && event_->version() >= 5)
607  {
608  uint32_t crc=0;
609  crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
610  if ( crc != event_->crc32c() ) {
612  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
613  "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
614  " but calculated 0x" << crc;
615  }
616  }
617  else if ( verifyAdler32_ && event_->version() >= 3)
618  {
619  uint32_t adler = adler32(0L,Z_NULL,0);
620  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
621
622  if ( adler != event_->adler32() ) {
624  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
625  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
626  " but calculated 0x" << adler;
627  }
628  }
630
632
634 }
635 
// Presumably the body of deleteFile(fileName); the signature line is missing
// from this listing.
// Removes the named input file, except in file-list mode where nothing is
// deleted. If the first removal throws, the error is logged, the thread sleeps
// for 100000 microseconds and the removal is retried once; an exception from
// the retry is swallowed on purpose (see the inline remarks).
637 {
638  //no deletion in file list mode
639  if (fileListMode_) return;
640
641  const boost::filesystem::path filePath(fileName);
642  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
643  try {
644  //sometimes this fails but file gets deleted
645  boost::filesystem::remove(filePath);
646  }
647  catch (const boost::filesystem::filesystem_error& ex)
648  {
649  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
650  << ". Trying again.";
651  usleep(100000);
652  try {
653  boost::filesystem::remove(filePath);
654  }
655  catch (const boost::filesystem::filesystem_error&) {/*file gets deleted first time but exception is still thrown*/}
656  }
// Same retry policy for any other standard exception.
657  catch (std::exception& ex)
658  {
659  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
660  << ". Trying again.";
661  usleep(100000);
662  try {
663  boost::filesystem::remove(filePath);
664  } catch (std::exception&) {/*file gets deleted first time but exception is still thrown*/}
665  }
666 }
667 
668 
// Presumably the body of read(eventPrincipal); the signature and several
// statements (including the EventAuxiliary argument lists) are missing from
// this listing.
// Fills a FEDRawDataCollection from the current event, creates the event via
// makeEvent() with an auxiliary chosen by one of three branches (useL1EventID_,
// no TCDS pointer, or a TCDS record), puts the collection into the principal,
// and periodically deletes files that no stream is still processing.
670 {
672  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
673  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
674
675  if (useL1EventID_){
677  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
680  makeEvent(eventPrincipal, aux);
681  }
// Without a TCDS fragment a non-zero GTPEventID_ is required (assert only).
682  else if(tcds_pointer_==0){
683  assert(GTPEventID_);
685  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
688  makeEvent(eventPrincipal, aux);
689  }
690  else{
691  evf::evtn::TCDSRecord record((unsigned char *)(tcds_pointer_));
696  makeEvent(eventPrincipal, aux);
697  }
698
699
700
701  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
702
703  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
705
706  eventsThisLumi_++;
708
709  //resize vector if needed
710  while (streamFileTracker_.size() <= eventPrincipal.streamID())
711  streamFileTracker_.push_back(-1);
712
714
715  //this old file check runs no more often than every 10 events
// NOTE(review): the interval is checkEvery_; its value is not visible here.
716  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
717  //delete files that are not in processing
// A file is kept while any of the first nStreams_ entries of
// streamFileTracker_ equals its index (the pair's first member).
718  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
719  auto it = filesToDelete_.begin();
720  while (it!=filesToDelete_.end()) {
721  bool fileIsBeingProcessed = false;
722  for (unsigned int i=0;i<nStreams_;i++) {
723  if (it->first == streamFileTracker_.at(i)) {
724  fileIsBeingProcessed = true;
725  break;
726  }
727  }
728  if (!fileIsBeingProcessed) {
729  deleteFile(it->second->fileName_);
730  delete it->second;
731  it = filesToDelete_.erase(it);
732  }
733  else it++;
734  }
735
736  }
738  chunkIsFree_=false;
740  return;
741 }
742 
// Presumably the body of fillFEDRawDataCollection(rawData) (the name is taken
// from the exception category below); the signature and the declaration of
// `time` are missing from this listing.
// Walks the event payload backwards from its end: reads a FED trailer, takes
// the fragment size from it, steps back to the fragment header, and copies the
// whole fragment into rawData under its FED id. Also records the TCDS fragment
// pointer and the GTP/GTPE event id when those FEDs are present.
// Returns the current wall-clock timestamp, replaced by the GPS time from the
// GTP FED when that FED is found.
// Throws cms::Exception when a FED id exceeds FEDNumbering::MAXFEDID.
744 {
// Default timestamp: seconds in the upper 32 bits, microseconds in the lower.
746  timeval stv;
747  gettimeofday(&stv,0);
748  time = stv.tv_sec;
749  time = (time << 32) + stv.tv_usec;
750  edm::Timestamp tstamp(time);
751
752  uint32_t eventSize = event_->eventSize();
753  char* event = (char*)event_->payload();
754  GTPEventID_=0;
755  tcds_pointer_ = 0;
// NOTE(review): the size checks in this loop are asserts only. If they are
// compiled out, a corrupt trailer size would make the unsigned subtractions
// wrap; likewise if fedSize is smaller than sizeof(fedt_t).
756  while (eventSize > 0) {
757  assert(eventSize>=sizeof(fedt_t));
758  eventSize -= sizeof(fedt_t);
759  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
760  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
761  assert(eventSize>=fedSize - sizeof(fedt_t));
762  eventSize -= (fedSize - sizeof(fedt_t));
// eventSize is now the offset of this fragment's header within the payload.
763  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
764  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
765  if(fedId>FEDNumbering::MAXFEDID)
766  {
767  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
768  }
769  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
770  tcds_pointer_ = (unsigned char *)(event + eventSize );
771  }
772  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
773  if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
774  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
775  else
776  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
777  //evf::evtn::evm_board_setformat(fedSize);
778  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
779  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
780  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
781  }
782  //take event ID from GTPE FED
783  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
784  if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
785  GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
786  }
787  }
// Copy the complete fragment (header through trailer) into the collection.
788  FEDRawData& fedData = rawData.FEDData(fedId);
789  fedData.resize(fedSize);
790  memcpy(fedData.data(), event + eventSize, fedSize);
791  }
792  assert(eventSize == 0);
793
794  return tstamp;
795 }
796 
798 {
800  try {
801  // assemble json destination path
803 
804  //TODO:should be ported to use fffnaming
805  std::ostringstream fileNameWithPID;
806  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
807  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
808  jsonDestPath /= fileNameWithPID.str();
809 
810  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
811  << jsonDestPath;
812  try {
813  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
814  }
815  catch (const boost::filesystem::filesystem_error& ex)
816  {
817  // Input dir gone?
818  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
819  // << " Maybe the file is not yet visible by FU. Trying again in one second";
820  sleep(1);
821  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
822  }
824 
825  try {
826  //sometimes this fails but file gets deleted
827  boost::filesystem::remove(jsonSourcePath);
828  }
829  catch (const boost::filesystem::filesystem_error& ex)
830  {
831  // Input dir gone?
832  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
833  }
834  catch (std::exception& ex)
835  {
836  // Input dir gone?
837  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
838  }
839 
840  boost::filesystem::ifstream ij(jsonDestPath);
841  Json::Value deserializeRoot;
843 
844  std::stringstream ss;
845  ss << ij.rdbuf();
846  if (!reader.parse(ss.str(), deserializeRoot)) {
847  edm::LogError("FedRawDataInputSource") << "Failed to deserialize JSON file -: " << jsonDestPath
848  << "\nERROR:\n" << reader.getFormatedErrorMessages()
849  << "CONTENT:\n" << ss.str()<<".";
850  throw std::runtime_error("Cannot deserialize input JSON file");
851  }
852 
853  //read BU JSON
855  DataPoint dp;
856  dp.deserialize(deserializeRoot);
857  bool success = false;
858  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
859  if (dpd_->getNames().at(i)=="NEvents")
860  if (i<dp.getData().size()) {
861  data = dp.getData()[i];
862  success=true;
863  }
864  }
865  if (!success) {
866  if (dp.getData().size())
867  data = dp.getData()[0];
868  else
869  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
870  " error reading number of events from BU JSON -: No input value " << data;
871  }
872  return boost::lexical_cast<int>(data);
873 
874  }
875  catch (const boost::filesystem::filesystem_error& ex)
876  {
877  // Input dir gone?
879  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
880  }
881  catch (std::runtime_error e)
882  {
883  // Another process grabbed the file and NFS did not register this
885  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
886  }
887 
888  catch( boost::bad_lexical_cast const& ) {
889  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
890  << "Input value is -: " << data;
891  }
892 
893  catch (std::exception e)
894  {
895  // BU run directory disappeared?
897  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
898  }
899 
900  return -1;
901 }
902 
904 {}
905 
// Calls InputSource::rewind(); the remaining statements of this function are on
// lines missing from this listing. The shared_ptr parameter is unnamed and is
// not used by the visible code.
906 void FedRawDataInputSource::postForkReacquireResources(std::shared_ptr<edm::multicore::MessageReceiverForSource>)
907 {
908  InputSource::rewind();
912 }
913 
915 {}
916 
917 
919 {
920  bool stop=false;
921  unsigned int currentLumiSection = 0;
922  //threadInit_.exchange(true,std::memory_order_acquire);
923 
924  {
925  std::unique_lock<std::mutex> lk(startupLock_);
926  startupCv_.notify_one();
927  }
928 
929  uint32_t ls=0;
930  uint32_t monLS=1;
931  uint32_t lockCount=0;
932  uint64_t sumLockWaitTimeUs=0.;
933 
934  while (!stop) {
935 
936  //wait for at least one free thread and chunk
937  int counter=0;
939  {
940  //report state to monitoring
941  if (fms_) {
942  bool copy_active=false;
943  for (auto j : tid_active_) if (j) copy_active=true;
945  else if (freeChunks_.empty()) {
946  if (copy_active)
948  else
950  }
951  else {
952  if (copy_active)
954  else
956  }
957  }
958  std::unique_lock<std::mutex> lkw(mWakeup_);
959  //sleep until woken up by condition or a timeout
960  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
961  counter++;
962  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
963  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
964  }
965  else {
966  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
967  }
968  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
969  }
970 
971  if (stop) break;
972 
973  //look for a new file
974  std::string nextFile;
975  uint32_t fileSize;
976 
977  if (fms_) {
981  }
982 
984 
985  while (status == evf::EvFDaqDirector::noFile) {
986  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
987  stop=true;
988  break;
989  }
990 
991  uint64_t thisLockWaitTimeUs=0.;
992  if (fileListMode_) {
993  //return LS if LS not set, otherwise return file
994  status = getFile(ls,nextFile,fileSize,thisLockWaitTimeUs);
995  }
996  else
997  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
998 
1000 
1001  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
1002 
1003  //monitoring of lock wait time
1004  if (thisLockWaitTimeUs>0.)
1005  sumLockWaitTimeUs+=thisLockWaitTimeUs;
1006  lockCount++;
1007  if (ls>monLS) {
1008  monLS=ls;
1009  if (lockCount)
1010  if (fms_) fms_->reportLockWait(monLS,sumLockWaitTimeUs,lockCount);
1011  lockCount=0;
1012  sumLockWaitTimeUs=0;
1013  }
1014 
1015  //check again for any remaining index/EoLS files after EoR file is seen
1016  if ( status == evf::EvFDaqDirector::runEnded && !fileListMode_) {
1018  usleep(100000);
1019  //now all files should have appeared in ramdisk, check again if any raw files were left behind
1020  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
1021  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
1022  }
1023 
1024  if ( status == evf::EvFDaqDirector::runEnded) {
1026  stop=true;
1027  break;
1028  }
1029 
1030  //queue new lumisection
1031  if( getLSFromFilename_ && ls > currentLumiSection) {
1032  //fms_->setInStateSup(evf::FastMonitoringThread::inSupNewLumi);
1033  currentLumiSection = ls;
1034  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
1035  }
1036 
1037  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
1038  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
1040  stop=true;
1041  break;
1042  }
1043 
1044  int dbgcount=0;
1045  if (status == evf::EvFDaqDirector::noFile) {
1047  dbgcount++;
1048  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1049  usleep(100000);
1050  }
1051  }
1052  if ( status == evf::EvFDaqDirector::newFile ) {
1054  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1055 
1056 
1057  boost::filesystem::path rawFilePath(nextFile);
1058  std::string rawFile = rawFilePath.replace_extension(".raw").string();
1059 
1060  struct stat st;
1061  int stat_res = stat(rawFile.c_str(),&st);
1062  if (stat_res==-1) {
1063  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-"<< rawFile << std::endl;
1064  setExceptionState_=true;
1065  break;
1066  }
1067  fileSize=st.st_size;
1068 
1069  if (fms_) {
1073  }
1074  int eventsInNewFile;
1075  if (fileListMode_) {
1076  if (fileSize==0) eventsInNewFile=0;
1077  else eventsInNewFile=-1;
1078  }
1079  else {
1080  eventsInNewFile = grabNextJsonFile(nextFile);
1081  assert( eventsInNewFile>=0 );
1082  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
1083  }
1084 
1085  if (!singleBufferMode_) {
1086  //calculate number of needed chunks
1087  unsigned int neededChunks = fileSize/eventChunkSize_;
1088  if (fileSize%eventChunkSize_) neededChunks++;
1089 
1090  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
1092  fileQueue_.push(newInputFile);
1093 
1094  for (unsigned int i=0;i<neededChunks;i++) {
1095 
1096  if (fms_) {
1097  bool copy_active=false;
1098  for (auto j : tid_active_) if (j) copy_active=true;
1099  if (copy_active)
1101  else
1103  }
1104  //get thread
1105  unsigned int newTid = 0xffffffff;
1106  while (!workerPool_.try_pop(newTid)) {
1107  usleep(100000);
1108  }
1109 
1110  if (fms_) {
1111  bool copy_active=false;
1112  for (auto j : tid_active_) if (j) copy_active=true;
1113  if (copy_active)
1115  else
1117  }
1118  InputChunk * newChunk = nullptr;
1119  while (!freeChunks_.try_pop(newChunk)) {
1120  usleep(100000);
1121  if (quit_threads_.load(std::memory_order_relaxed)) break;
1122  }
1123 
1124  if (newChunk == nullptr) {
1125  //return unused tid if we received shutdown (nullptr chunk)
1126  if (newTid!=0xffffffff) workerPool_.push(newTid);
1127  stop = true;
1128  break;
1129  }
1131 
1132  std::unique_lock<std::mutex> lk(mReader_);
1133 
1134  unsigned int toRead = eventChunkSize_;
1135  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1136  newChunk->reset(i*eventChunkSize_,toRead,i);
1137 
1138  workerJob_[newTid].first=newInputFile;
1139  workerJob_[newTid].second=newChunk;
1140 
1141  //wake up the worker thread
1142  cvReader_[newTid]->notify_one();
1143  }
1144  }
1145  else {
1146  if (!eventsInNewFile) {
1147  //still queue file for lumi update
1148  std::unique_lock<std::mutex> lkw(mWakeup_);
1149  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
1151  fileQueue_.push(newInputFile);
1152  cvWakeup_.notify_one();
1153  break;
1154  }
1155  //in single-buffer mode put single chunk in the file and let the main thread read the file
1156  InputChunk * newChunk;
1157  //should be available immediately
1158  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
1159 
1160  std::unique_lock<std::mutex> lkw(mWakeup_);
1161 
1162  unsigned int toRead = eventChunkSize_;
1163  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1164  newChunk->reset(0,toRead,0);
1165  newChunk->readComplete_=true;
1166 
1167  //push file and wakeup main thread
1168  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1169  newInputFile->chunks_[0]=newChunk;
1171  fileQueue_.push(newInputFile);
1172  cvWakeup_.notify_one();
1173  }
1174  }
1175  }
1177  //make sure threads finish reading
1178  unsigned numFinishedThreads = 0;
1179  while (numFinishedThreads < workerThreads_.size()) {
1180  unsigned int tid;
1181  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1182  std::unique_lock<std::mutex> lk(mReader_);
1183  thread_quit_signal[tid]=true;
1184  cvReader_[tid]->notify_one();
1185  numFinishedThreads++;
1186  }
1187  for (unsigned int i=0;i<workerThreads_.size();i++) {
1188  workerThreads_[i]->join();
1189  delete workerThreads_[i];
1190  }
1191 }
1192 
// Body of the per-thread chunk reader (the log text below names it "readWorker").
// The signature line is missing from this listing; `tid` is used as the index of
// this worker in tid_active_, workerJob_, cvReader_ and thread_quit_signal.
//
// Each loop iteration:
//   1. marks the worker idle, clears its job slot and pushes `tid` onto workerPool_;
//   2. sleeps on cvReader_[tid] until woken;
//   3. returns if thread_quit_signal[tid] is set, otherwise reads the assigned chunk
//      from the assigned file into chunk->buf_ and publishes it in file->chunks_.
1194 {
1195  bool init = true;
1196  threadInit_.exchange(true,std::memory_order_acquire);
1197 
1198  while (1) {
1199 
1200  tid_active_[tid]=false;
1201  std::unique_lock<std::mutex> lk(mReader_);
1202  workerJob_[tid].first=nullptr;
// NOTE(review): the next line repeats the assignment to `.first`; `.second` is never
// cleared here, so the `.second!=nullptr` check further down can pass on a stale
// chunk pointer. Presumably `.second=nullptr` was intended - confirm.
1203  workerJob_[tid].first=nullptr;
1204 
1205  assert(!thread_quit_signal[tid]);//should never get it here
// advertise this worker as available for a new job
1206  workerPool_.push(tid);
1207 
// first pass only: notify startupCv_ (under startupLock_) that this worker reached its wait point
1208  if (init) {
1209  std::unique_lock<std::mutex> lk(startupLock_);
1210  init = false;
1211  startupCv_.notify_one();
1212  }
// NOTE(review): wait() is called without a predicate; a spurious wakeup would reach
// the job assert below with an empty job slot.
1213  cvReader_[tid]->wait(lk);
1214 
1215  if (thread_quit_signal[tid]) return;
1216  tid_active_[tid]=true;
1217 
1218  InputFile * file;
1219  InputChunk * chunk;
1220 
1221  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1222 
1223  file = workerJob_[tid].first;
1224  chunk = workerJob_[tid].second;
1225 
// NOTE(review): lseek() is issued before the descriptor is validated, and only the
// descriptor is tested below - a failed seek on a valid descriptor is not detected
// although the error text mentions it.
1226  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1227  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1228 
1229 
1230  if (fileDescriptor>=0)
1231  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1232  else
1233  {
1234  edm::LogError("FedRawDataInputSource") <<
1235  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1236  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
// NOTE(review): this return leaves the thread without pushing `tid` back to
// workerPool_; the shutdown loop earlier in this file pops one tid per worker
// thread, so it appears it could wait forever after this path - confirm.
1237  setExceptionState_=true;
1238  return;
1239 
1240  }
1241 
// bufferLeft counts the bytes read so far and is the write offset into chunk->buf_
1242  unsigned int bufferLeft = 0;
// (the definition of `start` used below is on a listing line that is not visible here)
// read at most readBlocks_ blocks of eventChunkBlock_ bytes; a short read ends the loop
1244  for (unsigned int i=0;i<readBlocks_;i++)
1245  {
1246  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1247  if ( last > 0 )
1248  bufferLeft+=last;
// NOTE(review): a negative `last` (read error) is not reported; whether it enters
// this branch depends on the type of eventChunkBlock_, which is not visible here.
1249  if (last < eventChunkBlock_) {
1250  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1251  break;
1252  }
1253  }
// (the definition of `end` is on a listing line that is not visible here)
1255  auto diff = end-start;
1256  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
// debug-only throughput figure; msec.count() can be 0 for fast reads (floating-point division)
1257  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1258  close(fileDescriptor);
1259 
// when no version is recorded yet and this chunk starts at offset 0, take the first 32-bit word of the buffer as the version
1260  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1261  assert(detectedFRDversion_<=5);
1262  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1263  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1264 
1265  }
1266 }
1267 
// Body of a function whose signature line is missing from this listing
// (presumably FedRawDataInputSource::threadError, going by the exception category
// below and the parent_->threadError() calls in InputFile::advance - confirm).
// Sets quit_threads_ and then always throws a cms::Exception.
1269 {
1270  quit_threads_=true;
1271  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1272 
1273 }
1274 
1275 
1276 inline bool InputFile::advance(unsigned char* & dataPosition, const size_t size)
1277 {
1278  //wait for chunk
1279  while (!waitForChunk(currentChunk_)) {
1280  usleep(100000);
1281  if (parent_->exceptionState()) parent_->threadError();
1282  }
1283 
1284  dataPosition = chunks_[currentChunk_]->buf_+ chunkPosition_;
1285  size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1286 
1287  if (currentLeft < size) {
1288 
1289  //we need next chunk
1290  while (!waitForChunk(currentChunk_+1)) {
1291  usleep(100000);
1292  if (parent_->exceptionState()) parent_->threadError();
1293  }
1294  //copy everything to beginning of the first chunk
1295  dataPosition-=chunkPosition_;
1296  assert(dataPosition==chunks_[currentChunk_]->buf_);
1297  memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_+chunkPosition_, currentLeft);
1298  memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_+1]->buf_, size - currentLeft);
1299  //set pointers at the end of the old data position
1300  bufferPosition_+=size;
1301  chunkPosition_=size-currentLeft;
1302  currentChunk_++;
1303  return true;
1304  }
1305  else {
1306  chunkPosition_+=size;
1307  bufferPosition_+=size;
1308  return false;
1309  }
1310 }
1311 
1312 inline void InputFile::moveToPreviousChunk(const size_t size, const size_t offset)
1313 {
1314  //this will fail in case of events that are too large
1315  assert(size < chunks_[currentChunk_]->size_ - chunkPosition_);
1316  assert(size - offset < chunks_[currentChunk_]->size_);
1317  memcpy(chunks_[currentChunk_-1]->buf_+offset,chunks_[currentChunk_]->buf_+chunkPosition_,size);
1318  chunkPosition_+=size;
1319  bufferPosition_+=size;
1320 }
1321 
1322 inline void InputFile::rewindChunk(const size_t size) {
1323  chunkPosition_-=size;
1324  bufferPosition_-=size;
1325 }
1326 
1327 //single-buffer mode file reading
// The signature line is missing from this listing; the body uses a parameter `file`
// and the exception category below names the function readNextChunkIntoBuffer.
// It (re)fills file->chunks_[0]->buf_ from fileDescriptor_, opening the file first
// when no descriptor is open, and closes the descriptor once bufferInputRead_
// equals file->fileSize_.
// NOTE(review): three listing lines that follow the ::read() calls are not visible
// here; nothing visible updates bufferInputRead_ after it is reset to 0 below, so
// that accounting is presumably on those lines - confirm.
1329 {
1330 
// open lazily: a negative fileDescriptor_ means no file is currently open
1331  if (fileDescriptor_<0) {
1332  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1333  bufferInputRead_ = 0;
1334  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1335  if (fileDescriptor_>=0)
1336  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1337  else
1338  {
1339  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1340  << file->fileName_ << " fd:" << fileDescriptor_;
1341  }
1342  }
1343 
// chunk position 0: fill the buffer from its start, readBlocks_ blocks of eventChunkBlock_ bytes
1344  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1345  uint32_t existingSize = 0;
1346  for (unsigned int i=0;i<readBlocks_;i++)
1347  {
1348  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
// NOTE(review): in the visible lines `last` is added without a check for a negative
// (error) return, which would wrap the unsigned offset.
1350  existingSize+=last;
1351  }
1352  }
1353  else {
// otherwise: move the unread tail (eventChunkSize_ - chunkPosition_ bytes) to the
// front of the buffer and request chunkPosition_ further bytes behind it, as
// whole blocks plus one remainder read
1354  const uint32_t chunksize = file->chunkPosition_;
1355  const uint32_t blockcount=chunksize/eventChunkBlock_;
1356  const uint32_t leftsize = chunksize%eventChunkBlock_;
1357  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1358  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1359 
1360  for (uint32_t i=0;i<blockcount;i++) {
1361  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1363  existingSize+=last;
1364  }
1365  if (leftsize) {
1366  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1368  }
1369  file->chunkPosition_=0;//data was moved to beginning of the chunk
1370  }
1371  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1372  if (fileDescriptor_!=-1)
1373  {
1374  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1375  close(fileDescriptor_);
1376  fileDescriptor_=-1;
1377  }
1378  }
1379 }
1380 
1381 
// Body of a function whose signature line is missing from this listing; it uses two
// values named `lumi` and `events` (presumably reportEventsThisLumiInSource -
// confirm). Under monlock_, adds `events` to the sourceEventsReport_ entry for
// `lumi`, creating the entry with that value when none exists yet.
1383 {
1384 
1385  std::lock_guard<std::mutex> lock(monlock_);
1386  auto itr = sourceEventsReport_.find(lumi);
1387  if (itr!=sourceEventsReport_.end())
1388  itr->second+=events;
1389  else
1390  sourceEventsReport_[lumi]=events;
1391 }
1392 
1393 std::pair<bool,unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase)
1394 {
1395  std::lock_guard<std::mutex> lock(monlock_);
1396  auto itr = sourceEventsReport_.find(lumi);
1397  if (itr!=sourceEventsReport_.end()) {
1398  auto && ret = std::pair<bool,unsigned int>(true,itr->second);
1399  if (erase)
1400  sourceEventsReport_.erase(itr);
1401  return ret;
1402  }
1403  else
1404  return std::pair<bool,unsigned int>(false,0);
1405 }
1406 
// Body of a function whose signature line is missing from this listing (presumably
// the file-list initialisation - confirm). It sorts fileNames_ and tries to read a
// run number out of a file name; the log text says the first file in the vector is
// used. Returns the parsed number, or -1 when fileNames_ is empty, the stem does
// not start with "run", or the digits cannot be converted.
1408 {
// order by the part of each name from the last '/' onwards (the slash is kept by
// substr), ascending; names without a '/' are compared whole.
// NOTE(review): the lambda takes both strings by value, so each comparison copies them.
1409  std::sort(fileNames_.begin(),fileNames_.end(),
1410  [](std::string a, std::string b) {
1411  if (a.rfind("/")!=std::string::npos) a=a.substr(a.rfind("/"));
1412  if (b.rfind("/")!=std::string::npos) b=b.substr(b.rfind("/"));
1413  return b > a;});
1414 
1415  if (fileNames_.size()) {
1416  //get run number from first file in the vector
// (the declaration of `fileName` is on a listing line that is not visible here)
1418  std::string fileStem = fileName.stem().string();
1419  auto end = fileStem.find("_");
1420  if (fileStem.find("run")==0) {
// characters after the "run" prefix up to the first '_'; when there is no '_',
// `end` is npos and substr takes the rest of the stem
1421  std::string runStr = fileStem.substr(3,end-3);
1422  try {
1423  //get long to support test run numbers < 2^32
1424  long rval = boost::lexical_cast<long>(runStr);
1425  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: "<< rval;
1426  return rval;
1427  }
1428  catch( boost::bad_lexical_cast const& ) {
1429  edm::LogWarning("FedRawDataInputSource") << "Unable to autodetect run number in fileListMode from file -: "<< fileName;
1430  }
1431  }
1432  }
1433  return -1;
1434 }
1435 
// Hand out the next entry of fileNames_.
//
// ls            out: lumisection number - parsed from the file stem, or
//               1 + loopModeIterationInc_ when fileListLoopMode_ is set
// nextFile      out: the file name with a leading "file://" or "file:" removed
// fsize         not written by the visible code (assignment is commented out)
// lockWaitTime  not written by the visible code (assignment is commented out)
//
// When the list is exhausted and fileListLoopMode_ is set, fileListIndex_ is reset
// to 0 and the function calls itself to restart from the first file.
// NOTE(review): three statements of this function (after fileListIndex_++ and in
// the two branches of the final else) are on listing lines that are not visible
// here, so the returned status values cannot be documented from this view.
1436 evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile(unsigned int& ls, std::string& nextFile, uint32_t& fsize, uint64_t& lockWaitTime)
1437 {
1438  if (fileListIndex_ < fileNames_.size()) {
1439  nextFile = fileNames_[fileListIndex_];
1440  if (nextFile.find("file://")==0) nextFile=nextFile.substr(7);
1441  else if (nextFile.find("file:")==0) nextFile=nextFile.substr(5);
1442  boost::filesystem::path fileName = nextFile;
1443  std::string fileStem = fileName.stem().string();
// NOTE(review): the two conditions below use the result of find() as a boolean.
// find() returns npos (non-zero) when nothing is found and 0 for a match at the
// start, so the test is true for "not found" and false for "found at index 0".
// With no "ls" in the stem, find("ls")+2 wraps around to 1 and only the first
// character is dropped. Presumably `!= std::string::npos` was intended - confirm.
1444  if (fileStem.find("ls")) fileStem = fileStem.substr(fileStem.find("ls")+2);
1445  if (fileStem.find("_")) fileStem = fileStem.substr(0,fileStem.find("_"));
1446 
// lexical_cast throws boost::bad_lexical_cast when the remaining text is not a number; it is not caught here
1447  if (!fileListLoopMode_)
1448  ls = boost::lexical_cast<unsigned int>(fileStem);
1449  else //always starting from LS 1 in loop mode
1450  ls = 1 + loopModeIterationInc_;
1451 
1452  //fsize = 0;
1453  //lockWaitTime = 0;
1454  fileListIndex_++;
1456  }
1457  else {
1458  if (!fileListLoopMode_)
1460  else {
1461  //loop through files until interrupted
1463  fileListIndex_=0;
1464  return getFile(ls,nextFile,fsize,lockWaitTime);
1465  }
1466  }
1467 }
#define LogDebug(id)
size
Write out results.
static const char runNumber_[]
Definition: start.py:1
void setComment(std::string const &value)
std::vector< std::string > & getData()
Definition: DataPoint.h:58
unsigned int lumi_
std::vector< std::string > fileNames_
unsigned int getgpshigh(const unsigned char *)
uint32_t chunkPosition_
std::condition_variable cvWakeup_
virtual void read(edm::EventPrincipal &eventPrincipal) override
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:356
bool gtpe_board_sense(const unsigned char *p)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
virtual void rewind_() override
bool crc32c_hw_test()
Definition: crc32c.cc:354
unsigned int offset_
jsoncollector::DataPointDefinition * dpd_
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
tbb::concurrent_queue< unsigned int > workerPool_
JetCorrectorParameters::Record record
Definition: classes.h:7
void setAllowAnything()
allow any parameter label/value pairs
unsigned int get(const unsigned char *, bool)
void maybeOpenNewLumiSection(const uint32_t lumiSection)
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void setInState(FastMonitoringThread::InputState inputState)
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
void rewindChunk(const size_t size)
const uint32 FRDHeaderVersionSize[6]
volatile std::atomic< bool > shutdown_flag
evf::EvFDaqDirector::FileStatus status_
void setInStateSup(FastMonitoringThread::InputState inputState)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:586
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
void createProcessingNotificationMaybe() const
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
std::unique_ptr< std::thread > readSupervisorThread_
ProductProvenance const & dummyProvenance() const
Represents a JSON value.
Definition: value.h:111
std::string getEoLSFilePathOnBU(const unsigned int ls) const
#define nullptr
int timeout
Definition: mps_check.py:51
std::vector< std::condition_variable * > cvReader_
FedRawDataInputSource * parent_
unsigned int sourceid
Definition: fed_header.h:32
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:219
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
U second(std::pair< T, U > const &p)
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
static Timestamp beginOfTime()
Definition: Timestamp.h:103
void setComment(std::string const &value)
uint32_t bufferPosition_
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void resize(size_t newsize)
Definition: FEDRawData.cc:32
bool advance(unsigned char *&dataPosition, const size_t size)
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:363
std::vector< int > streamFileTracker_
void moveToPreviousChunk(const size_t size, const size_t offset)
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID, bool verifyLumiSection)
unsigned char * buf_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
struct fedt_struct fedt_t
int grabNextJsonFile(boost::filesystem::path const &)
StreamID streamID() const
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
#define end
Definition: vmac.h:37
virtual void preForkReleaseResources() override
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
std::map< unsigned int, unsigned int > sourceEventsReport_
bool waitForChunk(unsigned int chunkid)
const edm::DaqProvenanceHelper daqProvenanceHelper_
unsigned int fileIndex_
unsigned long long TimeValue_t
Definition: Timestamp.h:28
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:56
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned int uint32
Definition: MsgTools.h:13
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:388
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
void setInputSource(FedRawDataInputSource *inputSource)
std::atomic< bool > readComplete_
virtual bool checkNextEvent() override
unsigned long long int rval
Definition: vlib.h:22
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:592
def ls(path, rec=False)
Definition: eostools.py:348
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
def getRunNumber(filename)
unsigned long long uint64_t
Definition: Time.h:15
unsigned int currentChunk_
auto dp
Definition: deltaR.h:22
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
def load(fileName)
Definition: svgfig.py:546
std::string fileName_
double b
Definition: hdecay.h:120
void add(std::string const &label, ParameterSetDescription const &psetDescription)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:209
void setProcessHistoryID(ProcessHistoryID const &phid)
std::condition_variable startupCv_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:264
void stoppedLookingForFile(unsigned int lumi)
tbb::concurrent_vector< InputChunk * > chunks_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:371
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:357
unsigned int eventsize
Definition: fed_trailer.h:33
std::atomic< bool > threadInit_
std::pair< InputFile *, InputChunk * > ReaderInfo
std::string getBoLSFilePathOnFU(const unsigned int ls) const
HLT enums.
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:359
void readWorker(unsigned int tid)
double a
Definition: hdecay.h:121
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< unsigned int > thread_quit_signal
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
bool emptyLumisectionMode() const
virtual void postForkReacquireResources(std::shared_ptr< edm::multicore::MessageReceiverForSource >) override
unsigned int gtpe_get(const unsigned char *)
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
tbb::concurrent_queue< InputChunk * > freeChunks_
evf::EvFDaqDirector::FileStatus getNextEvent()
evf::EvFDaqDirector::FileStatus nextEvent()
void readNextChunkIntoBuffer(InputFile *file)
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:267
void setFMS(evf::FastMonitoringService *fms)
unsigned int getgpslow(const unsigned char *)
std::string getFormatedErrorMessages() const
Returns a user friendly string that list errors in the parsed document.
def move(src, dest)
Definition: eostools.py:510
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
Definition: event.py:1
std::string getEoRFilePathOnFU() const