CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FedRawDataInputSource.cc
Go to the documentation of this file.
1 #include <fcntl.h>
2 #include <iomanip>
3 #include <iostream>
4 #include <memory>
5 #include <sstream>
6 #include <sys/types.h>
7 #include <sys/file.h>
8 #include <sys/time.h>
9 #include <unistd.h>
10 #include <vector>
11 #include <fstream>
12 #include <zlib.h>
13 #include <stdio.h>
14 #include <chrono>
15 
16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
18 
19 
22 
29 
33 
35 
39 
41 
46 
47 //JSON file reader
49 
50 #include <boost/lexical_cast.hpp>
51 
52 using namespace jsoncollector;
53 
55  edm::InputSourceDescription const& desc) :
56  edm::RawInputSource(pset, desc),
57  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
58  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",16)*1048576),
59  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",eventChunkSize_/1048576)*1048576),
60  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",1)),
61  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> ("maxBufferedFiles",2)),
62  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
63  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
64  verifyChecksum_(pset.getUntrackedParameter<bool> ("verifyChecksum", true)),
65  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
66  runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
67  fuOutputDir_(edm::Service<evf::EvFDaqDirector>()->baseRunDir()),
68  daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
69  eventID_(),
70  processHistoryID_(),
71  currentLumiSection_(0),
72  tcds_pointer_(0),
73  eventsThisLumi_(0),
74  dpd_(nullptr)
75 {
76  char thishost[256];
77  gethostname(thishost, 255);
78  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
79  << std::endl << (eventChunkSize_/1048576)
80  << " MB on host " << thishost;
81 
83  setNewRun();
86 
87  dpd_ = new DataPointDefinition();
88  std::string defLabel = "data";
89  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
90 
91  //make sure that chunk size is N * block size
96 
97  if (!numBuffers_)
98  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
99  "no reading enabled with numBuffers parameter 0";
100 
104 
105  if (!crc32c_hw_test())
106  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
107 
108  //get handles to DaqDirector and FastMonitoringService because it isn't accessible in readSupervisor thread
109 
110  try {
112  } catch (...){
113  edm::LogWarning("FedRawDataInputSource") << "FastMonitoringService not found";
114  assert(0);//test
115  }
116 
117  try {
119  //set DaqDirector to delete files in preGlobalEndLumi callback
121  if (fms_) daqDirector_->setFMS(fms_);
122  } catch (...){
123  edm::LogWarning("FedRawDataInputSource") << "EvFDaqDirector not found";
124  assert(0);//test
125  }
126 
127  //should delete chunks when run stops
128  for (unsigned int i=0;i<numBuffers_;i++) {
130  }
131 
132  quit_threads_ = false;
133 
134  for (unsigned int i=0;i<numConcurrentReads_;i++)
135  {
136  std::unique_lock<std::mutex> lk(startupLock_);
137  //issue a memory fence here and in threads (constructor was segfaulting without this)
138  thread_quit_signal.push_back(false);
139  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
140  cvReader_.push_back(new std::condition_variable);
141  threadInit_.store(false,std::memory_order_release);
142  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
143  startupCv_.wait(lk);
144  }
145 
146  runAuxiliary()->setProcessHistoryID(processHistoryID_);
147 }
148 
150 {
151  quit_threads_=true;
152 
153  //delete any remaining open files
154  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
155  deleteFile(it->second->fileName_);
156  delete it->second;
157  }
159  readSupervisorThread_->join();
160  }
161  else {
162  //join aux threads in case the supervisor thread was not started
163  for (unsigned int i=0;i<workerThreads_.size();i++) {
164  std::unique_lock<std::mutex> lk(mReader_);
165  thread_quit_signal[i]=true;
166  cvReader_[i]->notify_one();
167  lk.unlock();
168  workerThreads_[i]->join();
169  delete workerThreads_[i];
170  }
171  }
172  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
173  /*
174  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
175  InputChunk *ch;
176  while (!freeChunks_.try_pop(ch)) {}
177  delete ch;
178  }
179  */
180 }
181 
183 {
185  {
186  //this thread opens new files and dispatches reading to worker readers
187  //threadInit_.store(false,std::memory_order_release);
188  std::unique_lock<std::mutex> lk(startupLock_);
189  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
191  startupCv_.wait(lk);
192  }
193  //signal hltd to start event accounting
196 
197  switch (nextEvent() ) {
199 
200  //maybe create EoL file in working directory before ending run
201  struct stat buf;
202  if ( currentLumiSection_ > 0 ) {
203  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
204  if (eolFound) {
206  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
207  if ( !found ) {
209  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
210  close(eol_fd);
212  }
213  }
214  }
215  //also create EoR file in FU data directory
216  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
217  if (!eorFound) {
218  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
219  close(eor_fd);
220  }
222  eventsThisLumi_=0;
224  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
225  return false;
226  }
228  //this is not reachable
229  return true;
230  }
232  //std::cout << "--------------NEW LUMI---------------" << std::endl;
233  return true;
234  }
235  default: {
236  if (!getLSFromFilename_) {
237  //get new lumi from file header
238  if (event_->lumi() > currentLumiSection_) {
240  eventsThisLumi_=0;
241  maybeOpenNewLumiSection( event_->lumi() );
242  }
243  }
244  eventRunNumber_=event_->run();
245  L1EventID_ = event_->event();
246 
247  setEventCached();
248 
249  return true;
250  }
251  }
252 }
253 
254 void FedRawDataInputSource::createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
255 {
256  //used for backpressure mechanisms and monitoring
257  const std::string fuBoLS = daqDirector_->getBoLSFilePathOnFU(lumiSection);
258  struct stat buf;
259  if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
260  int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
261  close(bol_fd);
262  }
263 }
264 
265 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection)
266 {
268  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
269 
270  if ( currentLumiSection_ > 0 ) {
271  const std::string fuEoLS =
273  struct stat buf;
274  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
275  if ( !found ) {
277  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
278  close(eol_fd);
279  createBoLSFile(lumiSection,false);
281  }
282  }
283  else createBoLSFile(lumiSection,true);//needed for initial lumisection
284 
285  currentLumiSection_ = lumiSection;
286 
288 
289  timeval tv;
290  gettimeofday(&tv, 0);
291  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
292 
293  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
295  runAuxiliary()->run(),
296  lumiSection, lsopentime,
298 
299  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
300  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
301 
302  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
303  }
304 }
305 
307 {
 // Polls getNextEvent() until it reports something other than
 // evf::EvFDaqDirector::noFile, and returns that status.
 // The loop is also left when edm::shutdown_flag is set, in which case the
 // last status obtained (noFile) is returned.
 // NOTE(review): the declaration of `status` is on a line not shown in this listing.
309  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
310  {
311  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
312  }
313  return status;
314 }
315 
317 {
318 
320  if (!currentFile_)
321  {
322  if (!streamFileTrackerPtr_) {
326  }
327 
329  if (!fileQueue_.try_pop(currentFile_))
330  {
331  //sleep until wakeup (only in single-buffer mode) or timeout
332  std::unique_lock<std::mutex> lkw(mWakeup_);
333  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
335  }
336  status = currentFile_->status_;
337  if ( status == evf::EvFDaqDirector::runEnded)
338  {
339  delete currentFile_;
340  currentFile_=nullptr;
341  return status;
342  }
343  else if ( status == evf::EvFDaqDirector::runAbort)
344  {
345  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
346  }
347  else if (status == evf::EvFDaqDirector::newLumi)
348  {
349  if (getLSFromFilename_) {
352  eventsThisLumi_=0;
354  }
355  }
356  else {//let this be picked up from next event
358  }
359 
360  delete currentFile_;
361  currentFile_=nullptr;
362  return status;
363  }
364  else if (status == evf::EvFDaqDirector::newFile) {
367  }
368  else
369  assert(0);
370  }
371 
372  //file is empty
373  if (!currentFile_->fileSize_) {
375  //try to open new lumi
377  if (getLSFromFilename_)
380  eventsThisLumi_=0;
382  }
383  //immediately delete empty file
385  delete currentFile_;
386  currentFile_=nullptr;
388  }
389 
390  //file is finished
393  //release last chunk (it is never released elsewhere)
396  {
397  throw cms::Exception("FedRawDataInputSource::getNextEvent")
398  << "Fully processed " << currentFile_->nProcessed_
399  << " from the file " << currentFile_->fileName_
400  << " but according to BU JSON there should be "
401  << currentFile_->nEvents_ << " events";
402  }
403  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
404  if (singleBufferMode_) {
405  std::unique_lock<std::mutex> lkw(mWakeup_);
406  cvWakeup_.notify_one();
407  }
410  //put the file in pending delete list;
411  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
412  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
413  }
414  else {
415  //in single-thread and stream jobs, events are already processed
417  delete currentFile_;
418  }
419  currentFile_=nullptr;
421  }
422 
423 
424  //file is too short
426  {
427  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
428  "Premature end of input file while reading event header";
429  }
430  if (singleBufferMode_) {
431 
432  //should already be there
434  usleep(10000);
436  }
437 
438  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
439 
440  //conditions when read amount is not sufficient for the header to fit
441  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]
443  {
445 
446  if (detectedFRDversion_==0) {
447  detectedFRDversion_=*((uint32*)dataPosition);
448  if (detectedFRDversion_>5)
449  throw cms::Exception("FedRawDataInputSource::getNextEvent")
450  << "Unknown FRD version -: " << detectedFRDversion_;
451  assert(detectedFRDversion_>=1);
452  }
453 
454  //recalculate chunk position
455  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
456  if ( bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_])
457  {
458  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
459  "Premature end of input file while reading event header";
460  }
461  }
462 
463  event_.reset( new FRDEventMsgView(dataPosition) );
464  if (event_->size()>eventChunkSize_) {
465  throw cms::Exception("FedRawDataInputSource::getNextEvent")
466  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
467  << " run:" << event_->run() << " of size:" << event_->size()
468  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
469  }
470 
471  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
472 
474  {
475  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
476  "Premature end of input file while reading event data";
477  }
478  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
480  //recalculate chunk position
481  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
482  event_.reset( new FRDEventMsgView(dataPosition) );
483  }
484  currentFile_->bufferPosition_ += event_->size();
485  currentFile_->chunkPosition_ += event_->size();
486  //last chunk is released when this function is invoked next time
487 
488  }
489  //multibuffer mode:
490  else
491  {
492  //wait for the current chunk to become added to the vector
494  usleep(10000);
496  }
497 
498  //check if header is at the boundary of two chunks
499  chunkIsFree_ = false;
500  unsigned char *dataPosition;
501 
502  //read header, copy it to a single chunk if necessary
503  bool chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]);
504 
505  event_.reset( new FRDEventMsgView(dataPosition) );
506  if (event_->size()>eventChunkSize_) {
507  throw cms::Exception("FedRawDataInputSource::getNextEvent")
508  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
509  << " run:" << event_->run() << " of size:" << event_->size()
510  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
511  }
512 
513  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
514 
516  {
517  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
518  "Premature end of input file while reading event data";
519  }
520 
521  if (chunkEnd) {
522  //header was at the chunk boundary, we will have to move payload as well
523  currentFile_->moveToPreviousChunk(msgSize,FRDHeaderVersionSize[detectedFRDversion_]);
524  chunkIsFree_ = true;
525  }
526  else {
527  //header was contiguous, but check if payload fits the chunk
528  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
529  //rewind to header start position
530  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
531  //copy event to a chunk start and move pointers
532  chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]+msgSize);
533  assert(chunkEnd);
534  chunkIsFree_=true;
535  //header is moved
536  event_.reset( new FRDEventMsgView(dataPosition) );
537  }
538  else {
539  //everything is in a single chunk, only move pointers forward
540  chunkEnd = currentFile_->advance(dataPosition,msgSize);
541  assert(!chunkEnd);
542  chunkIsFree_=false;
543  }
544  }
545  }//end multibuffer mode
546 
547  if (verifyChecksum_ && event_->version() >= 5)
548  {
549  uint32_t crc=0;
550  crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
551  if ( crc != event_->crc32c() ) {
553  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
554  "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
555  " but calculated 0x" << crc;
556  }
557  }
558  else if ( verifyAdler32_ && event_->version() >= 3)
559  {
560  uint32_t adler = adler32(0L,Z_NULL,0);
561  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
562 
563  if ( adler != event_->adler32() ) {
565  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
566  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
567  " but calculated 0x" << adler;
568  }
569  }
570 
571 
573 
575 }
576 
578 {
 // Removes the file named by `fileName` from disk on a best-effort basis.
 // If the first boost::filesystem::remove() throws a filesystem_error or any
 // other std::exception, the error is logged, the thread sleeps for 100 ms
 // (usleep(100000)) and the removal is attempted once more; whatever the
 // second attempt throws is discarded.
 // NOTE(review): the signature line is not shown in this listing; `fileName`
 // is presumably the function parameter.
579  const boost::filesystem::path filePath(fileName);
580  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
581  try {
582  //sometimes this fails but file gets deleted
583  boost::filesystem::remove(filePath);
584  }
585  catch (const boost::filesystem::filesystem_error& ex)
586  {
587  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
588  << ". Trying again.";
589  usleep(100000);
590  try {
591  boost::filesystem::remove(filePath);
592  }
593  catch (...) {/*file gets deleted first time but exception is still thrown*/}
594  }
 // Same retry sequence as above for any other exception derived from
 // std::exception; only the log message differs.
 // NOTE(review): caught by non-const reference, unlike the handler above.
595  catch (std::exception& ex)
596  {
597  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
598  << ". Trying again.";
599  usleep(100000);
600  try {
601  boost::filesystem::remove(filePath);
602  } catch (...) {/*file gets deleted first time but exception is still thrown*/}
603  }
604 }
605 
606 
608 {
609  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
610  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
611 
612  if (useL1EventID_){
614  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
617  makeEvent(eventPrincipal, aux);
618  }
619  else if(tcds_pointer_==0){
622  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
625  makeEvent(eventPrincipal, aux);
626  }
627  else{
628  evf::evtn::TCDSRecord record((unsigned char *)(tcds_pointer_));
631  processGUID());
633  makeEvent(eventPrincipal, aux);
634  }
635 
636 
637 
638  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
639 
640  //FWCore/Sources DaqProvenanceHelper before 7_1_0_pre3
641  //eventPrincipal.put(daqProvenanceHelper_.constBranchDescription_, edp,
642  // daqProvenanceHelper_.dummyProvenance_);
643 
644  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
646 
647  eventsThisLumi_++;
648 
649  //this old file check runs no more often than every 10 events
650  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
651  //delete files that are not in processing
652  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
653  auto it = filesToDelete_.begin();
654  while (it!=filesToDelete_.end()) {
655  bool fileIsBeingProcessed = false;
656  for (unsigned int i=0;i<nStreams_;i++) {
657  if (it->first == streamFileTrackerPtr_->at(i)) {
658  fileIsBeingProcessed = true;
659  break;
660  }
661  }
662  if (!fileIsBeingProcessed) {
663  deleteFile(it->second->fileName_);
664  delete it->second;
665  it = filesToDelete_.erase(it);
666  }
667  else it++;
668  }
669 
670  }
672  chunkIsFree_=false;
673  return;
674 }
675 
677 {
 // Copies every FED fragment of the current event (event_->payload()) into
 // `rawData`, indexed by FED id, and returns the timestamp for the event.
 // Side effects: GTPEventID_ and tcds_pointer_ are reset to 0 and then set
 // again if the corresponding FED fragments are found in the payload.
 // NOTE(review): the signature and the declaration of `time` are on lines not
 // shown in this listing.

 // Default timestamp from the wall clock: seconds in the upper 32 bits,
 // microseconds added into the lower part.
679  timeval stv;
680  gettimeofday(&stv,0);
681  time = stv.tv_sec;
682  time = (time << 32) + stv.tv_usec;
683  edm::Timestamp tstamp(time);
684 
685  uint32_t eventSize = event_->eventSize();
686  char* event = (char*)event_->payload();
687  GTPEventID_=0;
688  tcds_pointer_ = 0;
 // The payload is walked from its end towards its start: `eventSize` is the
 // number of bytes not yet consumed. Each iteration reads the trailer sitting
 // at the tail, takes the fragment size from it and steps back to the header
 // of that fragment.
 // NOTE(review): the asserts are the only bounds checks in this loop. If they
 // are compiled out, an inconsistent trailer size would make the unsigned
 // `eventSize` wrap around; `fedSize - sizeof(fedt_t)` also wraps when
 // fedSize is smaller than a trailer. Confirm how asserts are built here.
689  while (eventSize > 0) {
690  assert(eventSize>=sizeof(fedt_t));
691  eventSize -= sizeof(fedt_t);
692  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
693  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
694  assert(eventSize>=fedSize - sizeof(fedt_t));
695  eventSize -= (fedSize - sizeof(fedt_t));
 // `event + eventSize` now points at the first byte of this fragment.
696  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
697  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
698  if(fedId>FEDNumbering::MAXFEDID)
699  {
700  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
701  }
 // Remember where the fragment with id MINTCDSuTCAFEDID starts. The pointer
 // refers into the event payload buffer, not into `rawData`.
702  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
703  tcds_pointer_ = (unsigned char *)(event + eventSize );
704  }
 // Fragment with id MINTriggerGTPFEDID: take the event id from it and replace
 // the wall-clock timestamp by the one built from getgpshigh()/getgpslow().
705  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
706  if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
707  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
708  else
709  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
710  //evf::evtn::evm_board_setformat(fedSize);
711  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
712  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
713  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
714  }
715  //take event ID from GTPE FED
 // Only used while GTPEventID_ is still 0 at this point of the walk.
716  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
717  if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
718  GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
719  }
720  }
 // Copy the whole fragment into the collection slot of this FED id.
721  FEDRawData& fedData = rawData.FEDData(fedId);
722  fedData.resize(fedSize);
723  memcpy(fedData.data(), event + eventSize, fedSize);
724  }
725  assert(eventSize == 0);
726 
727  return tstamp;
728 }
729 
731 {
733  try {
734  // assemble json destination path
736 
737  //TODO:should be ported to use fffnaming
738  std::ostringstream fileNameWithPID;
739  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
740  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
741  jsonDestPath /= fileNameWithPID.str();
742 
743  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
744  << jsonDestPath;
745  try {
746  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
747  }
748  catch (const boost::filesystem::filesystem_error& ex)
749  {
750  // Input dir gone?
751  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
752  // << " Maybe the file is not yet visible by FU. Trying again in one second";
753  sleep(1);
754  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
755  }
757 
758  try {
759  //sometimes this fails but file gets deleted
760  boost::filesystem::remove(jsonSourcePath);
761  }
762  catch (const boost::filesystem::filesystem_error& ex)
763  {
764  // Input dir gone?
765  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
766  }
767  catch (std::exception& ex)
768  {
769  // Input dir gone?
770  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
771  }
772 
773  boost::filesystem::ifstream ij(jsonDestPath);
774  Json::Value deserializeRoot;
776 
777  if (!reader.parse(ij, deserializeRoot))
778  throw std::runtime_error("Cannot deserialize input JSON file");
779 
780  //read BU JSON
782  DataPoint dp;
783  dp.deserialize(deserializeRoot);
784  bool success = false;
785  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
786  if (dpd_->getNames().at(i)=="NEvents")
787  if (i<dp.getData().size()) {
788  data = dp.getData()[i];
789  success=true;
790  }
791  }
792  if (!success) {
793  if (dp.getData().size())
794  data = dp.getData()[0];
795  else
796  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
797  " error reading number of events from BU JSON -: No input value " << data;
798  }
799  return boost::lexical_cast<int>(data);
800 
801  }
802  catch (const boost::filesystem::filesystem_error& ex)
803  {
804  // Input dir gone?
806  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
807  }
808  catch (std::runtime_error e)
809  {
810  // Another process grabbed the file and NFS did not register this
812  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
813  }
814 
815  catch( boost::bad_lexical_cast const& ) {
816  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
817  << "Input value is -: " << data;
818  }
819 
820  catch (std::exception e)
821  {
822  // BU run directory disappeared?
824  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
825  }
826 
827  return -1;
828 }
829 
831 {}
832 
833 void FedRawDataInputSource::postForkReacquireResources(std::shared_ptr<edm::multicore::MessageReceiverForSource>)
834 {
835  InputSource::rewind();
839 }
840 
842 {}
843 
844 
846 {
847  bool stop=false;
848  unsigned int currentLumiSection = 0;
849  //threadInit_.exchange(true,std::memory_order_acquire);
850 
851  {
852  std::unique_lock<std::mutex> lk(startupLock_);
853  startupCv_.notify_one();
854  }
855 
856  while (!stop) {
857 
858  //wait for at least one free thread and chunk
859  int counter=0;
861  {
862  std::unique_lock<std::mutex> lkw(mWakeup_);
863  //sleep until woken up by condition or a timeout
864  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
865  counter++;
866  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
867  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
868  }
869  else {
870  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
871  }
872  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
873  }
874 
875  if (stop) break;
876 
877  //look for a new file
878  std::string nextFile;
879  uint32_t ls;
880  uint32_t fileSize;
881 
882  uint32_t monLS=1;
883  uint32_t lockCount=0;
884  uint64_t sumLockWaitTimeUs=0.;
885 
887 
889 
890  while (status == evf::EvFDaqDirector::noFile) {
891  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
892  stop=true;
893  break;
894  }
895 
896  uint64_t thisLockWaitTimeUs=0.;
897  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
898 
899  //monitoring of lock wait time
900  if (thisLockWaitTimeUs>0.)
901  sumLockWaitTimeUs+=thisLockWaitTimeUs;
902  lockCount++;
903  if (ls>monLS) {
904  monLS=ls;
905  if (lockCount)
906  if (fms_) fms_->reportLockWait(monLS,sumLockWaitTimeUs,lockCount);
907  lockCount=0;
908  sumLockWaitTimeUs=0;
909  }
910 
911  //check again for any remaining index/EoLS files after EoR file is seen
912  if ( status == evf::EvFDaqDirector::runEnded) {
913  usleep(100000);
914  //now all files should have appeared in ramdisk, check again if any raw files were left behind
915  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
916  }
917 
918  if ( status == evf::EvFDaqDirector::runEnded) {
920  stop=true;
921  break;
922  }
923 
924  //queue new lumisection
925  if( getLSFromFilename_ && ls > currentLumiSection) {
926  currentLumiSection = ls;
927  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
928  }
929 
930  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
931  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
933  stop=true;
934  break;
935  }
936 
937  int dbgcount=0;
938  if (status == evf::EvFDaqDirector::noFile) {
939  dbgcount++;
940  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
941  usleep(100000);
942  }
943  }
944  if ( status == evf::EvFDaqDirector::newFile ) {
945  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
946 
947 
948  boost::filesystem::path rawFilePath(nextFile);
949  std::string rawFile = rawFilePath.replace_extension(".raw").string();
950 
951  struct stat st;
952  stat(rawFile.c_str(),&st);
953  fileSize=st.st_size;
954 
955  int eventsInNewFile = grabNextJsonFile(nextFile);
956  if (fms_) fms_->stoppedLookingForFile(ls);
957  assert( eventsInNewFile>=0 );
958  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
959 
960  if (!singleBufferMode_) {
961  //calculate number of needed chunks
962  unsigned int neededChunks = fileSize/eventChunkSize_;
963  if (fileSize%eventChunkSize_) neededChunks++;
964 
965  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
967  fileQueue_.push(newInputFile);
968 
969  for (unsigned int i=0;i<neededChunks;i++) {
970 
971  //get thread
972  unsigned int newTid = 0xffffffff;
973  while (!workerPool_.try_pop(newTid)) {
974  usleep(100000);
975  }
976 
977  InputChunk * newChunk = nullptr;
978  while (!freeChunks_.try_pop(newChunk)) {
979  usleep(100000);
980  if (quit_threads_.load(std::memory_order_relaxed)) break;
981  }
982 
983  if (newChunk == nullptr) {
984  //return unused tid if we received shutdown (nullptr chunk)
985  if (newTid!=0xffffffff) workerPool_.push(newTid);
986  stop = true;
987  break;
988  }
989 
990  std::unique_lock<std::mutex> lk(mReader_);
991 
992  unsigned int toRead = eventChunkSize_;
993  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
994  newChunk->reset(i*eventChunkSize_,toRead,i);
995 
996  workerJob_[newTid].first=newInputFile;
997  workerJob_[newTid].second=newChunk;
998 
999  //wake up the worker thread
1000  cvReader_[newTid]->notify_one();
1001  }
1002  }
1003  else {
1004  if (!eventsInNewFile) {
1005  //still queue file for lumi update
1006  std::unique_lock<std::mutex> lkw(mWakeup_);
1007  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
1009  fileQueue_.push(newInputFile);
1010  cvWakeup_.notify_one();
1011  return;
1012  }
1013  //in single-buffer mode put single chunk in the file and let the main thread read the file
1014  InputChunk * newChunk;
1015  //should be available immediately
1016  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
1017 
1018  std::unique_lock<std::mutex> lkw(mWakeup_);
1019 
1020  unsigned int toRead = eventChunkSize_;
1021  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1022  newChunk->reset(0,toRead,0);
1023  newChunk->readComplete_=true;
1024 
1025  //push file and wakeup main thread
1026  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1027  newInputFile->chunks_[0]=newChunk;
1029  fileQueue_.push(newInputFile);
1030  cvWakeup_.notify_one();
1031  }
1032  }
1033  }
1034  //make sure threads finish reading
1035  unsigned numFinishedThreads = 0;
1036  while (numFinishedThreads < workerThreads_.size()) {
1037  unsigned int tid;
1038  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1039  std::unique_lock<std::mutex> lk(mReader_);
1040  thread_quit_signal[tid]=true;
1041  cvReader_[tid]->notify_one();
1042  numFinishedThreads++;
1043  }
1044  for (unsigned int i=0;i<workerThreads_.size();i++) {
1045  workerThreads_[i]->join();
1046  delete workerThreads_[i];
1047  }
1048 }
1049 
1051 {
 // Body of one reader worker, identified by `tid`. Each loop iteration:
 //  1. takes mReader_, clears the job slot and pushes `tid` into workerPool_;
 //  2. waits on cvReader_[tid] until it is notified;
 //  3. returns if thread_quit_signal[tid] is set, otherwise reads the chunk
 //     found in workerJob_[tid] from its file and marks it complete.
 // NOTE(review): the signature and the definitions of `start` and `end` are on
 // lines not shown in this listing.
1052  bool init = true;
1053  threadInit_.exchange(true,std::memory_order_acquire);
1054 
1055  while (1) {
1056 
1057  std::unique_lock<std::mutex> lk(mReader_);
 // NOTE(review): the same statement appears twice; the second one was
 // presumably meant to reset `.second` - confirm.
1058  workerJob_[tid].first=nullptr;
1059  workerJob_[tid].first=nullptr;
1060 
1061  assert(!thread_quit_signal[tid]);//should never get it here
1062  workerPool_.push(tid);
1063 
 // First pass only: notify startupCv_, on which the constructor waits after
 // creating each worker thread.
 // NOTE(review): this inner `lk` shadows the mReader_ lock declared above.
1064  if (init) {
1065  std::unique_lock<std::mutex> lk(startupLock_);
1066  init = false;
1067  startupCv_.notify_one();
1068  }
 // NOTE(review): the wait has no predicate; whatever follows runs after any
 // wakeup of this condition variable.
1069  cvReader_[tid]->wait(lk);
1070 
1071  if (thread_quit_signal[tid]) return;
1072 
1073  InputFile * file;
1074  InputChunk * chunk;
1075 
1076  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1077 
1078  file = workerJob_[tid].first;
1079  chunk = workerJob_[tid].second;
1080 
 // NOTE(review): lseek() is called before the descriptor is checked, so on a
 // failed open() it is called with a negative descriptor. Only the descriptor
 // is tested below; a failed seek on a valid descriptor is not detected even
 // though the error message mentions it.
1081  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1082  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1083 
1084 
1085  if (fileDescriptor>=0)
1086  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1087  else
1088  {
1089  edm::LogError("FedRawDataInputSource") <<
1090  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1091  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
 // The failure is flagged through setExceptionState_ and this worker exits.
1092  setExceptionState_=true;
1093  return;
1094 
1095  }
1096 
 // Despite its name, bufferLeft counts the bytes read into chunk->buf_ so far.
1097  unsigned int bufferLeft = 0;
 // Read up to readBlocks_ blocks of eventChunkBlock_ bytes; a short read ends
 // the loop.
 // NOTE(review): `last` is ssize_t. If eventChunkBlock_ is an unsigned type
 // (its declaration is not visible here), a -1 from read() would be converted
 // to a large unsigned value and the short-read test would not fire - confirm.
1099  for (unsigned int i=0;i<readBlocks_;i++)
1100  {
1101  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1102  if ( last > 0 )
1103  bufferLeft+=last;
1104  if (last < eventChunkBlock_) {
1105  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1106  break;
1107  }
1108  }
1110  auto diff = end-start;
1111  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1112  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1113  close(fileDescriptor);
1114 
 // The chunk read from file offset 0 supplies the FRD version when none has
 // been detected yet: the first 32-bit word of the buffer is taken as is.
1115  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1117  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1118  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1119 
1120  }
1121 }
1122 
1124 {
// Sets quit_threads_ and then throws a cms::Exception in category
// "FedRawDataInputSource:threadError"; this body never returns normally.
// NOTE(review): the signature line (1123) is absent from this listing; the
// name is taken from the exception category string below.
1125  quit_threads_=true;
1126  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1127 
1128 }
1129 
1130 
// Points `dataPosition` at `size` contiguous bytes starting at the current
// read position (chunkPosition_) of the current chunk.
//
// @param dataPosition  out: set to the start of the requested bytes.
// @param size          number of contiguous bytes requested.
// @return true if the request crossed into the next chunk; the bytes were
//         then assembled at the start of the current chunk's buffer and
//         currentChunk_ was advanced. false if the request fit inside the
//         current chunk.
//
// NOTE(review): the gutter numbering of this listing skips 1136, 1147, 1155
// and 1161-1162, so some statements of the original (inside both wait loops,
// before the chunkPosition_ update, and in the else branch) are not shown.
1131 inline bool InputFile::advance(unsigned char* & dataPosition, const size_t size)
1132 {
1133  //wait for chunk
// Poll every 100 ms until waitForChunk() reports the current chunk.
1134  while (!waitForChunk(currentChunk_)) {
1135  usleep(100000);
1137  }
1138 
1139  dataPosition = chunks_[currentChunk_]->buf_+ chunkPosition_;
// Bytes remaining in the current chunk after the read position.
1140  size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1141 
1142  if (currentLeft < size) {
1143 
1144  //we need next chunk
1145  while (!waitForChunk(currentChunk_+1)) {
1146  usleep(100000);
1148  }
1149  //copy everything to beginning of the first chunk
1150  dataPosition-=chunkPosition_;
1151  assert(dataPosition==chunks_[currentChunk_]->buf_);
// Slide the unread tail to the front of the current buffer, then append the
// missing (size - currentLeft) bytes from the head of the next chunk.
// NOTE(review): assumes the current buffer can hold `size` bytes - confirm.
1152  memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_+chunkPosition_, currentLeft);
1153  memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_+1]->buf_, size - currentLeft);
1154  //set pointers at the end of the old data position
// The read position now lies in the next chunk, just past the bytes copied.
1156  chunkPosition_=size-currentLeft;
1157  currentChunk_++;
1158  return true;
1159  }
1160  else {
1163  return false;
1164  }
1165 }
1166 
// Copies `size` bytes from the current chunk (starting at chunkPosition_)
// into the previous chunk's buffer at byte offset `offset`.
//
// @param size    number of bytes copied.
// @param offset  destination offset inside the previous chunk's buffer.
//
// NOTE(review): the gutter numbering of this listing skips 1170 and
// 1173-1174, so further statements of the original are not shown here.
// NOTE(review): `size - offset` is unsigned arithmetic and wraps around if
// offset > size - confirm callers never pass that.
1167 inline void InputFile::moveToPreviousChunk(const size_t size, const size_t offset)
1168 {
1169  //this will fail in case of events that are too large
1171  assert(size - offset < chunks_[currentChunk_]->size_);
1172  memcpy(chunks_[currentChunk_-1]->buf_+offset,chunks_[currentChunk_]->buf_+chunkPosition_,size);
1175 }
1176 
// NOTE(review): the body of this function (gutter lines 1178-1179) is absent
// from this listing; restore it from the original source before relying on
// this definition.
1177 inline void InputFile::rewindChunk(const size_t size) {
1180 }
1181 
1182 //single-buffer mode file reading
1184 {
// Single-buffer read path: refills file->chunks_[0]->buf_ from the file behind
// fileDescriptor_, opening it first if needed and closing it once
// bufferInputRead_ equals file->fileSize_.
// Throws cms::Exception ("FedRawDataInputSource:readNextChunkIntoBuffer") if
// the file cannot be opened.
//
// NOTE(review): the signature line (1183) is absent from this listing; the
// name is taken from the exception category below. The numbering also skips
// 1204, 1217 and 1222 (one line after each ::read call), so statements of the
// original are missing there.
// NOTE(review): in the visible text the ::read result `last` is added to
// existingSize without a check for -1 - verify against the original source.
1185 
1186  if (fileDescriptor_<0) {
1187  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1188  bufferInputRead_ = 0;
1189  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1190  if (fileDescriptor_>=0)
1191  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1192  else
1193  {
1194  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1195  << file->fileName_ << " fd:" << fileDescriptor_;
1196  }
1197  }
1198 
// Read position at the buffer start: nothing to preserve, so fill the buffer
// from offset 0 with up to readBlocks_ reads of eventChunkBlock_ bytes.
1199  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1200  uint32_t existingSize = 0;
1201  for (unsigned int i=0;i<readBlocks_;i++)
1202  {
1203  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1205  existingSize+=last;
1206  }
1207  }
1208  else {
// Otherwise keep the unconsumed tail (eventChunkSize_ - chunkPosition_ bytes)
// by moving it to the buffer start, then read chunkPosition_ more bytes behind
// it: `blockcount` full blocks plus a `leftsize` remainder.
1209  const uint32_t chunksize = file->chunkPosition_;
1210  const uint32_t blockcount=chunksize/eventChunkBlock_;
1211  const uint32_t leftsize = chunksize%eventChunkBlock_;
1212  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1213  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1214 
1215  for (uint32_t i=0;i<blockcount;i++) {
1216  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1218  existingSize+=last;
1219  }
1220  if (leftsize) {
1221  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1223  existingSize+=last;
1224  }
1225  file->chunkPosition_=0;//data was moved to beginning of the chunk
1226  }
// Close the descriptor once the running byte count matches the file size.
1227  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1228  if (fileDescriptor_!=-1)
1229  {
1230  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1231  close(fileDescriptor_);
1232  fileDescriptor_=-1;
1233  }
1234  }
1235 }
1236 
1237 // define this class as an input source
#define LogDebug(id)
static const char runNumber_[]
std::vector< std::string > & getData()
Definition: DataPoint.h:58
unsigned int lumi_
int i
Definition: DBlmapReader.cc:9
unsigned int getgpshigh(const unsigned char *)
uint32_t chunkPosition_
std::condition_variable cvWakeup_
tuple start
Check for commandline option errors.
Definition: dqm_diff.py:58
virtual void read(edm::EventPrincipal &eventPrincipal) override
bool gtpe_board_sense(const unsigned char *p)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
virtual void rewind_() override
std::vector< int > * getStreamFileTracker()
bool crc32c_hw_test()
Definition: crc32c.cc:354
unsigned int offset_
jsoncollector::DataPointDefinition * dpd_
tbb::concurrent_queue< unsigned int > workerPool_
JetCorrectorParameters::Record record
Definition: classes.h:7
unsigned int get(const unsigned char *, bool)
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
void rewindChunk(const size_t size)
const uint32 FRDHeaderVersionSize[6]
assert(m_qm.get())
std::vector< int > * streamFileTrackerPtr_
int init
Definition: HydjetWrapper.h:62
def ls
Definition: eostools.py:346
evf::EvFDaqDirector::FileStatus status_
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:600
std::atomic< bool > quit_threads_
volatile std::atomic< bool > shutdown_flag
std::vector< ReaderInfo > workerJob_
void createProcessingNotificationMaybe() const
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
std::unique_ptr< std::thread > readSupervisorThread_
ProductProvenance const & dummyProvenance() const
Represents a JSON value.
Definition: value.h:111
std::string getEoLSFilePathOnBU(const unsigned int ls) const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
#define nullptr
std::vector< std::condition_variable * > cvReader_
FedRawDataInputSource * parent_
unsigned int sourceid
Definition: fed_header.h:32
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:218
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
U second(std::pair< T, U > const &p)
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
static Timestamp beginOfTime()
Definition: Timestamp.h:103
std::vector< bool > thread_quit_signal
uint32_t bufferPosition_
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
tuple path
else: Piece not in the list, fine.
void updateFileIndex(int const &fileIndex)
#define DEFINE_FWK_INPUT_SOURCE(type)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void resize(size_t newsize)
Definition: FEDRawData.cc:32
const edm::RunNumber_t runNumber_
bool advance(unsigned char *&dataPosition, const size_t size)
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:358
void moveToPreviousChunk(const size_t size, const size_t offset)
unsigned char * buf_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
struct fedt_struct fedt_t
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
int grabNextJsonFile(boost::filesystem::path const &)
def move
Definition: eostools.py:508
def load
Definition: svgfig.py:546
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
#define end
Definition: vmac.h:37
virtual void preForkReleaseResources() override
bool waitForChunk(unsigned int chunkid)
const edm::DaqProvenanceHelper daqProvenanceHelper_
unsigned int fileIndex_
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
unsigned long long TimeValue_t
Definition: Timestamp.h:28
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:56
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned int uint32
Definition: MsgTools.h:13
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:385
const std::string fuOutputDir_
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
std::atomic< bool > readComplete_
virtual bool checkNextEvent() override
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:606
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned long long uint64_t
Definition: Time.h:15
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance)
unsigned int currentChunk_
tbb::concurrent_queue< InputFile * > fileQueue_
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:351
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
void setProcessHistoryID(ProcessHistoryID const &phid)
std::condition_variable startupCv_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:263
void stoppedLookingForFile(unsigned int lumi)
tbb::concurrent_vector< InputChunk * > chunks_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:366
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Non-const accessor for process history registry.
Definition: InputSource.h:175
unsigned int eventsize
Definition: fed_trailer.h:33
std::atomic< bool > threadInit_
std::pair< InputFile *, InputChunk * > ReaderInfo
std::string getBoLSFilePathOnFU(const unsigned int ls) const
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:354
void readWorker(unsigned int tid)
static std::atomic< unsigned int > counter
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::atomic< unsigned int > readingFilesCount_
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
bool emptyLumisectionMode() const
virtual void postForkReacquireResources(std::shared_ptr< edm::multicore::MessageReceiverForSource >) override
friend class InputFile
Open Root file and provide MEs ############.
size_(0)
Definition: OwnArray.h:181
unsigned int gtpe_get(const unsigned char *)
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
volatile std::atomic< bool > shutdown_flag false
tbb::concurrent_queue< InputChunk * > freeChunks_
evf::EvFDaqDirector::FileStatus getNextEvent()
tuple status
Definition: ntuplemaker.py:245
evf::EvFDaqDirector::FileStatus nextEvent()
void readNextChunkIntoBuffer(InputFile *file)
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:266
unsigned int nEvents_
void setFMS(evf::FastMonitoringService *fms)
unsigned int getgpslow(const unsigned char *)
tuple size
Write out results.
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
std::string getEoRFilePathOnFU() const