CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FedRawDataInputSource.cc
Go to the documentation of this file.
1 #include <fcntl.h>
2 #include <iomanip>
3 #include <iostream>
4 #include <memory>
5 #include <sstream>
6 #include <sys/types.h>
7 #include <sys/file.h>
8 #include <sys/time.h>
9 #include <unistd.h>
10 #include <vector>
11 #include <fstream>
12 #include <zlib.h>
13 #include <stdio.h>
14 #include <chrono>
15 
16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
18 
19 
22 
29 
33 
35 
39 
41 
46 
47 //JSON file reader
49 
50 #include <boost/lexical_cast.hpp>
51 
52 using namespace jsoncollector;
53 
55  edm::InputSourceDescription const& desc) :
56  edm::RawInputSource(pset, desc),
    // NOTE(review): std::string(getenv(...)) is undefined behaviour when the variable is
    // unset (getenv returns a null pointer). The value read here is also overwritten
    // unconditionally further down in the body, so "buDefPath" currently has no effect.
57  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
    // both sizes are configured in units of 1048576 bytes and stored in bytes
58  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",32)*1048576),
59  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",32)*1048576),
60  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",2)),
61  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> ("maxBufferedFiles",2)),
62  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
63  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
64  verifyChecksum_(pset.getUntrackedParameter<bool> ("verifyChecksum", true)),
65  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
66  runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
67  fuOutputDir_(edm::Service<evf::EvFDaqDirector>()->baseRunDir()),
68  daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
69  eventID_(),
70  processHistoryID_(),
71  currentLumiSection_(0),
72  tcds_pointer_(0),
73  eventsThisLumi_(0),
74  dpd_(nullptr)
75 {
    // Constructor body. Several lines of it are absent from this listing (they were
    // dropped during extraction), so the notes below only describe what is visible:
    // logging of the chunk size, resolution of the JSON definition file, loading of the
    // data point definition, sanity checks, and start-up of the reader worker threads.
76  char thishost[256];
    // NOTE(review): the return value is not checked and thishost is not initialised; if the
    // call fails or truncates, the buffer may not be NUL-terminated when it is logged below.
77  gethostname(thishost, 255);
78  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
79  << std::endl << (eventChunkSize_/1048576)
80  << " MB on host " << thishost;
81 
83  setNewRun();
86 
    // Locate budef.jsd: first under $CMSSW_BASE, then under $CMSSW_RELEASE_BASE, and
    // finally fall back to the bare relative path if neither file can be stat()ed.
    // NOTE(review): both getenv() results are used without a null check.
87  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
88  defPath_ = std::string(getenv("CMSSW_BASE")) + "/" + defPathSuffix;
89  struct stat statbuf;
90  if (stat(defPath_.c_str(), &statbuf)) {
91  defPath_ = std::string(getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
92  if (stat(defPath_.c_str(), &statbuf)) {
93  defPath_ = defPathSuffix;
94  }
95  }
96 
    // Load the definition labelled "data" from the file found above.
    // NOTE(review): dpd_ is a raw owning pointer; no matching delete is visible in the
    // destructor lines present in this listing -- confirm against the real source.
97  dpd_ = new DataPointDefinition();
98  std::string defLabel = "data";
99  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
100 
101  //make sure that chunk size is N * block size
    // (the statements implementing the check above are missing from this listing)
106 
107  if (!numBuffers_)
108  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
109  "no reading enabled with numBuffers parameter 0";
110 
114 
    // a failed hardware crc32c self-test is only logged; construction continues
115  if (!crc32c_hw_test())
116  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
117 
118  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
    // (the assignments of fms_ and daqDirector_ are on lines missing from this listing)
120  if (!fms_) {
121  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
122  }
123 
    // NOTE(review): the exception below is constructed but never thrown -- the `throw`
    // keyword is missing, unlike the fms_ check above. With a null daqDirector_ execution
    // continues and the pointer is dereferenced a few lines further down.
125  if (!daqDirector_)
126  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
127 
128  //set DaqDirector to delete files in preGlobalEndLumi callback
    // the `if (fms_)` test is always true here because a null fms_ already threw above
130  if (fms_) daqDirector_->setFMS(fms_);
131 
132  fms_->setInputSource(this);
133  //should delete chunks when run stops
    // (the body of this loop is missing from this listing)
134  for (unsigned int i=0;i<numBuffers_;i++) {
136  }
137 
138  quit_threads_ = false;
139 
    // Start the reader workers one at a time. For each worker the per-thread bookkeeping
    // entries are appended first, then the thread is created and this thread blocks on
    // startupCv_ until it is notified.
    // NOTE(review): wait() is called without a predicate, so a spurious wake-up would let
    // the loop continue before the worker has signalled -- confirm this is acceptable.
140  for (unsigned int i=0;i<numConcurrentReads_;i++)
141  {
142  std::unique_lock<std::mutex> lk(startupLock_);
143  //issue a memory fence here and in threads (constructor was segfaulting without this)
144  thread_quit_signal.push_back(false);
145  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
146  cvReader_.push_back(new std::condition_variable);
147  threadInit_.store(false,std::memory_order_release);
148  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
149  startupCv_.wait(lk);
150  }
151 
152  runAuxiliary()->setProcessHistoryID(processHistoryID_);
153 }
154 
156 {
    // Destructor body (the signature line is absent from this listing).
    // Raises the quit flag, removes every input file still waiting for deletion,
    // joins the reader threads and frees the per-worker condition variables.
157  quit_threads_=true;
158 
159  //delete any remaining open files
    // NOTE(review): filesToDelete_ is walked here without taking fileDeleteLock_, which
    // the other visible users of the list do hold -- presumably nothing else touches the
    // list during destruction; confirm.
160  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
    // remove the file from disk, then free the InputFile object that described it
161  deleteFile(it->second->fileName_);
162  delete it->second;
163  }
    // NOTE(review): the line opening the following branch is missing from this listing;
    // judging by the comment in the else-branch it presumably tests whether the supervisor
    // thread was started -- confirm against the real source.
165  readSupervisorThread_->join();
166  }
167  else {
168  //join aux threads in case the supervisor thread was not started
169  for (unsigned int i=0;i<workerThreads_.size();i++) {
    // set the worker's quit flag and wake it while holding mReader_, then release the
    // lock before join() so the worker can re-acquire it and exit
170  std::unique_lock<std::mutex> lk(mReader_);
171  thread_quit_signal[i]=true;
172  cvReader_[i]->notify_one();
173  lk.unlock();
174  workerThreads_[i]->join();
175  delete workerThreads_[i];
176  }
177  }
178  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
    // The chunk clean-up below is disabled; no other release of the InputChunk objects
    // is visible in this listing.
179  /*
180  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
181  InputChunk *ch;
182  while (!freeChunks_.try_pop(ch)) {}
183  delete ch;
184  }
185  */
186 }
187 
189 {
    // Body of the parameter-description routine (the signature and the declaration of
    // `desc` are on lines absent from this listing). It records the module comment and
    // the untracked parameters with their defaults, then registers the description
    // under the label "source".
191  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
    // the constructor multiplies the two chunk values by 1048576 before storing them
192  desc.addUntracked<unsigned int> ("eventChunkSize",32)->setComment("Input buffer (chunk) size");
193  desc.addUntracked<unsigned int> ("eventChunkBlock",32)->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
194  desc.addUntracked<unsigned int> ("numBuffers",2)->setComment("Number of buffers used for reading input");
195  desc.addUntracked<unsigned int> ("maxBufferedFiles",2)->setComment("Maximum number of simultaneously buffered raw files");
196  desc.addUntracked<bool> ("verifyAdler32", true)->setComment("Verify event Adler32 checksum with FRDv3 or v4");
197  desc.addUntracked<bool> ("verifyChecksum", true)->setComment("Verify event CRC-32C checksum of FRDv5 or higher");
198  desc.addUntracked<bool> ("useL1EventID", false)->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
    // NOTE(review): "buDefPath" and "getLSFromFilename" are read by the constructor but
    // are not declared in the lines visible here; setAllowAnything() presumably is what
    // lets them pass validation -- confirm.
199  desc.setAllowAnything();
200  descriptions.add("source", desc);
201 }
202 
204 {
206  {
207  //this thread opens new files and dispatches reading to worker readers
208  //threadInit_.store(false,std::memory_order_release);
209  std::unique_lock<std::mutex> lk(startupLock_);
210  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
212  startupCv_.wait(lk);
213  }
214  //signal hltd to start event accounting
217 
218  switch (nextEvent() ) {
220 
221  //maybe create EoL file in working directory before ending run
222  struct stat buf;
223  if ( currentLumiSection_ > 0 ) {
224  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
225  if (eolFound) {
227  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
228  if ( !found ) {
230  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
231  close(eol_fd);
233  }
234  }
235  }
236  //also create EoR file in FU data directory
237  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
238  if (!eorFound) {
239  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
240  close(eor_fd);
241  }
243  eventsThisLumi_=0;
245  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
246  return false;
247  }
249  //this is not reachable
250  return true;
251  }
253  //std::cout << "--------------NEW LUMI---------------" << std::endl;
254  return true;
255  }
256  default: {
257  if (!getLSFromFilename_) {
258  //get new lumi from file header
259  if (event_->lumi() > currentLumiSection_) {
261  eventsThisLumi_=0;
262  maybeOpenNewLumiSection( event_->lumi() );
263  }
264  }
265  eventRunNumber_=event_->run();
266  L1EventID_ = event_->event();
267 
268  setEventCached();
269 
270  return true;
271  }
272  }
273 }
274 
275 void FedRawDataInputSource::createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
276 {
277  //used for backpressure mechanisms and monitoring
278  const std::string fuBoLS = daqDirector_->getBoLSFilePathOnFU(lumiSection);
279  struct stat buf;
280  if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
281  int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
282  close(bol_fd);
283  }
284 }
285 
// Opens lumisection `lumiSection` when the guarding condition holds: writes the
// end-of-lumisection marker of the previous section (if any), requests the
// begin-of-lumisection marker of the new one, updates currentLumiSection_ and installs
// a fresh LuminosityBlockAuxiliary. Several lines of this function are absent from
// this listing, so only the visible statements are described.
286 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection)
287 {
    // (the first half of the condition is on a line missing from this listing)
289  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
290 
291  if ( currentLumiSection_ > 0 ) {
    // (the expression that produces the EoLS path is on a missing line)
292  const std::string fuEoLS =
294  struct stat buf;
295  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
296  if ( !found ) {
    // NOTE(review): the result of open() is passed to close() without being checked
298  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
299  close(eol_fd);
    // the BoLS file of the new section is created only when the EoLS file of the
    // previous one did not exist yet
300  createBoLSFile(lumiSection,false);
302  }
303  }
304  else createBoLSFile(lumiSection,true);//needed for initial lumisection
305 
306  currentLumiSection_ = lumiSection;
307 
309 
    // Opening time as microseconds since the epoch.
    // NOTE(review): the event timestamp built from gettimeofday() elsewhere in this file
    // uses (seconds << 32) + microseconds instead -- confirm which encoding
    // edm::Timestamp expects.
310  timeval tv;
311  gettimeofday(&tv, 0);
312  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
313 
    // (parts of the constructor call below are on lines missing from this listing)
314  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
316  runAuxiliary()->run(),
317  lumiSection, lsopentime,
319 
320  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
321  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
322 
323  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
324  }
325 }
326 
328 {
    // Body of the polling wrapper around getNextEvent() (the signature and the
    // declaration of `status` are on lines absent from this listing).
    // getNextEvent() is called repeatedly for as long as it reports noFile; the loop is
    // left early when the global shutdown flag is set, in which case the returned status
    // is still noFile.
330  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
331  {
332  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
333  }
334  return status;
335 }
336 
338 {
339 
341  if (!currentFile_)
342  {
343  if (!streamFileTrackerPtr_) {
347  }
348 
350  if (!fileQueue_.try_pop(currentFile_))
351  {
352  //sleep until wakeup (only in single-buffer mode) or timeout
353  std::unique_lock<std::mutex> lkw(mWakeup_);
354  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
356  }
357  status = currentFile_->status_;
358  if ( status == evf::EvFDaqDirector::runEnded)
359  {
360  delete currentFile_;
361  currentFile_=nullptr;
362  return status;
363  }
364  else if ( status == evf::EvFDaqDirector::runAbort)
365  {
366  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
367  }
368  else if (status == evf::EvFDaqDirector::newLumi)
369  {
370  if (getLSFromFilename_) {
373  eventsThisLumi_=0;
375  }
376  }
377  else {//let this be picked up from next event
379  }
380 
381  delete currentFile_;
382  currentFile_=nullptr;
383  return status;
384  }
385  else if (status == evf::EvFDaqDirector::newFile) {
388  }
389  else
390  assert(0);
391  }
392 
393  //file is empty
394  if (!currentFile_->fileSize_) {
396  //try to open new lumi
398  if (getLSFromFilename_)
401  eventsThisLumi_=0;
403  }
404  //immediately delete empty file
406  delete currentFile_;
407  currentFile_=nullptr;
409  }
410 
411  //file is finished
414  //release last chunk (it is never released elsewhere)
417  {
418  throw cms::Exception("FedRawDataInputSource::getNextEvent")
419  << "Fully processed " << currentFile_->nProcessed_
420  << " from the file " << currentFile_->fileName_
421  << " but according to BU JSON there should be "
422  << currentFile_->nEvents_ << " events";
423  }
424  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
425  if (singleBufferMode_) {
426  std::unique_lock<std::mutex> lkw(mWakeup_);
427  cvWakeup_.notify_one();
428  }
431  //put the file in pending delete list;
432  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
433  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
434  }
435  else {
436  //in single-thread and stream jobs, events are already processed
438  delete currentFile_;
439  }
440  currentFile_=nullptr;
442  }
443 
444 
445  //file is too short
447  {
448  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
449  "Premature end of input file while reading event header";
450  }
451  if (singleBufferMode_) {
452 
453  //should already be there
455  usleep(10000);
457  }
458 
459  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
460 
461  //conditions when read amount is not sufficient for the header to fit
462  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]
464  {
466 
467  if (detectedFRDversion_==0) {
468  detectedFRDversion_=*((uint32*)dataPosition);
469  if (detectedFRDversion_>5)
470  throw cms::Exception("FedRawDataInputSource::getNextEvent")
471  << "Unknown FRD version -: " << detectedFRDversion_;
472  assert(detectedFRDversion_>=1);
473  }
474 
475  //recalculate chunk position
476  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
477  if ( bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_])
478  {
479  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
480  "Premature end of input file while reading event header";
481  }
482  }
483 
484  event_.reset( new FRDEventMsgView(dataPosition) );
485  if (event_->size()>eventChunkSize_) {
486  throw cms::Exception("FedRawDataInputSource::getNextEvent")
487  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
488  << " run:" << event_->run() << " of size:" << event_->size()
489  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
490  }
491 
492  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
493 
495  {
496  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
497  "Premature end of input file while reading event data";
498  }
499  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
501  //recalculate chunk position
502  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
503  event_.reset( new FRDEventMsgView(dataPosition) );
504  }
505  currentFile_->bufferPosition_ += event_->size();
506  currentFile_->chunkPosition_ += event_->size();
507  //last chunk is released when this function is invoked next time
508 
509  }
510  //multibuffer mode:
511  else
512  {
513  //wait for the current chunk to become added to the vector
515  usleep(10000);
517  }
518 
519  //check if header is at the boundary of two chunks
520  chunkIsFree_ = false;
521  unsigned char *dataPosition;
522 
523  //read header, copy it to a single chunk if necessary
524  bool chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]);
525 
526  event_.reset( new FRDEventMsgView(dataPosition) );
527  if (event_->size()>eventChunkSize_) {
528  throw cms::Exception("FedRawDataInputSource::getNextEvent")
529  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
530  << " run:" << event_->run() << " of size:" << event_->size()
531  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
532  }
533 
534  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
535 
537  {
538  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
539  "Premature end of input file while reading event data";
540  }
541 
542  if (chunkEnd) {
543  //header was at the chunk boundary, we will have to move payload as well
544  currentFile_->moveToPreviousChunk(msgSize,FRDHeaderVersionSize[detectedFRDversion_]);
545  chunkIsFree_ = true;
546  }
547  else {
548  //header was contiguous, but check if payload fits the chunk
549  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
550  //rewind to header start position
551  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
552  //copy event to a chunk start and move pointers
553  chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]+msgSize);
554  assert(chunkEnd);
555  chunkIsFree_=true;
556  //header is moved
557  event_.reset( new FRDEventMsgView(dataPosition) );
558  }
559  else {
560  //everything is in a single chunk, only move pointers forward
561  chunkEnd = currentFile_->advance(dataPosition,msgSize);
562  assert(!chunkEnd);
563  chunkIsFree_=false;
564  }
565  }
566  }//end multibuffer mode
567 
568  if (verifyChecksum_ && event_->version() >= 5)
569  {
570  uint32_t crc=0;
571  crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
572  if ( crc != event_->crc32c() ) {
574  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
575  "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
576  " but calculated 0x" << crc;
577  }
578  }
579  else if ( verifyAdler32_ && event_->version() >= 3)
580  {
581  uint32_t adler = adler32(0L,Z_NULL,0);
582  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
583 
584  if ( adler != event_->adler32() ) {
586  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
587  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
588  " but calculated 0x" << adler;
589  }
590  }
591 
592 
594 
596 }
597 
599 {
    // Body of the file-removal helper (the signature line is absent from this listing;
    // `fileName` is its argument). Removes the file through boost::filesystem. If the
    // first attempt throws, the error is logged, the thread sleeps 100 ms and the removal
    // is tried once more; an exception from the second attempt is deliberately ignored.
600  const boost::filesystem::path filePath(fileName);
601  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
602  try {
603  //sometimes this fails but file gets deleted
604  boost::filesystem::remove(filePath);
605  }
606  catch (const boost::filesystem::filesystem_error& ex)
607  {
608  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
609  << ". Trying again.";
610  usleep(100000);
611  try {
612  boost::filesystem::remove(filePath);
613  }
614  catch (const boost::filesystem::filesystem_error&) {/*file gets deleted first time but exception is still thrown*/}
615  }
    // same retry policy for any other standard exception
616  catch (std::exception& ex)
617  {
618  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
619  << ". Trying again.";
620  usleep(100000);
621  try {
622  boost::filesystem::remove(filePath);
623  } catch (std::exception&) {/*file gets deleted first time but exception is still thrown*/}
624  }
625 }
626 
627 
629 {
630  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
631  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
632 
633  if (useL1EventID_){
635  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
638  makeEvent(eventPrincipal, aux);
639  }
640  else if(tcds_pointer_==0){
643  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
646  makeEvent(eventPrincipal, aux);
647  }
648  else{
649  evf::evtn::TCDSRecord record((unsigned char *)(tcds_pointer_));
652  processGUID());
654  makeEvent(eventPrincipal, aux);
655  }
656 
657 
658 
659  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
660 
661  //FWCore/Sources DaqProvenanceHelper before 7_1_0_pre3
662  //eventPrincipal.put(daqProvenanceHelper_.constBranchDescription_, edp,
663  // daqProvenanceHelper_.dummyProvenance_);
664 
665  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
667 
668  eventsThisLumi_++;
669 
670  //this old file check runs no more often than every 10 events
671  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
672  //delete files that are not in processing
673  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
674  auto it = filesToDelete_.begin();
675  while (it!=filesToDelete_.end()) {
676  bool fileIsBeingProcessed = false;
677  for (unsigned int i=0;i<nStreams_;i++) {
678  if (it->first == streamFileTrackerPtr_->at(i)) {
679  fileIsBeingProcessed = true;
680  break;
681  }
682  }
683  if (!fileIsBeingProcessed) {
684  deleteFile(it->second->fileName_);
685  delete it->second;
686  it = filesToDelete_.erase(it);
687  }
688  else it++;
689  }
690 
691  }
693  chunkIsFree_=false;
694  return;
695 }
696 
698 {
    // Body of the routine that unpacks the payload of the current event (event_) into
    // `rawData` and returns the timestamp to use for the event. (The signature line is
    // absent from this listing.)
    //
    // The payload is walked backwards from its end, one FED fragment per iteration: the
    // trailer at the current end gives the fragment size, which locates the fragment's
    // header; the fragment is then copied into rawData under its FED id. Along the way
    // the TCDS fragment position, the GTP/GTPE event id and the GPS time are picked up.

    // default timestamp: wall-clock seconds in the upper 32 bits, microseconds below
699  edm::TimeValue_t time;
700  timeval stv;
701  gettimeofday(&stv,0);
702  time = stv.tv_sec;
703  time = (time << 32) + stv.tv_usec;
704  edm::Timestamp tstamp(time);
705 
706  uint32_t eventSize = event_->eventSize();
707  char* event = (char*)event_->payload();
708  GTPEventID_=0;
709  tcds_pointer_ = 0;
    // eventSize is the number of payload bytes not yet consumed
710  while (eventSize > 0) {
    // NOTE(review): the bounds checks in this loop are assert()s only, so they vanish
    // in builds with NDEBUG; a corrupt trailer size would then make the unsigned
    // subtractions below wrap around -- confirm asserts are enabled in production.
711  assert(eventSize>=sizeof(fedt_t));
712  eventSize -= sizeof(fedt_t);
713  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
714  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
715  assert(eventSize>=fedSize - sizeof(fedt_t));
    // step back over the rest of the fragment; event + eventSize now points at its header
716  eventSize -= (fedSize - sizeof(fedt_t));
717  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
718  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
719  if(fedId>FEDNumbering::MAXFEDID)
720  {
721  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
722  }
    // remember where the TCDS fragment starts inside the event buffer
723  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
724  tcds_pointer_ = (unsigned char *)(event + eventSize );
725  }
    // GTP fragment: take the event id and replace the wall-clock timestamp by the GPS time
726  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
727  if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
728  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
729  else
730  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
731  //evf::evtn::evm_board_setformat(fedSize);
732  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
733  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
734  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
735  }
736  //take event ID from GTPE FED
    // only used when no event id has been set by a GTP fragment so far in this walk
737  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
738  if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
739  GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
740  }
741  }
    // copy the whole fragment (header through trailer) into the collection
742  FEDRawData& fedData = rawData.FEDData(fedId);
743  fedData.resize(fedSize);
744  memcpy(fedData.data(), event + eventSize, fedSize);
745  }
746  assert(eventSize == 0);
747 
748  return tstamp;
749 }
750 
752 {
    // Body of the routine that takes over a BU JSON file and returns the number of events
    // it announces. (The signature and several declarations -- among them `data`,
    // `jsonDestPath` and `reader` -- are on lines absent from this listing.)
    //
    // Visible steps: copy the JSON file to a destination name carrying this process id,
    // remove the source, parse the copy, and convert the selected field to int.
    // Returns -1 when one of the exceptions handled at the bottom was caught.
    // NOTE(review): the visible caller asserts that the result is >= 0, so the -1 path
    // ends in an assertion failure there rather than in a handled error.
754  try {
755  // assemble json destination path
757 
758  //TODO:should be ported to use fffnaming
    // destination file name: <source stem>_pid<pid, zero-padded to 5 digits>.jsn
759  std::ostringstream fileNameWithPID;
760  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
761  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
762  jsonDestPath /= fileNameWithPID.str();
763 
764  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
765  << jsonDestPath;
    // copy with a single retry after one second; a failure of the retry propagates to
    // the outer handlers
766  try {
767  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
768  }
769  catch (const boost::filesystem::filesystem_error& ex)
770  {
771  // Input dir gone?
772  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
773  // << " Maybe the file is not yet visible by FU. Trying again in one second";
774  sleep(1);
775  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
776  }
778 
    // removal of the source is best effort: failures are logged and processing continues
779  try {
780  //sometimes this fails but file gets deleted
781  boost::filesystem::remove(jsonSourcePath);
782  }
783  catch (const boost::filesystem::filesystem_error& ex)
784  {
785  // Input dir gone?
786  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
787  }
788  catch (std::exception& ex)
789  {
790  // Input dir gone?
791  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
792  }
793 
794  boost::filesystem::ifstream ij(jsonDestPath);
795  Json::Value deserializeRoot;
797 
798  if (!reader.parse(ij, deserializeRoot))
799  throw std::runtime_error("Cannot deserialize input JSON file");
800 
801  //read BU JSON
803  DataPoint dp;
804  dp.deserialize(deserializeRoot);
    // pick the data entry whose index matches the position of "NEvents" in the
    // definition's name list; if none is found, fall back to the first data entry
805  bool success = false;
806  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
807  if (dpd_->getNames().at(i)=="NEvents")
808  if (i<dp.getData().size()) {
809  data = dp.getData()[i];
810  success=true;
811  }
812  }
813  if (!success) {
814  if (dp.getData().size())
815  data = dp.getData()[0];
816  else
817  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
818  " error reading number of events from BU JSON -: No input value " << data;
819  }
820  return boost::lexical_cast<int>(data);
821 
822  }
823  catch (const boost::filesystem::filesystem_error& ex)
824  {
825  // Input dir gone?
827  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
828  }
    // NOTE(review): caught by value -- this copies the exception and slices derived
    // types; catching by const reference would avoid that.
829  catch (std::runtime_error e)
830  {
831  // Another process grabbed the file and NFS did not register this
833  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
834  }
835 
836  catch( boost::bad_lexical_cast const& ) {
837  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
838  << "Input value is -: " << data;
839  }
840 
    // NOTE(review): caught by value -- the object is sliced to std::exception, so what()
    // no longer reports the message of the exception actually thrown.
841  catch (std::exception e)
842  {
843  // BU run directory disappeared?
845  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
846  }
847 
848  return -1;
849 }
850 
852 {}
853 
854 void FedRawDataInputSource::postForkReacquireResources(std::shared_ptr<edm::multicore::MessageReceiverForSource>)
855 {
856  InputSource::rewind();
860 }
861 
863 {}
864 
865 
867 {
868  bool stop=false;
869  unsigned int currentLumiSection = 0;
870  //threadInit_.exchange(true,std::memory_order_acquire);
871 
872  {
873  std::unique_lock<std::mutex> lk(startupLock_);
874  startupCv_.notify_one();
875  }
876 
877  while (!stop) {
878 
879  //wait for at least one free thread and chunk
880  int counter=0;
882  {
883  std::unique_lock<std::mutex> lkw(mWakeup_);
884  //sleep until woken up by condition or a timeout
885  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
886  counter++;
887  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
888  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
889  }
890  else {
891  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
892  }
893  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
894  }
895 
896  if (stop) break;
897 
898  //look for a new file
899  std::string nextFile;
900  uint32_t ls=0;
901  uint32_t fileSize;
902 
903  uint32_t monLS=1;
904  uint32_t lockCount=0;
905  uint64_t sumLockWaitTimeUs=0.;
906 
908 
910 
911  while (status == evf::EvFDaqDirector::noFile) {
912  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
913  stop=true;
914  break;
915  }
916 
917  uint64_t thisLockWaitTimeUs=0.;
918  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
919  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
920 
921  //monitoring of lock wait time
922  if (thisLockWaitTimeUs>0.)
923  sumLockWaitTimeUs+=thisLockWaitTimeUs;
924  lockCount++;
925  if (ls>monLS) {
926  monLS=ls;
927  if (lockCount)
928  if (fms_) fms_->reportLockWait(monLS,sumLockWaitTimeUs,lockCount);
929  lockCount=0;
930  sumLockWaitTimeUs=0;
931  }
932 
933  //check again for any remaining index/EoLS files after EoR file is seen
934  if ( status == evf::EvFDaqDirector::runEnded) {
935  usleep(100000);
936  //now all files should have appeared in ramdisk, check again if any raw files were left behind
937  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
938  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
939  }
940 
941  if ( status == evf::EvFDaqDirector::runEnded) {
943  stop=true;
944  break;
945  }
946 
947  //queue new lumisection
948  if( getLSFromFilename_ && ls > currentLumiSection) {
949  currentLumiSection = ls;
950  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
951  }
952 
953  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
954  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
956  stop=true;
957  break;
958  }
959 
960  int dbgcount=0;
961  if (status == evf::EvFDaqDirector::noFile) {
962  dbgcount++;
963  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
964  usleep(100000);
965  }
966  }
967  if ( status == evf::EvFDaqDirector::newFile ) {
968  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
969 
970 
971  boost::filesystem::path rawFilePath(nextFile);
972  std::string rawFile = rawFilePath.replace_extension(".raw").string();
973 
974  struct stat st;
975  stat(rawFile.c_str(),&st);
976  fileSize=st.st_size;
977 
978  int eventsInNewFile = grabNextJsonFile(nextFile);
979  if (fms_) fms_->stoppedLookingForFile(ls);
980  assert( eventsInNewFile>=0 );
981  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
982 
983  if (!singleBufferMode_) {
984  //calculate number of needed chunks
985  unsigned int neededChunks = fileSize/eventChunkSize_;
986  if (fileSize%eventChunkSize_) neededChunks++;
987 
988  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
990  fileQueue_.push(newInputFile);
991 
992  for (unsigned int i=0;i<neededChunks;i++) {
993 
994  //get thread
995  unsigned int newTid = 0xffffffff;
996  while (!workerPool_.try_pop(newTid)) {
997  usleep(100000);
998  }
999 
1000  InputChunk * newChunk = nullptr;
1001  while (!freeChunks_.try_pop(newChunk)) {
1002  usleep(100000);
1003  if (quit_threads_.load(std::memory_order_relaxed)) break;
1004  }
1005 
1006  if (newChunk == nullptr) {
1007  //return unused tid if we received shutdown (nullptr chunk)
1008  if (newTid!=0xffffffff) workerPool_.push(newTid);
1009  stop = true;
1010  break;
1011  }
1012 
1013  std::unique_lock<std::mutex> lk(mReader_);
1014 
1015  unsigned int toRead = eventChunkSize_;
1016  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1017  newChunk->reset(i*eventChunkSize_,toRead,i);
1018 
1019  workerJob_[newTid].first=newInputFile;
1020  workerJob_[newTid].second=newChunk;
1021 
1022  //wake up the worker thread
1023  cvReader_[newTid]->notify_one();
1024  }
1025  }
1026  else {
1027  if (!eventsInNewFile) {
1028  //still queue file for lumi update
1029  std::unique_lock<std::mutex> lkw(mWakeup_);
1030  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
1032  fileQueue_.push(newInputFile);
1033  cvWakeup_.notify_one();
1034  return;
1035  }
1036  //in single-buffer mode put single chunk in the file and let the main thread read the file
1037  InputChunk * newChunk;
1038  //should be available immediately
1039  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
1040 
1041  std::unique_lock<std::mutex> lkw(mWakeup_);
1042 
1043  unsigned int toRead = eventChunkSize_;
1044  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1045  newChunk->reset(0,toRead,0);
1046  newChunk->readComplete_=true;
1047 
1048  //push file and wakeup main thread
1049  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1050  newInputFile->chunks_[0]=newChunk;
1052  fileQueue_.push(newInputFile);
1053  cvWakeup_.notify_one();
1054  }
1055  }
1056  }
1057  //make sure threads finish reading
1058  unsigned numFinishedThreads = 0;
1059  while (numFinishedThreads < workerThreads_.size()) {
1060  unsigned int tid;
1061  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1062  std::unique_lock<std::mutex> lk(mReader_);
1063  thread_quit_signal[tid]=true;
1064  cvReader_[tid]->notify_one();
1065  numFinishedThreads++;
1066  }
1067  for (unsigned int i=0;i<workerThreads_.size();i++) {
1068  workerThreads_[i]->join();
1069  delete workerThreads_[i];
1070  }
1071 }
1072 
// Body of a file-reader worker loop. The signature line (original line 1073) is absent
// from this listing; presumably FedRawDataInputSource::readWorker(unsigned int tid) — TODO confirm.
// Each iteration clears this worker's job slot, pushes tid into workerPool_, sleeps on
// cvReader_[tid], and after wake-up reads the assigned chunk from the assigned file.
// Returns when thread_quit_signal[tid] is set after a wake-up, or when the file cannot be opened.
1074 {
1075  bool init = true;
1076  threadInit_.exchange(true,std::memory_order_acquire);
1077 
1078  while (1) {
1079 
1080  std::unique_lock<std::mutex> lk(mReader_);
      // NOTE(review): both lines below clear .first; .second is never reset here.
      // The second assignment was presumably meant to target .second — confirm.
1081  workerJob_[tid].first=nullptr;
1082  workerJob_[tid].first=nullptr;
1083 
1084  assert(!thread_quit_signal[tid]);//should never get it here
      // advertise this worker id as idle
1085  workerPool_.push(tid);
1086 
1087  if (init) {
      // first iteration only: notify startupCv_ while holding startupLock_
      // (this inner lk shadows the outer lk on mReader_)
1088  std::unique_lock<std::mutex> lk(startupLock_);
1089  init = false;
1090  startupCv_.notify_one();
1091  }
      // NOTE(review): wait() is called without a predicate; a spurious wake-up would reach
      // the assert on workerJob_[tid] below with no job assigned.
1092  cvReader_[tid]->wait(lk);
1093 
1094  if (thread_quit_signal[tid]) return;
1095 
1096  InputFile * file;
1097  InputChunk * chunk;
1098 
1099  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1100 
1101  file = workerJob_[tid].first;
1102  chunk = workerJob_[tid].second;
1103 
1104  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
      // NOTE(review): lseek is issued before fileDescriptor is validated, and its result
      // (pos) is only logged — the check below tests fileDescriptor alone.
1105  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1106 
1107 
1108  if (fileDescriptor>=0)
1109  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1110  else
1111  {
1112  edm::LogError("FedRawDataInputSource") <<
1113  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1114  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
      // flag the failure and leave the worker function
1115  setExceptionState_=true;
1116  return;
1117 
1118  }
1119 
      // bufferLeft accumulates the number of bytes read into chunk->buf_
      // NOTE(review): original line 1121 is absent from this listing; `start` used further
      // down is presumably declared there — confirm.
1120  unsigned int bufferLeft = 0;
1122  for (unsigned int i=0;i<readBlocks_;i++)
1123  {
1124  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1125  if ( last > 0 )
1126  bufferLeft+=last;
      // a read shorter than one block ends the loop
      // NOTE(review): a negative return from read() is not handled separately.
1127  if (last < eventChunkBlock_) {
1128  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1129  break;
1130  }
1131  }
      // NOTE(review): original line 1132 is absent from this listing; `end` is presumably set there.
1133  auto diff = end-start;
1134  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
      // the rate is computed as whole MB divided by elapsed ms; msec.count() may be 0
1135  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1136  close(fileDescriptor);
1137 
      // for the chunk at file offset 0, store the first 32-bit word of the buffer in
      // detectedFRDversion_ if it is still 0
1138  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1140  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1141  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1142 
1143  }
1144 }
1145 
// Body of an error handler; the signature line (original line 1146) is absent from this
// listing — the exception category suggests FedRawDataInputSource::threadError, TODO confirm.
// Sets quit_threads_ and then throws cms::Exception; it never returns normally.
1147 {
1148  quit_threads_=true;
1149  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1150 
1151 }
1152 
1153 
// Makes `size` bytes available starting at the file's current read position.
// dataPosition: set to the current position inside the current chunk's buffer; when the
//   request crosses into the next chunk it is moved back to the start of that buffer.
// size: number of bytes requested.
// Returns true when the data had to be stitched together from the current and the next
// chunk (currentChunk_ is then incremented), false when the current chunk already holds it.
// NOTE(review): original lines 1159, 1170, 1178, 1184 and 1185 are absent from this listing,
// so any bookkeeping performed there is not visible here.
1154 inline bool InputFile::advance(unsigned char* & dataPosition, const size_t size)
1155 {
1156  //wait for chunk
      // poll every 100000 us until waitForChunk reports the current chunk
1157  while (!waitForChunk(currentChunk_)) {
1158  usleep(100000);
1160  }
1161 
1162  dataPosition = chunks_[currentChunk_]->buf_+ chunkPosition_;
      // bytes remaining in the current chunk after the read position
1163  size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1164 
1165  if (currentLeft < size) {
1166 
1167  //we need next chunk
1168  while (!waitForChunk(currentChunk_+1)) {
1169  usleep(100000);
1171  }
1172  //copy everything to beginning of the first chunk
1173  dataPosition-=chunkPosition_;
1174  assert(dataPosition==chunks_[currentChunk_]->buf_);
      // move the unread tail to the buffer start (overlapping ranges, hence memmove) ...
1175  memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_+chunkPosition_, currentLeft);
      // ... and append the missing bytes from the head of the next chunk
      // NOTE(review): assumes the current buffer can hold `size` bytes — confirm against allocation.
1176  memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_+1]->buf_, size - currentLeft);
1177  //set pointers at the end of the old data position
      // read position within the next chunk, just past the bytes copied out of it
1179  chunkPosition_=size-currentLeft;
1180  currentChunk_++;
1181  return true;
1182  }
1183  else {
1186  return false;
1187  }
1188 }
1189 
// Copies `size` bytes from the current chunk (starting at chunkPosition_) into the
// previous chunk's buffer at byte `offset`.
// NOTE(review): original lines 1193, 1196 and 1197 are absent from this listing.
1190 inline void InputFile::moveToPreviousChunk(const size_t size, const size_t offset)
1191 {
1192  //this will fail in case of events that are too large
      // NOTE(review): size and offset are unsigned, so `size - offset` wraps when offset > size;
      // the bound compared against is the current chunk's size_, not the destination buffer's.
1194  assert(size - offset < chunks_[currentChunk_]->size_);
      // NOTE(review): indexes chunks_[currentChunk_-1]; presumably callers guarantee currentChunk_ > 0.
1195  memcpy(chunks_[currentChunk_-1]->buf_+offset,chunks_[currentChunk_]->buf_+chunkPosition_,size);
1198 }
1199 
// NOTE(review): the body of this function (original lines 1201-1202) is absent from this
// listing, so its effect cannot be described from here.
1200 inline void InputFile::rewindChunk(const size_t size) {
1203 }
1204 
1205 //single-buffer mode file reading
// Body of the single-buffer-mode reader. The signature line (original line 1206) is absent
// from this listing; presumably FedRawDataInputSource::readNextChunkIntoBuffer(InputFile *file)
// — TODO confirm. Opens the file on first use, refills file->chunks_[0] from fileDescriptor_,
// and closes the descriptor once bufferInputRead_ equals file->fileSize_.
// Throws cms::Exception when the file cannot be opened.
// NOTE(review): original lines 1227, 1240 and 1245 are absent from this listing; the update
// of bufferInputRead_ is presumably done there — confirm.
1207 {
1208 
1209  if (fileDescriptor_<0) {
1210  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1211  bufferInputRead_ = 0;
1212  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1213  if (fileDescriptor_>=0)
1214  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1215  else
1216  {
1217  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1218  << file->fileName_ << " fd:" << fileDescriptor_;
1219  }
1220  }
1221 
1222  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
      // nothing to preserve: fill the buffer from its start, one eventChunkBlock_ per read
1223  uint32_t existingSize = 0;
1224  for (unsigned int i=0;i<readBlocks_;i++)
1225  {
1226  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
      // NOTE(review): the read() result is added without a visible check for a negative
      // return (unless the omitted line handles it).
1228  existingSize+=last;
1229  }
1230  }
1231  else {
      // chunkPosition_ bytes at the front were consumed: that many bytes are re-read, split
      // into whole blocks plus a remainder
1232  const uint32_t chunksize = file->chunkPosition_;
1233  const uint32_t blockcount=chunksize/eventChunkBlock_;
1234  const uint32_t leftsize = chunksize%eventChunkBlock_;
      // keep the unread tail by moving it to the start of the buffer
1235  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1236  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1237 
1238  for (uint32_t i=0;i<blockcount;i++) {
1239  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1241  existingSize+=last;
1242  }
1243  if (leftsize) {
1244  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1246  }
1247  file->chunkPosition_=0;//data was moved to beginning of the chunk
1248  }
1249  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1250  if (fileDescriptor_!=-1)
1251  {
1252  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1253  close(fileDescriptor_);
      // -1 marks the descriptor as closed so the next call reopens a file
1254  fileDescriptor_=-1;
1255  }
1256  }
1257 }
1258 
1259 
// Body of the per-lumisection event accumulator. The signature line (original line 1260) is
// absent from this listing; presumably
// FedRawDataInputSource::reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
// — TODO confirm. Under monlock_, adds `events` to the sourceEventsReport_ entry for `lumi`,
// creating the entry when none exists.
1261 {
1262 
1263  std::lock_guard<std::mutex> lock(monlock_);
1264  auto itr = sourceEventsReport_.find(lumi);
1265  if (itr!=sourceEventsReport_.end())
1266  itr->second+=events;
1267  else
1268  sourceEventsReport_[lumi]=events;
1269 }
1270 
1271 std::pair<bool,unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase)
1272 {
1273  std::lock_guard<std::mutex> lock(monlock_);
1274  auto itr = sourceEventsReport_.find(lumi);
1275  if (itr!=sourceEventsReport_.end()) {
1276  auto && ret = std::pair<bool,unsigned int>(true,itr->second);
1277  if (erase)
1278  sourceEventsReport_.erase(itr);
1279  return ret;
1280  }
1281  else
1282  return std::pair<bool,unsigned int>(false,0);
1283 }
#define LogDebug(id)
static const char runNumber_[]
std::vector< std::string > & getData()
Definition: DataPoint.h:58
unsigned int lumi_
int i
Definition: DBlmapReader.cc:9
tuple ret
prodAgent to be discontinued
unsigned int getgpshigh(const unsigned char *)
uint32_t chunkPosition_
std::condition_variable cvWakeup_
tuple start
Check for commandline option errors.
Definition: dqm_diff.py:58
virtual void read(edm::EventPrincipal &eventPrincipal) override
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:350
bool gtpe_board_sense(const unsigned char *p)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
virtual void rewind_() override
std::vector< int > * getStreamFileTracker()
bool crc32c_hw_test()
Definition: crc32c.cc:354
unsigned int offset_
jsoncollector::DataPointDefinition * dpd_
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
tbb::concurrent_queue< unsigned int > workerPool_
JetCorrectorParameters::Record record
Definition: classes.h:7
void setAllowAnything()
allow any parameter label/value pairs
unsigned int get(const unsigned char *, bool)
void maybeOpenNewLumiSection(const uint32_t lumiSection)
tuple lumi
Definition: fjr2json.py:35
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
void rewindChunk(const size_t size)
const uint32 FRDHeaderVersionSize[6]
assert(m_qm.get())
std::vector< int > * streamFileTrackerPtr_
int init
Definition: HydjetWrapper.h:67
def ls
Definition: eostools.py:348
evf::EvFDaqDirector::FileStatus status_
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:590
std::atomic< bool > quit_threads_
volatile std::atomic< bool > shutdown_flag
std::vector< ReaderInfo > workerJob_
void createProcessingNotificationMaybe() const
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
std::unique_ptr< std::thread > readSupervisorThread_
ProductProvenance const & dummyProvenance() const
Represents a JSON value.
Definition: value.h:111
std::string getEoLSFilePathOnBU(const unsigned int ls) const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
#define nullptr
std::vector< std::condition_variable * > cvReader_
FedRawDataInputSource * parent_
unsigned int sourceid
Definition: fed_header.h:32
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:217
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
U second(std::pair< T, U > const &p)
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
static Timestamp beginOfTime()
Definition: Timestamp.h:103
std::vector< bool > thread_quit_signal
void setComment(std::string const &value)
uint32_t bufferPosition_
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
void updateFileIndex(int const &fileIndex)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void resize(size_t newsize)
Definition: FEDRawData.cc:32
const edm::RunNumber_t runNumber_
bool advance(unsigned char *&dataPosition, const size_t size)
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:357
void moveToPreviousChunk(const size_t size, const size_t offset)
unsigned char * buf_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
struct fedt_struct fedt_t
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
int grabNextJsonFile(boost::filesystem::path const &)
def move
Definition: eostools.py:510
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
#define end
Definition: vmac.h:37
virtual void preForkReleaseResources() override
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
std::map< unsigned int, unsigned int > sourceEventsReport_
bool waitForChunk(unsigned int chunkid)
const edm::DaqProvenanceHelper daqProvenanceHelper_
unsigned int fileIndex_
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
unsigned long long TimeValue_t
Definition: Timestamp.h:28
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:56
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned int uint32
Definition: MsgTools.h:13
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:382
const std::string fuOutputDir_
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
void setInputSource(FedRawDataInputSource *inputSource)
std::atomic< bool > readComplete_
virtual bool checkNextEvent() override
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:596
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned long long uint64_t
Definition: Time.h:15
unsigned int currentChunk_
auto dp
Definition: deltaR.h:22
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
void add(std::string const &label, ParameterSetDescription const &psetDescription)
void setProcessHistoryID(ProcessHistoryID const &phid)
std::condition_variable startupCv_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:262
void stoppedLookingForFile(unsigned int lumi)
tbb::concurrent_vector< InputChunk * > chunks_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:365
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:351
unsigned int eventsize
Definition: fed_trailer.h:33
std::atomic< bool > threadInit_
std::pair< InputFile *, InputChunk * > ReaderInfo
std::string getBoLSFilePathOnFU(const unsigned int ls) const
void load(int perCUT=90)
Definition: getMaxPt.h:59
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:353
tuple events
Definition: patZpeak.py:19
void readWorker(unsigned int tid)
static std::atomic< unsigned int > counter
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
bool emptyLumisectionMode() const
virtual void postForkReacquireResources(std::shared_ptr< edm::multicore::MessageReceiverForSource >) override
friend class InputFile
Open Root file and provide MEs ############.
size_(0)
Definition: OwnArray.h:181
unsigned int gtpe_get(const unsigned char *)
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
volatile std::atomic< bool > shutdown_flag false
tbb::concurrent_queue< InputChunk * > freeChunks_
evf::EvFDaqDirector::FileStatus getNextEvent()
tuple status
Definition: ntuplemaker.py:245
evf::EvFDaqDirector::FileStatus nextEvent()
void readNextChunkIntoBuffer(InputFile *file)
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:265
unsigned int nEvents_
void setFMS(evf::FastMonitoringService *fms)
unsigned int getgpslow(const unsigned char *)
tuple size
Write out results.
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
std::string getEoRFilePathOnFU() const