test
CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FedRawDataInputSource.cc
Go to the documentation of this file.
1 #include <fcntl.h>
2 #include <iomanip>
3 #include <iostream>
4 #include <memory>
5 #include <sstream>
6 #include <sys/types.h>
7 #include <sys/file.h>
8 #include <sys/time.h>
9 #include <unistd.h>
10 #include <vector>
11 #include <fstream>
12 #include <zlib.h>
13 #include <stdio.h>
14 #include <chrono>
15 
16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
18 
19 
22 
29 
33 
35 
39 
41 
46 
47 //JSON file reader
49 
50 #include <boost/lexical_cast.hpp>
51 
52 using namespace jsoncollector;
53 
55  edm::InputSourceDescription const& desc) :
56  edm::RawInputSource(pset, desc),
57  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
58  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",32)*1048576),
59  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",32)*1048576),
60  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",2)),
61  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> ("maxBufferedFiles",2)),
62  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
63  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
64  verifyChecksum_(pset.getUntrackedParameter<bool> ("verifyChecksum", true)),
65  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
66  fileNames_(pset.getUntrackedParameter<std::vector<std::string>> ("fileNames",std::vector<std::string>())),
67  fileListMode_(pset.getUntrackedParameter<bool> ("fileListMode", false)),
68  runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
69  fuOutputDir_(std::string()),
70  daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
71  eventID_(),
72  processHistoryID_(),
73  currentLumiSection_(0),
74  tcds_pointer_(0),
75  eventsThisLumi_(0),
76  dpd_(nullptr)
77 {
78  char thishost[256];
79  gethostname(thishost, 255);
80  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
81  << std::endl << (eventChunkSize_/1048576)
82  << " MB on host " << thishost;
83 
84  long autoRunNumber = -1;
85  if (fileListMode_) {
86  autoRunNumber = initFileList();
87  if (autoRunNumber<0)
88  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
89  //override run number
90  runNumber_ = (edm::RunNumber_t)autoRunNumber;
91  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
92  }
93 
95  setNewRun();
96  //todo:autodetect from file name (assert if names differ)
99 
100  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
101  defPath_ = std::string(getenv("CMSSW_BASE")) + "/" + defPathSuffix;
102  struct stat statbuf;
103  if (stat(defPath_.c_str(), &statbuf)) {
104  defPath_ = std::string(getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
105  if (stat(defPath_.c_str(), &statbuf)) {
106  defPath_ = defPathSuffix;
107  }
108  }
109 
110  dpd_ = new DataPointDefinition();
111  std::string defLabel = "data";
112  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
113 
114  //make sure that chunk size is N * block size
119 
120  if (!numBuffers_)
121  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
122  "no reading enabled with numBuffers parameter 0";
123 
127 
128  if (!crc32c_hw_test())
129  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
130 
131  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
132  if (fileListMode_) {
133  try {
135  } catch(...) {}
136  }
137  else {
139  if (!fms_) {
140  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
141  }
142  }
143 
145  if (!daqDirector_)
146  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
147 
148  //set DaqDirector to delete files in preGlobalEndLumi callback
150  if (fms_) {
152  fms_->setInputSource(this);
155  }
156  //should delete chunks when run stops
157  for (unsigned int i=0;i<numBuffers_;i++) {
159  }
160 
161  quit_threads_ = false;
162 
163  for (unsigned int i=0;i<numConcurrentReads_;i++)
164  {
165  std::unique_lock<std::mutex> lk(startupLock_);
166  //issue a memory fence here and in threads (constructor was segfaulting without this)
167  thread_quit_signal.push_back(false);
168  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
169  cvReader_.push_back(new std::condition_variable);
170  tid_active_.push_back(0);
171  threadInit_.store(false,std::memory_order_release);
172  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
173  startupCv_.wait(lk);
174  }
175 
176  runAuxiliary()->setProcessHistoryID(processHistoryID_);
177 }
178 
180 {
   // Teardown: raise quit_threads_, delete every file still queued in filesToDelete_, then join
   // threads and free the per-worker condition variables. The condition that selects between
   // joining the supervisor thread and joining the workers directly is on a listing line that
   // is not visible here.
181  quit_threads_=true;
182
183  //delete any remaining open files
184  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
185  deleteFile(it->second->fileName_);
186  delete it->second;
187  }
189  readSupervisorThread_->join();
190  }
191  else {
192  //join aux threads in case the supervisor thread was not started
193  for (unsigned int i=0;i<workerThreads_.size();i++) {
   // set the worker's quit flag and wake it while holding mReader_, release the lock, then join
194  std::unique_lock<std::mutex> lk(mReader_);
195  thread_quit_signal[i]=true;
196  cvReader_[i]->notify_one();
197  lk.unlock();
198  workerThreads_[i]->join();
199  delete workerThreads_[i];
200  }
201  }
   // the condition variables were allocated with new in the constructor, one per concurrent read
202  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
   // NOTE(review): chunk cleanup below is commented out - confirm the chunks are released elsewhere.
203  /*
204  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
205  InputChunk *ch;
206  while (!freeChunks_.try_pop(ch)) {}
207  delete ch;
208  }
209  */
210 }
211 
213 {
   // Declares the untracked configuration parameters of this source, with defaults and comments,
   // and registers the description under the label "source". The declaration of 'desc' is on a
   // listing line that is not visible here.
   // NOTE(review): the constructor also reads "buDefPath" and "getLSFromFilename", which are not
   // declared below; they are accepted only because of setAllowAnything() - confirm intent.
215  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
216  desc.addUntracked<unsigned int> ("eventChunkSize",32)->setComment("Input buffer (chunk) size");
217  desc.addUntracked<unsigned int> ("eventChunkBlock",32)->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
218  desc.addUntracked<unsigned int> ("numBuffers",2)->setComment("Number of buffers used for reading input");
219  desc.addUntracked<unsigned int> ("maxBufferedFiles",2)->setComment("Maximum number of simultaneously buffered raw files");
220  desc.addUntracked<bool> ("verifyAdler32", true)->setComment("Verify event Adler32 checksum with FRDv3 or v4");
221  desc.addUntracked<bool> ("verifyChecksum", true)->setComment("Verify event CRC-32C checksum of FRDv5 or higher");
222  desc.addUntracked<bool> ("useL1EventID", false)->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
223  desc.addUntracked<bool> ("fileListMode", false)->setComment("Use fileNames parameter to directly specify raw files to open");
224  desc.addUntracked<std::vector<std::string>> ("fileNames", std::vector<std::string>())->setComment("file list used when fileListMode is enabled");
225  desc.setAllowAnything();
226  descriptions.add("source", desc);
227 }
228 
230 {
   // Starts the supervisor thread in the first visible section, then dispatches on the status
   // returned by nextEvent(): returns false once the run has ended (after creating the EoLS/EoR
   // marker files if they are missing) and true otherwise. The case labels and the condition
   // guarding the thread start are on listing lines that are not visible here.
232  {
233  //late init of directory variable
235
236  //this thread opens new files and dispatches reading to worker readers
237  //threadInit_.store(false,std::memory_order_release);
   // wait on startupCv_ after creating the thread; readSupervisor notifies it when it starts
238  std::unique_lock<std::mutex> lk(startupLock_);
239  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
241  startupCv_.wait(lk);
242  }
243  //signal hltd to start event accounting
247  switch (nextEvent() ) {
249  //maybe create EoL file in working directory before ending run
250  struct stat buf;
251  if ( currentLumiSection_ > 0) {
   // the FU-side EoLS file is created only when the BU-side one exists and the FU one does not
252  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
253  if (eolFound) {
255  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
256  if ( !found ) {
   // NOTE(review): the open() result is not checked before close() - a failed create goes unnoticed.
258  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
259  close(eol_fd);
261  }
262  }
263  }
264  //also create EoR file in FU data directory
265  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
266  if (!eorFound) {
   // NOTE(review): same unchecked open()/close() pattern as above.
267  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
268  close(eor_fd);
269  }
271  eventsThisLumi_=0;
273  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
274  return false;
275  }
277  //this is not reachable
278  return true;
279  }
281  //std::cout << "--------------NEW LUMI---------------" << std::endl;
282  return true;
283  }
284  default: {
   // an event is available: when the lumisection is not taken from the file name, open a new
   // lumisection as soon as the event header reports a higher one
285  if (!getLSFromFilename_) {
286  //get new lumi from file header
287  if (event_->lumi() > currentLumiSection_) {
289  eventsThisLumi_=0;
290  maybeOpenNewLumiSection( event_->lumi() );
291  }
292  }
   // cache run number and event number from the event header, then mark the event as cached
293  eventRunNumber_=event_->run();
294  L1EventID_ = event_->event();
295
296  setEventCached();
297
298  return true;
299  }
300  }
301 }
302 
// Creates the begin-of-lumisection (BoLS) marker file for lumiSection at the path supplied by
// daqDirector_->getBoLSFilePathOnFU().
// lumiSection: lumisection whose marker file is created.
// checkIfExists: when true the file is created only if stat() reports it missing; when false
// open() with O_CREAT is always called.
// A failure to create the file is logged and otherwise ignored (best effort).
303 void FedRawDataInputSource::createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
304 {
305  //used for backpressure mechanisms and monitoring
306  const std::string fuBoLS = daqDirector_->getBoLSFilePathOnFU(lumiSection);
307  struct stat buf;
308  if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
309  int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
   // open() returns -1 on failure: only close a valid descriptor and report the failure
   // instead of calling close(-1) silently
310  if (bol_fd >= 0) close(bol_fd); else edm::LogError("FedRawDataInputSource") << "Unable to create BoLS file -: " << fuBoLS << " (errno " << errno << ")";
311  }
312 }
313 
// Switches to lumiSection when the condition below holds (its first half is on a listing line
// that is not visible): creates the EoLS marker of the previous lumisection if missing, creates
// the BoLS marker of the new one, updates currentLumiSection_, and installs a new
// LuminosityBlockAuxiliary stamped with the current wall-clock time.
314 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection)
315 {
317  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
318
319  if ( currentLumiSection_ > 0) {
320  const std::string fuEoLS =
322  struct stat buf;
323  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
324  if ( !found ) {
   // NOTE(review): the open() result is not checked before close().
   // The BoLS file for the new lumisection is created only inside this branch, i.e. only when
   // the previous EoLS file did not exist yet - confirm that is intended.
326  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
327  close(eol_fd);
328  createBoLSFile(lumiSection,false);
330  }
331  }
332  else createBoLSFile(lumiSection,true);//needed for initial lumisection
333
334  currentLumiSection_ = lumiSection;
335
337
   // lumisection open time: microseconds computed as tv_sec * 1000000 + tv_usec
   // NOTE(review): fillFEDRawDataCollection packs its timestamp as (tv_sec << 32) + tv_usec;
   // confirm which layout edm::Timestamp expects.
338  timeval tv;
339  gettimeofday(&tv, 0);
340  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
341
342  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
344  runAuxiliary()->run(),
345  lumiSection, lsopentime,
347
348  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
349  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
350
351  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
352  }
353 }
354 
356 {
   // Calls getNextEvent() repeatedly while it reports noFile and returns the first other status.
   // The loop is abandoned when edm::shutdown_flag is set, in which case the last status
   // (noFile) is returned. The declaration of 'status' is on a listing line not visible here.
358  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
359  {
360  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
361  }
362  return status;
363 }
364 
366 {
   // Advances to the next event of the current input file, fetching a new file entry from
   // fileQueue_ when none is open, and leaves the event view in event_. Many conditions, return
   // statements and monitoring calls are on listing lines that are not visible here, so the
   // comments below describe only the visible statements.
367
369  if (!currentFile_)
370  {
371  if (!streamFileTrackerPtr_) {
375  }
376
   // no file queued yet: wait up to 100 ms on cvWakeup_ for the supervisor to provide one
379  if (!fileQueue_.try_pop(currentFile_))
380  {
381  //sleep until wakeup (only in single-buffer mode) or timeout
382  std::unique_lock<std::mutex> lkw(mWakeup_);
383  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
385  }
   // queue entries carry a status: runEnded / runAbort / newLumi markers or a real file (newFile)
386  status = currentFile_->status_;
387  if ( status == evf::EvFDaqDirector::runEnded)
388  {
390  delete currentFile_;
391  currentFile_=nullptr;
392  return status;
393  }
394  else if ( status == evf::EvFDaqDirector::runAbort)
395  {
396  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
397  }
398  else if (status == evf::EvFDaqDirector::newLumi)
399  {
401  if (getLSFromFilename_) {
404  eventsThisLumi_=0;
406  }
407  }
408  else {//let this be picked up from next event
410  }
411
   // the newLumi marker entry is consumed here and the status is handed back to the caller
412  delete currentFile_;
413  currentFile_=nullptr;
414  return status;
415  }
416  else if (status == evf::EvFDaqDirector::newFile) {
419  }
420  else
   // any other status is treated as a programming error
421  assert(0);
422  }
424
425  //file is empty
426  if (!currentFile_->fileSize_) {
428  //try to open new lumi
430  if (getLSFromFilename_)
433  eventsThisLumi_=0;
435  }
436  //immediately delete empty file
438  delete currentFile_;
439  currentFile_=nullptr;
441  }
442
443  //file is finished
446  //release last chunk (it is never released elsewhere)
449  {
   // thrown when the number of processed events disagrees with the count from the BU JSON
   // (the comparison itself is on a listing line that is not visible)
450  throw cms::Exception("FedRawDataInputSource::getNextEvent")
451  << "Fully processed " << currentFile_->nProcessed_
452  << " from the file " << currentFile_->fileName_
453  << " but according to BU JSON there should be "
454  << currentFile_->nEvents_ << " events";
455  }
456  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
457  if (singleBufferMode_) {
458  std::unique_lock<std::mutex> lkw(mWakeup_);
459  cvWakeup_.notify_one();
460  }
463  //put the file in pending delete list;
   // filesToDelete_ is guarded by fileDeleteLock_; entries pair the file index with the file object
464  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
465  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
466  }
467  else {
468  //in single-thread and stream jobs, events are already processed
470  delete currentFile_;
471  }
472  currentFile_=nullptr;
474  }
475
476
477  //file is too short
479  {
480  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
481  "Premature end of input file while reading event header";
482  }
483  if (singleBufferMode_) {
484
485  //should already be there
488  usleep(10000);
490  }
492
   // single-buffer mode always reads from chunk 0 at the current chunk position
493  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
494
495  //conditions when read amount is not sufficient for the header to fit
496  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]
498  {
500
   // first event of the job: the FRD version is taken from the first 32-bit word of the data;
   // versions above 5 are rejected and version 0 trips the assert
501  if (detectedFRDversion_==0) {
502  detectedFRDversion_=*((uint32*)dataPosition);
503  if (detectedFRDversion_>5)
504  throw cms::Exception("FedRawDataInputSource::getNextEvent")
505  << "Unknown FRD version -: " << detectedFRDversion_;
506  assert(detectedFRDversion_>=1);
507  }
508
509  //recalculate chunk position
510  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
511  if ( bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_])
512  {
513  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
514  "Premature end of input file while reading event header";
515  }
516  }
517
   // an event larger than one chunk cannot be handled and aborts with an exception
518  event_.reset( new FRDEventMsgView(dataPosition) );
519  if (event_->size()>eventChunkSize_) {
520  throw cms::Exception("FedRawDataInputSource::getNextEvent")
521  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
522  << " run:" << event_->run() << " of size:" << event_->size()
523  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
524  }
525
   // msgSize is the event size without the version-dependent header
526  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
527
529  {
530  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
531  "Premature end of input file while reading event data";
532  }
533  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
535  //recalculate chunk position
536  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
537  event_.reset( new FRDEventMsgView(dataPosition) );
538  }
   // step both positions past the whole event (header + payload)
539  currentFile_->bufferPosition_ += event_->size();
540  currentFile_->chunkPosition_ += event_->size();
541  //last chunk is released when this function is invoked next time
542
543  }
544  //multibuffer mode:
545  else
546  {
547  //wait for the current chunk to become added to the vector
550  usleep(10000);
552  }
554
555  //check if header is at the boundary of two chunks
556  chunkIsFree_ = false;
557  unsigned char *dataPosition;
558
559  //read header, copy it to a single chunk if necessary
   // advance() sets dataPosition; its return value is used below as "a chunk boundary was crossed"
560  bool chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]);
561
562  event_.reset( new FRDEventMsgView(dataPosition) );
563  if (event_->size()>eventChunkSize_) {
564  throw cms::Exception("FedRawDataInputSource::getNextEvent")
565  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
566  << " run:" << event_->run() << " of size:" << event_->size()
567  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
568  }
569
570  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
571
573  {
574  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
575  "Premature end of input file while reading event data";
576  }
577
578  if (chunkEnd) {
579  //header was at the chunk boundary, we will have to move payload as well
580  currentFile_->moveToPreviousChunk(msgSize,FRDHeaderVersionSize[detectedFRDversion_]);
581  chunkIsFree_ = true;
582  }
583  else {
584  //header was contiguous, but check if payload fits the chunk
585  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
586  //rewind to header start position
587  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
588  //copy event to a chunk start and move pointers
589  chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]+msgSize);
590  assert(chunkEnd);
591  chunkIsFree_=true;
592  //header is moved
593  event_.reset( new FRDEventMsgView(dataPosition) );
594  }
595  else {
596  //everything is in a single chunk, only move pointers forward
597  chunkEnd = currentFile_->advance(dataPosition,msgSize);
598  assert(!chunkEnd);
599  chunkIsFree_=false;
600  }
601  }
602  }//end multibuffer mode
604
   // Payload integrity check: CRC-32C for event version >= 5 when verifyChecksum_ is set,
   // otherwise Adler32 for version >= 3 when verifyAdler32_ is set. A mismatch throws.
   // Because of the else-if, a version >= 5 event with verifyChecksum_ set is never Adler-checked.
605  if (verifyChecksum_ && event_->version() >= 5)
606  {
607  uint32_t crc=0;
608  crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
609  if ( crc != event_->crc32c() ) {
611  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
612  "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
613  " but calculated 0x" << crc;
614  }
615  }
616  else if ( verifyAdler32_ && event_->version() >= 3)
617  {
   // zlib convention: adler32(0L, Z_NULL, 0) yields the initial checksum value
618  uint32_t adler = adler32(0L,Z_NULL,0);
619  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
620
621  if ( adler != event_->adler32() ) {
623  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
624  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
625  " but calculated 0x" << adler;
626  }
627  }
629
631
633 }
634 
636 {
   // Removes the file named by fileName from disk. Does nothing in file list mode. If the first
   // boost::filesystem::remove throws, the error is logged, the call sleeps 100 ms and retries
   // once; an exception from the retry is swallowed.
637  //no deletion in file list mode
638  if (fileListMode_) return;
639
640  const boost::filesystem::path filePath(fileName);
641  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
642  try {
643  //sometimes this fails but file gets deleted
644  boost::filesystem::remove(filePath);
645  }
646  catch (const boost::filesystem::filesystem_error& ex)
647  {
648  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
649  << ". Trying again.";
650  usleep(100000);
651  try {
652  boost::filesystem::remove(filePath);
653  }
654  catch (const boost::filesystem::filesystem_error&) {/*file gets deleted first time but exception is still thrown*/}
655  }
   // same retry for any other std::exception; the retry only swallows std::exception-derived errors
656  catch (std::exception& ex)
657  {
658  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
659  << ". Trying again.";
660  usleep(100000);
661  try {
662  boost::filesystem::remove(filePath);
663  } catch (std::exception&) {/*file gets deleted first time but exception is still thrown*/}
664  }
665 }
666 
667 
669 {
   // Builds the FEDRawDataCollection for the current event, creates the event in eventPrincipal
   // via makeEvent() and stores the collection, then periodically deletes finished input files.
   // The EventAuxiliary constructions are cut by listing lines that are not visible here.
671  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
672  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
673
   // three branches: useL1EventID_ set, no TCDS FED found (tcds_pointer_ == 0), or a TCDS
   // record built from tcds_pointer_
674  if (useL1EventID_){
676  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
679  makeEvent(eventPrincipal, aux);
680  }
681  else if(tcds_pointer_==0){
684  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
687  makeEvent(eventPrincipal, aux);
688  }
689  else{
690  evf::evtn::TCDSRecord record((unsigned char *)(tcds_pointer_));
693  processGUID());
695  makeEvent(eventPrincipal, aux);
696  }
697
698
699
   // ownership of rawData moves into the wrapper that is put into the event principal
700  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
701
702  //FWCore/Sources DaqProvenanceHelper before 7_1_0_pre3
703  //eventPrincipal.put(daqProvenanceHelper_.constBranchDescription_, edp,
704  // daqProvenanceHelper_.dummyProvenance_);
705
706  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
708
709  eventsThisLumi_++;
711
712  //this old file check runs no more often than every 10 events
   // NOTE(review): the actual period is checkEvery_, whose value is not visible here.
713  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
714  //delete files that are not in processing
   // a queued file is kept while any of the nStreams_ entries of streamFileTrackerPtr_ still
   // equals its index; otherwise it is removed from disk, freed and erased from the list
715  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
716  auto it = filesToDelete_.begin();
717  while (it!=filesToDelete_.end()) {
718  bool fileIsBeingProcessed = false;
719  for (unsigned int i=0;i<nStreams_;i++) {
720  if (it->first == streamFileTrackerPtr_->at(i)) {
721  fileIsBeingProcessed = true;
722  break;
723  }
724  }
725  if (!fileIsBeingProcessed) {
726  deleteFile(it->second->fileName_);
727  delete it->second;
728  it = filesToDelete_.erase(it);
729  }
730  else it++;
731  }
732
733  }
735  chunkIsFree_=false;
737  return;
738 }
739 
741 {
   // Splits the payload of event_ into per-FED fragments and copies each one into rawData.
   // Returns the event timestamp: the current wall-clock time by default, replaced by the GPS
   // time read from the GTP FED when that FED is present. Also sets GTPEventID_ and
   // tcds_pointer_ as side effects.
742  edm::TimeValue_t time;
743  timeval stv;
744  gettimeofday(&stv,0);
   // default timestamp: seconds in the upper 32 bits, microseconds added to the lower part
745  time = stv.tv_sec;
746  time = (time << 32) + stv.tv_usec;
747  edm::Timestamp tstamp(time);
748
749  uint32_t eventSize = event_->eventSize();
750  char* event = (char*)event_->payload();
751  GTPEventID_=0;
752  tcds_pointer_ = 0;
   // Walk the payload from its end towards its start: eventSize is the offset of the end of the
   // not-yet-consumed part. Each step reads the trailer at the end, takes the fragment size from
   // it, and steps back to the fragment header.
   // NOTE(review): the size checks are assert()s and vanish in NDEBUG builds; a trailer size
   // smaller than sizeof(fedt_t) would make 'fedSize - sizeof(fedt_t)' wrap around - confirm
   // that the input is trusted.
753  while (eventSize > 0) {
754  assert(eventSize>=sizeof(fedt_t));
755  eventSize -= sizeof(fedt_t);
756  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
757  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
758  assert(eventSize>=fedSize - sizeof(fedt_t));
759  eventSize -= (fedSize - sizeof(fedt_t));
760  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
761  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
762  if(fedId>FEDNumbering::MAXFEDID)
763  {
764  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
765  }
   // remember where the TCDS fragment starts; it points into the event payload, not into rawData
766  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
767  tcds_pointer_ = (unsigned char *)(event + eventSize );
768  }
769  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
770  if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
771  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
772  else
773  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
774  //evf::evtn::evm_board_setformat(fedSize);
   // the GPS time from this FED replaces the wall-clock timestamp: high word << 32 plus low word
775  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
776  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
777  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
778  }
779  //take event ID from GTPE FED
   // used only when no event ID has been set yet (GTPEventID_ is still 0)
780  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
781  if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
782  GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
783  }
784  }
   // copy the whole fragment (header through trailer, fedSize bytes) into the slot for this FED
785  FEDRawData& fedData = rawData.FEDData(fedId);
786  fedData.resize(fedSize);
787  memcpy(fedData.data(), event + eventSize, fedSize);
788  }
789  assert(eventSize == 0);
790
791  return tstamp;
792 }
793 
795 {
797  try {
798  // assemble json destination path
800 
801  //TODO:should be ported to use fffnaming
802  std::ostringstream fileNameWithPID;
803  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
804  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
805  jsonDestPath /= fileNameWithPID.str();
806 
807  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
808  << jsonDestPath;
809  try {
810  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
811  }
812  catch (const boost::filesystem::filesystem_error& ex)
813  {
814  // Input dir gone?
815  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
816  // << " Maybe the file is not yet visible by FU. Trying again in one second";
817  sleep(1);
818  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
819  }
821 
822  try {
823  //sometimes this fails but file gets deleted
824  boost::filesystem::remove(jsonSourcePath);
825  }
826  catch (const boost::filesystem::filesystem_error& ex)
827  {
828  // Input dir gone?
829  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
830  }
831  catch (std::exception& ex)
832  {
833  // Input dir gone?
834  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
835  }
836 
837  boost::filesystem::ifstream ij(jsonDestPath);
838  Json::Value deserializeRoot;
840 
841  std::stringstream ss;
842  ss << ij.rdbuf();
843  if (!reader.parse(ss.str(), deserializeRoot)) {
844  edm::LogError("FedRawDataInputSource") << "Failed to deserialize JSON file -: " << jsonDestPath
845  << "\nERROR:\n" << reader.getFormatedErrorMessages()
846  << "CONTENT:\n" << ss.str()<<".";
847  throw std::runtime_error("Cannot deserialize input JSON file");
848  }
849 
850  //read BU JSON
852  DataPoint dp;
853  dp.deserialize(deserializeRoot);
854  bool success = false;
855  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
856  if (dpd_->getNames().at(i)=="NEvents")
857  if (i<dp.getData().size()) {
858  data = dp.getData()[i];
859  success=true;
860  }
861  }
862  if (!success) {
863  if (dp.getData().size())
864  data = dp.getData()[0];
865  else
866  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
867  " error reading number of events from BU JSON -: No input value " << data;
868  }
869  return boost::lexical_cast<int>(data);
870 
871  }
872  catch (const boost::filesystem::filesystem_error& ex)
873  {
874  // Input dir gone?
876  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
877  }
878  catch (std::runtime_error e)
879  {
880  // Another process grabbed the file and NFS did not register this
882  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
883  }
884 
885  catch( boost::bad_lexical_cast const& ) {
886  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
887  << "Input value is -: " << data;
888  }
889 
890  catch (std::exception e)
891  {
892  // BU run directory disappeared?
894  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
895  }
896 
897  return -1;
898 }
899 
901 {}
902 
// Post-fork hook. The only visible statement resets the source through InputSource::rewind();
// the shared_ptr argument is unnamed and unused in the visible part. Further statements are on
// listing lines that are not visible here.
903 void FedRawDataInputSource::postForkReacquireResources(std::shared_ptr<edm::multicore::MessageReceiverForSource>)
904 {
905  InputSource::rewind();
909 }
910 
912 {}
913 
914 
916 {
917  bool stop=false;
918  unsigned int currentLumiSection = 0;
919  //threadInit_.exchange(true,std::memory_order_acquire);
920 
921  {
922  std::unique_lock<std::mutex> lk(startupLock_);
923  startupCv_.notify_one();
924  }
925 
926  uint32_t ls=0;
927  uint32_t monLS=1;
928  uint32_t lockCount=0;
929  uint64_t sumLockWaitTimeUs=0.;
930 
931  while (!stop) {
932 
933  //wait for at least one free thread and chunk
934  int counter=0;
936  {
937  //report state to monitoring
938  if (fms_) {
939  bool copy_active=false;
940  for (auto j : tid_active_) if (j) copy_active=true;
942  else if (freeChunks_.empty()) {
943  if (copy_active)
945  else
947  }
948  else {
949  if (copy_active)
951  else
953  }
954  }
955  std::unique_lock<std::mutex> lkw(mWakeup_);
956  //sleep until woken up by condition or a timeout
957  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
958  counter++;
959  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
960  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
961  }
962  else {
963  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
964  }
965  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
966  }
967 
968  if (stop) break;
969 
970  //look for a new file
971  std::string nextFile;
972  uint32_t fileSize;
973 
974  if (fms_) {
978  }
979 
981 
982  while (status == evf::EvFDaqDirector::noFile) {
983  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
984  stop=true;
985  break;
986  }
987 
988  uint64_t thisLockWaitTimeUs=0.;
989  if (fileListMode_) {
990  //return LS if LS not set, otherwise return file
991  status = getFile(ls,nextFile,fileSize,thisLockWaitTimeUs);
992  }
993  else
994  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
995 
997 
998  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
999 
1000  //monitoring of lock wait time
1001  if (thisLockWaitTimeUs>0.)
1002  sumLockWaitTimeUs+=thisLockWaitTimeUs;
1003  lockCount++;
1004  if (ls>monLS) {
1005  monLS=ls;
1006  if (lockCount)
1007  if (fms_) fms_->reportLockWait(monLS,sumLockWaitTimeUs,lockCount);
1008  lockCount=0;
1009  sumLockWaitTimeUs=0;
1010  }
1011 
1012  //check again for any remaining index/EoLS files after EoR file is seen
1013  if ( status == evf::EvFDaqDirector::runEnded && !fileListMode_) {
1015  usleep(100000);
1016  //now all files should have appeared in ramdisk, check again if any raw files were left behind
1017  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
1018  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
1019  }
1020 
1021  if ( status == evf::EvFDaqDirector::runEnded) {
1023  stop=true;
1024  break;
1025  }
1026 
1027  //queue new lumisection
1028  if( getLSFromFilename_ && ls > currentLumiSection) {
1029  //fms_->setInStateSup(evf::FastMonitoringThread::inSupNewLumi);
1030  currentLumiSection = ls;
1031  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
1032  }
1033 
1034  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
1035  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
1037  stop=true;
1038  break;
1039  }
1040 
1041  int dbgcount=0;
1042  if (status == evf::EvFDaqDirector::noFile) {
1044  dbgcount++;
1045  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1046  usleep(100000);
1047  }
1048  }
1049  if ( status == evf::EvFDaqDirector::newFile ) {
1051  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1052 
1053 
1054  boost::filesystem::path rawFilePath(nextFile);
1055  std::string rawFile = rawFilePath.replace_extension(".raw").string();
1056 
1057  struct stat st;
1058  int stat_res = stat(rawFile.c_str(),&st);
1059  if (stat_res==-1) {
1060  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-"<< rawFile << std::endl;
1061  setExceptionState_=true;
1062  stop=true;
1063  break;
1064  }
1065  fileSize=st.st_size;
1066 
1067  if (fms_) {
1071  }
1072  int eventsInNewFile;
1073  if (fileListMode_) {
1074  if (fileSize==0) eventsInNewFile=0;
1075  else eventsInNewFile=-1;
1076  }
1077  else {
1078  eventsInNewFile = grabNextJsonFile(nextFile);
1079  assert( eventsInNewFile>=0 );
1080  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
1081  }
1082  //int eventsInNewFile = fileListMode_ ? -1 : grabNextJsonFile(nextFile);
1083  //if (fileListMode_ && fileSize==0) {eventsInNewFile=0;}
1084  //if (!fileListMode_) {
1085  // assert( eventsInNewFile>=0 );
1086  // assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
1087  //}
1088 
1089  if (!singleBufferMode_) {
1090  //calculate number of needed chunks
1091  unsigned int neededChunks = fileSize/eventChunkSize_;
1092  if (fileSize%eventChunkSize_) neededChunks++;
1093 
1094  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
1096  fileQueue_.push(newInputFile);
1097 
1098  for (unsigned int i=0;i<neededChunks;i++) {
1099 
1100  if (fms_) {
1101  bool copy_active=false;
1102  for (auto j : tid_active_) if (j) copy_active=true;
1103  if (copy_active)
1105  else
1107  }
1108  //get thread
1109  unsigned int newTid = 0xffffffff;
1110  while (!workerPool_.try_pop(newTid)) {
1111  usleep(100000);
1112  }
1113 
1114  if (fms_) {
1115  bool copy_active=false;
1116  for (auto j : tid_active_) if (j) copy_active=true;
1117  if (copy_active)
1119  else
1121  }
1122  InputChunk * newChunk = nullptr;
1123  while (!freeChunks_.try_pop(newChunk)) {
1124  usleep(100000);
1125  if (quit_threads_.load(std::memory_order_relaxed)) break;
1126  }
1127 
1128  if (newChunk == nullptr) {
1129  //return unused tid if we received shutdown (nullptr chunk)
1130  if (newTid!=0xffffffff) workerPool_.push(newTid);
1131  stop = true;
1132  break;
1133  }
1135 
1136  std::unique_lock<std::mutex> lk(mReader_);
1137 
1138  unsigned int toRead = eventChunkSize_;
1139  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1140  newChunk->reset(i*eventChunkSize_,toRead,i);
1141 
1142  workerJob_[newTid].first=newInputFile;
1143  workerJob_[newTid].second=newChunk;
1144 
1145  //wake up the worker thread
1146  cvReader_[newTid]->notify_one();
1147  }
1148  }
1149  else {
1150  if (!eventsInNewFile) {
1151  //still queue file for lumi update
1152  std::unique_lock<std::mutex> lkw(mWakeup_);
1153  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
1155  fileQueue_.push(newInputFile);
1156  cvWakeup_.notify_one();
1157  return;
1158  }
1159  //in single-buffer mode put single chunk in the file and let the main thread read the file
1160  InputChunk * newChunk;
1161  //should be available immediately
1162  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
1163 
1164  std::unique_lock<std::mutex> lkw(mWakeup_);
1165 
1166  unsigned int toRead = eventChunkSize_;
1167  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1168  newChunk->reset(0,toRead,0);
1169  newChunk->readComplete_=true;
1170 
1171  //push file and wakeup main thread
1172  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1173  newInputFile->chunks_[0]=newChunk;
1175  fileQueue_.push(newInputFile);
1176  cvWakeup_.notify_one();
1177  }
1178  }
1179  }
1181  //make sure threads finish reading
1182  unsigned numFinishedThreads = 0;
1183  while (numFinishedThreads < workerThreads_.size()) {
1184  unsigned int tid;
1185  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1186  std::unique_lock<std::mutex> lk(mReader_);
1187  thread_quit_signal[tid]=true;
1188  cvReader_[tid]->notify_one();
1189  numFinishedThreads++;
1190  }
1191  for (unsigned int i=0;i<workerThreads_.size();i++) {
1192  workerThreads_[i]->join();
1193  delete workerThreads_[i];
1194  }
1195 }
1196 
1198 {
 // Body of the reader worker thread (the error text below names it
 // "readWorker"; the signature line is absent from this listing).
 // Each loop pass: clear the job slot, push tid into workerPool_, sleep on
 // cvReader_[tid], then read the assigned chunk from disk in blocks of
 // eventChunkBlock_ bytes and attach it to the owning InputFile.
1199  bool init = true;
1200  threadInit_.exchange(true,std::memory_order_acquire);
1201 
1202  while (1) {
1203 
1204  tid_active_[tid]=false;
1205  std::unique_lock<std::mutex> lk(mReader_);
1206  workerJob_[tid].first=nullptr;
 // NOTE(review): the next line repeats the reset of .first; .second is never
 // cleared here. Presumably .second was intended -- confirm.
1207  workerJob_[tid].first=nullptr;
1208 
1209  assert(!thread_quit_signal[tid]);//should never get it here
 // announce this worker as available by publishing its id
1210  workerPool_.push(tid);
1211 
1212  if (init) {
 // first pass only: signal startupCv_ under startupLock_
 // (this inner lk shadows the outer mReader_ lock of the same name)
1213  std::unique_lock<std::mutex> lk(startupLock_);
1214  init = false;
1215  startupCv_.notify_one();
1216  }
 // NOTE(review): wait() is called without a predicate, so a spurious wakeup
 // would reach the workerJob_ assert below with no job set -- confirm intent.
1217  cvReader_[tid]->wait(lk);
1218 
1219  if (thread_quit_signal[tid]) return;
1220  tid_active_[tid]=true;
1221 
1222  InputFile * file;
1223  InputChunk * chunk;
1224 
1225  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1226 
1227  file = workerJob_[tid].first;
1228  chunk = workerJob_[tid].second;
1229 
 // NOTE(review): lseek() runs before the descriptor is validated, and only the
 // descriptor is tested below; a failed seek on a valid fd (pos == -1) is not
 // detected even though the error text mentions it.
1230  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1231  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1232 
1233 
1234  if (fileDescriptor>=0)
1235  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1236  else
1237  {
1238  edm::LogError("FedRawDataInputSource") <<
1239  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1240  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
 // flag the failure and end this worker thread
1241  setExceptionState_=true;
1242  return;
1243 
1244  }
1245 
 // despite its name, bufferLeft counts the bytes read into chunk->buf_ so far
1246  unsigned int bufferLeft = 0;
 // NOTE(review): original line 1247 is absent from this listing; `start`
 // used further down is not declared in the visible lines.
1248  for (unsigned int i=0;i<readBlocks_;i++)
1249  {
1250  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1251  if ( last > 0 )
1252  bufferLeft+=last;
 // a short (or failed) read ends the loop; the total is checked against usedSize_
1253  if (last < eventChunkBlock_) {
1254  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1255  break;
1256  }
1257  }
 // NOTE(review): original line 1258 is absent from this listing; `end` is not
 // declared in the visible lines.
1259  auto diff = end-start;
1260  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
 // MB divided by ms is reported as GB/s; a 0 ms duration divides by 0.0
1261  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1262  close(fileDescriptor);
1263 
 // the first 32-bit word of the chunk at file offset 0 is taken as the FRD version
1264  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
 // NOTE(review): original line 1265 is absent from this listing.
1266  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1267  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1268 
1269  }
1270 }
1271 
1273 {
 // Body labelled "threadError" by its exception category (the signature line
 // is absent from this listing): raises quit_threads_ and then always throws
 // a cms::Exception, so it never returns normally.
1274  quit_threads_=true;
1275  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1276 
1277 }
1278 
1279 
// Points dataPosition at `size` contiguous bytes starting at the current read
// position of this file.
// Returns true when the bytes had to be assembled from the current and the
// next chunk (currentChunk_ is incremented in that case), false when the
// current chunk already held all of them.
// NOTE(review): original lines 1285, 1296, 1304, 1310 and 1311 are absent from
// this listing, so whatever they do is not visible here.
1280 inline bool InputFile::advance(unsigned char* & dataPosition, const size_t size)
1281 {
1282  //wait for chunk
 // polls every 100 ms until waitForChunk() reports the current chunk
1283  while (!waitForChunk(currentChunk_)) {
1284  usleep(100000);
1286  }
1287 
1288  dataPosition = chunks_[currentChunk_]->buf_+ chunkPosition_;
 // bytes of the current chunk that have not been consumed yet
1289  size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1290 
1291  if (currentLeft < size) {
1292 
1293  //we need next chunk
1294  while (!waitForChunk(currentChunk_+1)) {
1295  usleep(100000);
1297  }
1298  //copy everything to beginning of the first chunk
1299  dataPosition-=chunkPosition_;
1300  assert(dataPosition==chunks_[currentChunk_]->buf_);
 // slide the unread tail to the front of the current buffer, then append the
 // missing (size - currentLeft) bytes from the start of the next chunk
1301  memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_+chunkPosition_, currentLeft);
1302  memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_+1]->buf_, size - currentLeft);
1303  //set pointers at the end of the old data position
 // the read position now sits inside the next chunk, just past the borrowed bytes
1305  chunkPosition_=size-currentLeft;
1306  currentChunk_++;
1307  return true;
1308  }
1309  else {
1312  return false;
1313  }
1314 }
1315 
// Copies `size` bytes, starting at chunkPosition_ in the current chunk, into
// the previous chunk's buffer at byte `offset`.
// NOTE(review): original lines 1319, 1322 and 1323 are absent from this
// listing, so any position updates done there are not visible here.
1316 inline void InputFile::moveToPreviousChunk(const size_t size, const size_t offset)
1317 {
1318  //this will fail in case of events that are too large
 // NOTE(review): size and offset are size_t, so size - offset wraps around
 // when offset > size and the assert then fires -- confirm that is intended.
1320  assert(size - offset < chunks_[currentChunk_]->size_);
1321  memcpy(chunks_[currentChunk_-1]->buf_+offset,chunks_[currentChunk_]->buf_+chunkPosition_,size);
1324 }
1325 
// NOTE(review): the body of this function (original lines 1327-1328) is absent
// from this listing; only the signature and the closing brace are visible.
1326 inline void InputFile::rewindChunk(const size_t size) {
1329 }
1330 
1331 //single-buffer mode file reading
1333 {
 // Body labelled "readNextChunkIntoBuffer" by its exception category (the
 // signature line is absent from this listing). It opens the file on first
 // use, refills file->chunks_[0]->buf_ from fileDescriptor_, and closes the
 // descriptor once bufferInputRead_ equals file->fileSize_.
 // NOTE(review): original lines 1353, 1366 and 1371 are absent from this
 // listing; no update of bufferInputRead_ after a read is visible here.
1334 
1335  if (fileDescriptor_<0) {
1336  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1337  bufferInputRead_ = 0;
1338  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1339  if (fileDescriptor_>=0)
1340  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1341  else
1342  {
1343  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1344  << file->fileName_ << " fd:" << fileDescriptor_;
1345  }
1346  }
1347 
 // chunkPosition_ == 0: nothing to carry over, fill the buffer from its start
1348  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1349  uint32_t existingSize = 0;
1350  for (unsigned int i=0;i<readBlocks_;i++)
1351  {
1352  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
 // NOTE(review): in the visible lines `last` is added unchecked; a failed
 // read (-1) would move existingSize backwards.
1354  existingSize+=last;
1355  }
1356  }
1357  else {
 // chunkPosition_ bytes were consumed: keep the unread tail, then read that
 // many new bytes as whole blocks plus one remainder read
1358  const uint32_t chunksize = file->chunkPosition_;
1359  const uint32_t blockcount=chunksize/eventChunkBlock_;
1360  const uint32_t leftsize = chunksize%eventChunkBlock_;
1361  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1362  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1363 
1364  for (uint32_t i=0;i<blockcount;i++) {
1365  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1367  existingSize+=last;
1368  }
1369  if (leftsize) {
1370  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1372  }
1373  file->chunkPosition_=0;//data was moved to beginning of the chunk
1374  }
1375  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1376  if (fileDescriptor_!=-1)
1377  {
1378  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1379  close(fileDescriptor_);
1380  fileDescriptor_=-1;
1381  }
1382  }
1383 }
1384 
1385 
1387 {
 // Adds `events` to the per-lumisection counter in sourceEventsReport_,
 // creating the entry for `lumi` when it does not exist yet.
 // (The signature line is absent from this listing.)
1388 
 // monlock_ is held for the whole lookup-and-update
1389  std::lock_guard<std::mutex> lock(monlock_);
1390  auto itr = sourceEventsReport_.find(lumi);
1391  if (itr!=sourceEventsReport_.end())
1392  itr->second+=events;
1393  else
1394  sourceEventsReport_[lumi]=events;
1395 }
1396 
1397 std::pair<bool,unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase)
1398 {
1399  std::lock_guard<std::mutex> lock(monlock_);
1400  auto itr = sourceEventsReport_.find(lumi);
1401  if (itr!=sourceEventsReport_.end()) {
1402  auto && ret = std::pair<bool,unsigned int>(true,itr->second);
1403  if (erase)
1404  sourceEventsReport_.erase(itr);
1405  return ret;
1406  }
1407  else
1408  return std::pair<bool,unsigned int>(false,0);
1409 }
1410 
1412 {
 // Sorts fileNames_ and tries to read a run number from the first entry.
 // Returns the parsed number, or -1 when the list is empty, the stem does not
 // start with "run", or the digits cannot be converted.
 // (The signature line is absent from this listing.)
 // The comparator takes both strings by value, strips everything before the
 // last '/' (the slash itself is kept), and orders ascending (b > a).
1413  std::sort(fileNames_.begin(),fileNames_.end(),
1414  [](std::string a, std::string b) {
1415  if (a.rfind("/")!=std::string::npos) a=a.substr(a.rfind("/"));
1416  if (b.rfind("/")!=std::string::npos) b=b.substr(b.rfind("/"));
1417  return b > a;});
1418 
1419  if (fileNames_.size()) {
1420  //get run number from first file in the vector
 // NOTE(review): original line 1421 is absent from this listing; `fileName`
 // used below is not declared in the visible lines.
1422  std::string fileStem = fileName.stem().string();
 // when the stem has no '_', end is npos and substr() below takes the rest
1423  auto end = fileStem.find("_");
1424  if (fileStem.find("run")==0) {
 // characters between the "run" prefix and the first '_'
1425  std::string runStr = fileStem.substr(3,end-3);
1426  try {
1427  //get long to support test run numbers < 2^32
1428  long rval = boost::lexical_cast<long>(runStr);
1429  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: "<< rval;
1430  return rval;
1431  }
1432  catch( boost::bad_lexical_cast const& ) {
 // conversion failure is only logged; control falls through to return -1
1433  edm::LogWarning("FedRawDataInputSource") << "Unable to autodetect run number in fileListMode from file -: "<< fileName;
1434  }
1435  }
1436  }
1437  return -1;
1438 }
1439 
// Hands out the next entry of fileNames_: stores its path (without a leading
// "file://" or "file:") in nextFile and the lumisection number parsed from the
// file stem in ls, then advances fileListIndex_.
// fsize and lockWaitTime are not written in the visible lines (their
// assignments are commented out).
// NOTE(review): original lines 1454 and 1457 are absent from this listing, so
// the value returned by each branch is not visible here.
1440 evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile(unsigned int& ls, std::string& nextFile, uint32_t& fsize, uint64_t& lockWaitTime)
1441 {
1442  if (fileListIndex_ < fileNames_.size()) {
1443  nextFile = fileNames_[fileListIndex_];
1444  if (nextFile.find("file://")==0) nextFile=nextFile.substr(7);
1445  else if (nextFile.find("file:")==0) nextFile=nextFile.substr(5);
1446  boost::filesystem::path fileName = nextFile;
1447  std::string fileStem = fileName.stem().string();
 // NOTE(review): find() yields npos (non-zero, hence true) when the text is
 // missing and 0 (false) when it is at the very start, so these two tests mean
 // "not at position 0" rather than "found". With no "ls" in the stem,
 // npos+2 wraps to 1 and the first character is dropped -- confirm intent.
1448  if (fileStem.find("ls")) fileStem = fileStem.substr(fileStem.find("ls")+2);
1449  if (fileStem.find("_")) fileStem = fileStem.substr(0,fileStem.find("_"));
 // lexical_cast throws boost::bad_lexical_cast on non-numeric text; it is not
 // caught in the visible lines
1450  ls = boost::lexical_cast<unsigned int>(fileStem);
1451  //fsize = 0;
1452  //lockWaitTime = 0;
1453  fileListIndex_++;
1455  }
1456  else
1458 }
#define LogDebug(id)
static const char runNumber_[]
void setComment(std::string const &value)
std::vector< std::string > & getData()
Definition: DataPoint.h:58
unsigned int lumi_
int i
Definition: DBlmapReader.cc:9
std::vector< std::string > fileNames_
tuple ret
prodAgent to be discontinued
unsigned int getgpshigh(const unsigned char *)
uint32_t chunkPosition_
std::condition_variable cvWakeup_
tuple start
Check for commandline option errors.
Definition: dqm_diff.py:58
virtual void read(edm::EventPrincipal &eventPrincipal) override
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:350
bool gtpe_board_sense(const unsigned char *p)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
virtual void rewind_() override
std::vector< int > * getStreamFileTracker()
bool crc32c_hw_test()
Definition: crc32c.cc:354
unsigned int offset_
jsoncollector::DataPointDefinition * dpd_
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
tbb::concurrent_queue< unsigned int > workerPool_
JetCorrectorParameters::Record record
Definition: classes.h:7
void setAllowAnything()
allow any parameter label/value pairs
unsigned int get(const unsigned char *, bool)
void maybeOpenNewLumiSection(const uint32_t lumiSection)
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
tuple lumi
Definition: fjr2json.py:35
void setInState(FastMonitoringThread::InputState inputState)
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
void rewindChunk(const size_t size)
const uint32 FRDHeaderVersionSize[6]
assert(m_qm.get())
std::vector< int > * streamFileTrackerPtr_
int init
Definition: HydjetWrapper.h:67
def ls
Definition: eostools.py:348
evf::EvFDaqDirector::FileStatus status_
void setInStateSup(FastMonitoringThread::InputState inputState)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:590
std::atomic< bool > quit_threads_
volatile std::atomic< bool > shutdown_flag
std::vector< ReaderInfo > workerJob_
void createProcessingNotificationMaybe() const
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
std::unique_ptr< std::thread > readSupervisorThread_
ProductProvenance const & dummyProvenance() const
Represents a JSON value.
Definition: value.h:111
std::string getEoLSFilePathOnBU(const unsigned int ls) const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
#define nullptr
int timeout
Definition: mps_check.py:51
std::vector< std::condition_variable * > cvReader_
list diff
Definition: mps_update.py:85
FedRawDataInputSource * parent_
unsigned int sourceid
Definition: fed_header.h:32
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:217
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
U second(std::pair< T, U > const &p)
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
static Timestamp beginOfTime()
Definition: Timestamp.h:103
void setComment(std::string const &value)
uint32_t bufferPosition_
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
void updateFileIndex(int const &fileIndex)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void resize(size_t newsize)
Definition: FEDRawData.cc:32
bool advance(unsigned char *&dataPosition, const size_t size)
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:357
void moveToPreviousChunk(const size_t size, const size_t offset)
unsigned char * buf_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
struct fedt_struct fedt_t
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
int grabNextJsonFile(boost::filesystem::path const &)
def move
Definition: eostools.py:510
def load
Definition: svgfig.py:546
BranchDescription const & branchDescription() const
int j
Definition: DBlmapReader.cc:9
edm::ProcessHistoryID processHistoryID_
#define end
Definition: vmac.h:37
virtual void preForkReleaseResources() override
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
std::map< unsigned int, unsigned int > sourceEventsReport_
bool waitForChunk(unsigned int chunkid)
const edm::DaqProvenanceHelper daqProvenanceHelper_
unsigned int fileIndex_
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
unsigned long long TimeValue_t
Definition: Timestamp.h:28
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:56
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned int uint32
Definition: MsgTools.h:13
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:382
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
void setInputSource(FedRawDataInputSource *inputSource)
std::atomic< bool > readComplete_
virtual bool checkNextEvent() override
unsigned long long int rval
Definition: vlib.h:22
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:596
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned long long uint64_t
Definition: Time.h:15
unsigned int currentChunk_
auto dp
Definition: deltaR.h:22
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
double b
Definition: hdecay.h:120
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void setProcessHistoryID(ProcessHistoryID const &phid)
std::condition_variable startupCv_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:262
void stoppedLookingForFile(unsigned int lumi)
tbb::concurrent_vector< InputChunk * > chunks_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:365
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:351
unsigned int eventsize
Definition: fed_trailer.h:33
std::atomic< bool > threadInit_
std::pair< InputFile *, InputChunk * > ReaderInfo
std::string getBoLSFilePathOnFU(const unsigned int ls) const
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:353
tuple events
Definition: patZpeak.py:19
void readWorker(unsigned int tid)
double a
Definition: hdecay.h:121
static std::atomic< unsigned int > counter
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< unsigned int > thread_quit_signal
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
bool emptyLumisectionMode() const
virtual void postForkReacquireResources(std::shared_ptr< edm::multicore::MessageReceiverForSource >) override
friend class InputFile
Open Root file and provide MEs ############.
size_(0)
Definition: OwnArray.h:181
unsigned int gtpe_get(const unsigned char *)
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
volatile std::atomic< bool > shutdown_flag false
tbb::concurrent_queue< InputChunk * > freeChunks_
evf::EvFDaqDirector::FileStatus getNextEvent()
evf::EvFDaqDirector::FileStatus nextEvent()
void readNextChunkIntoBuffer(InputFile *file)
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:265
void setFMS(evf::FastMonitoringService *fms)
unsigned int getgpslow(const unsigned char *)
std::string getFormatedErrorMessages() const
Returns a user friendly string that list errors in the parsed document.
tuple size
Write out results.
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
std::string getEoRFilePathOnFU() const
tuple status
Definition: mps_update.py:57