CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FedRawDataInputSource.cc
Go to the documentation of this file.
1 #include <fcntl.h>
2 #include <iomanip>
3 #include <iostream>
4 #include <memory>
5 #include <sstream>
6 #include <sys/types.h>
7 #include <sys/file.h>
8 #include <sys/time.h>
9 #include <unistd.h>
10 #include <vector>
11 #include <fstream>
12 #include <zlib.h>
13 #include <stdio.h>
14 #include <chrono>
15 
16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
18 
19 
22 
29 
33 
35 
39 
41 
46 
47 //JSON file reader
49 
50 #include <boost/lexical_cast.hpp>
51 
52 using namespace jsoncollector;
53 
55  edm::InputSourceDescription const& desc) :
56  edm::RawInputSource(pset, desc),
57  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
58  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",16)*1048576),
59  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",eventChunkSize_/1048576)*1048576),
60  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",1)),
61  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
62  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
63  verifyChecksum_(pset.getUntrackedParameter<bool> ("verifyChecksum", true)),
64  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
65  testModeNoBuilderUnit_(edm::Service<evf::EvFDaqDirector>()->getTestModeNoBuilderUnit()),
66  runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
67  fuOutputDir_(edm::Service<evf::EvFDaqDirector>()->baseRunDir()),
68  daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
69  eventID_(),
70  processHistoryID_(),
71  currentLumiSection_(0),
72  tcds_pointer_(0),
73  eventsThisLumi_(0),
74  dpd_(nullptr)
75 {
76  char thishost[256];
77  gethostname(thishost, 255);
78  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
79  << std::endl << (eventChunkSize_/1048576)
80  << " MB on host " << thishost;
82  edm::LogInfo("FedRawDataInputSource") << "Test mode is ON!";
83 
85  setNewRun();
88 
89  dpd_ = new DataPointDefinition();
90  std::string defLabel = "data";
91  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
92 
93  //make sure that chunk size is N * block size
98 
99  if (!numBuffers_)
100  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
101  "no reading enabled with numBuffers parameter 0";
102 
105 
106  if (!crc32c_hw_test())
107  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
108 
109  //het handles to DaqDirector and FastMonitoringService because it isn't acessible in readSupervisor thread
110 
111  try {
113  } catch (...){
114  edm::LogWarning("FedRawDataInputSource") << "FastMonitoringService not found";
115  assert(0);//test
116  }
117 
118  try {
120  //set DaqDirector to delete files in preGlobalEndLumi callback
123  if (fms_) daqDirector_->setFMS(fms_);
124  } catch (...){
125  edm::LogWarning("FedRawDataInputSource") << "EvFDaqDirector not found";
126  assert(0);//test
127  }
128 
129  //should delete chunks when run stops
130  for (unsigned int i=0;i<numBuffers_;i++) {
132  }
133 
134  quit_threads_ = false;
135 
136  for (unsigned int i=0;i<numConcurrentReads_;i++)
137  {
138  std::unique_lock<std::mutex> lk(startupLock_);
139  //issue a memory fence here and in threads (constructor was segfaulting without this)
140  thread_quit_signal.push_back(false);
141  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
142  cvReader_.push_back(new std::condition_variable);
143  threadInit_.store(false,std::memory_order_release);
144  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
145  startupCv_.wait(lk);
146  }
147 
148  runAuxiliary()->setProcessHistoryID(processHistoryID_);
149 }
150 
152 {
153  quit_threads_=true;
154 
155  //delete any remaining open files
156  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
157  deleteFile(it->second->fileName_);
158  delete it->second;
159  }
161  readSupervisorThread_->join();
162  }
163  else {
164  //join aux threads in case the supervisor thread was not started
165  for (unsigned int i=0;i<workerThreads_.size();i++) {
166  std::unique_lock<std::mutex> lk(mReader_);
167  thread_quit_signal[i]=true;
168  cvReader_[i]->notify_one();
169  lk.unlock();
170  workerThreads_[i]->join();
171  delete workerThreads_[i];
172  }
173  }
174  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
175  /*
176  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
177  InputChunk *ch;
178  while (!freeChunks_.try_pop(ch)) {}
179  delete ch;
180  }
181  */
182 }
183 
185 {
187  {
188  //this thread opens new files and dispatches reading to worker readers
189  //threadInit_.store(false,std::memory_order_release);
190  std::unique_lock<std::mutex> lk(startupLock_);
191  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
193  startupCv_.wait(lk);
194  }
195  switch (nextEvent() ) {
197 
198  //maybe create EoL file in working directory before ending run
199  struct stat buf;
200  if ( currentLumiSection_ > 0 ) {
201  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
202  if (eolFound) {
204  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
205  if ( !found ) {
207  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
208  close(eol_fd);
210  }
211  }
212  }
213  //also create EoR file in FU data directory
214  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
215  if (!eorFound) {
216  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
217  close(eor_fd);
218  }
220  eventsThisLumi_=0;
222  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
223  return false;
224  }
226  //this is not reachable
227  return true;
228  }
230  //std::cout << "--------------NEW LUMI---------------" << std::endl;
231  return true;
232  }
233  default: {
234  if (!getLSFromFilename_) {
235  //get new lumi from file header
236  if (event_->lumi() > currentLumiSection_) {
238  eventsThisLumi_=0;
239  maybeOpenNewLumiSection( event_->lumi() );
240  }
241  }
242  eventRunNumber_=event_->run();
243  L1EventID_ = event_->event();
244 
245  setEventCached();
246 
247  return true;
248  }
249  }
250 }
251 
252 void FedRawDataInputSource::createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
253 {
254  //used for backpressure mechanisms and monitoring
255  const std::string fuBoLS = daqDirector_->getBoLSFilePathOnFU(lumiSection);
256  struct stat buf;
257  if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
258  int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
259  close(bol_fd);
260  }
261 }
262 
263 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection)
264 {
266  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
267 
268  if ( currentLumiSection_ > 0 ) {
269  const std::string fuEoLS =
271  struct stat buf;
272  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
273  if ( !found ) {
275  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
276  close(eol_fd);
277  createBoLSFile(lumiSection,false);
279  }
280  }
281  else createBoLSFile(lumiSection,true);//needed for initial lumisection
282 
283  currentLumiSection_ = lumiSection;
284 
286 
287  timeval tv;
288  gettimeofday(&tv, 0);
289  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
290 
291  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
293  runAuxiliary()->run(),
294  lumiSection, lsopentime,
296 
297  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
298  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
299 
300  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
301  }
302 }
303 
305 {
307  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
308  {
309  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
310  }
311  return status;
312 }
313 
315 {
316 
318  if (!currentFile_)
319  {
320  if (!streamFileTrackerPtr_) {
324  }
325 
327  if (!fileQueue_.try_pop(currentFile_))
328  {
329  //sleep until wakeup (only in single-buffer mode) or timeout
330  std::unique_lock<std::mutex> lkw(mWakeup_);
331  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
333  }
334  status = currentFile_->status_;
335  if ( status == evf::EvFDaqDirector::runEnded)
336  {
337  delete currentFile_;
338  currentFile_=nullptr;
339  return status;
340  }
341  else if ( status == evf::EvFDaqDirector::runAbort)
342  {
343  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
344  }
345  else if (status == evf::EvFDaqDirector::newLumi)
346  {
347  if (getLSFromFilename_) {
350  eventsThisLumi_=0;
352  }
353  }
354  else {//let this be picked up from next event
356  }
357 
358  delete currentFile_;
359  currentFile_=nullptr;
360  return status;
361  }
362  else if (status == evf::EvFDaqDirector::newFile) {
365  }
366  else
367  assert(0);
368  }
369 
370  //file is empty
371  if (!currentFile_->fileSize_) {
372  //try to open new lumi
374  if (getLSFromFilename_)
377  eventsThisLumi_=0;
379  }
380  //immediately delete empty file
382  delete currentFile_;
383  currentFile_=nullptr;
385  }
386 
387  //file is finished
389  //release last chunk (it is never released elsewhere)
392  {
393  throw cms::Exception("FedRawDataInputSource::getNextEvent")
394  << "Fully processed " << currentFile_->nProcessed_
395  << " from the file " << currentFile_->fileName_
396  << " but according to BU JSON there should be "
397  << currentFile_->nEvents_ << " events";
398  }
399  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
400  if (singleBufferMode_) {
401  std::unique_lock<std::mutex> lkw(mWakeup_);
402  cvWakeup_.notify_one();
403  }
406  //put the file in pending delete list;
407  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
408  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
409  }
410  else {
411  //in single-thread and stream jobs, events are already processed
413  delete currentFile_;
414  }
415  currentFile_=nullptr;
417  }
418 
419 
420  //file is too short
422  {
423  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
424  "Premature end of input file while reading event header";
425  }
426  if (singleBufferMode_) {
427 
428  //should already be there
430  usleep(10000);
432  }
433 
434  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
435 
436  //conditions when read amount is not sufficient for the header to fit
437  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]
439  {
441 
442  if (detectedFRDversion_==0) {
443  detectedFRDversion_=*((uint32*)dataPosition);
444  if (detectedFRDversion_>5)
445  throw cms::Exception("FedRawDataInputSource::getNextEvent")
446  << "Unknown FRD version -: " << detectedFRDversion_;
447  assert(detectedFRDversion_>=1);
448  }
449 
450  //recalculate chunk position
451  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
452  if ( bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_])
453  {
454  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
455  "Premature end of input file while reading event header";
456  }
457  }
458 
459  event_.reset( new FRDEventMsgView(dataPosition) );
460  if (event_->size()>eventChunkSize_) {
461  throw cms::Exception("FedRawDataInputSource::getNextEvent")
462  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
463  << " run:" << event_->run() << " of size:" << event_->size()
464  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
465  }
466 
467  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
468 
470  {
471  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
472  "Premature end of input file while reading event data";
473  }
474  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
476  //recalculate chunk position
477  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
478  event_.reset( new FRDEventMsgView(dataPosition) );
479  }
480  currentFile_->bufferPosition_ += event_->size();
481  currentFile_->chunkPosition_ += event_->size();
482  //last chunk is released when this function is invoked next time
483 
484  }
485  //multibuffer mode:
486  else
487  {
488  //wait for the current chunk to become added to the vector
490  usleep(10000);
492  }
493 
494  //check if header is at the boundary of two chunks
495  chunkIsFree_ = false;
496  unsigned char *dataPosition;
497 
498  //read header, copy it to a single chunk if necessary
499  bool chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]);
500 
501  event_.reset( new FRDEventMsgView(dataPosition) );
502  if (event_->size()>eventChunkSize_) {
503  throw cms::Exception("FedRawDataInputSource::getNextEvent")
504  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
505  << " run:" << event_->run() << " of size:" << event_->size()
506  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
507  }
508 
509  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
510 
512  {
513  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
514  "Premature end of input file while reading event data";
515  }
516 
517  if (chunkEnd) {
518  //header was at the chunk boundary, we will have to move payload as well
519  currentFile_->moveToPreviousChunk(msgSize,FRDHeaderVersionSize[detectedFRDversion_]);
520  chunkIsFree_ = true;
521  }
522  else {
523  //header was contiguous, but check if payload fits the chunk
524  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
525  //rewind to header start position
526  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
527  //copy event to a chunk start and move pointers
528  chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]+msgSize);
529  assert(chunkEnd);
530  chunkIsFree_=true;
531  //header is moved
532  event_.reset( new FRDEventMsgView(dataPosition) );
533  }
534  else {
535  //everything is in a single chunk, only move pointers forward
536  chunkEnd = currentFile_->advance(dataPosition,msgSize);
537  assert(!chunkEnd);
538  chunkIsFree_=false;
539  }
540  }
541  }//end multibuffer mode
542 
543  if (verifyChecksum_ && event_->version() >= 5)
544  {
545  uint32_t crc=0;
546  crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
547  if ( crc != event_->crc32c() ) {
549  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
550  "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
551  " but calculated 0x" << crc;
552  }
553  }
554  else if ( verifyAdler32_ && event_->version() >= 3)
555  {
556  uint32_t adler = adler32(0L,Z_NULL,0);
557  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
558 
559  if ( adler != event_->adler32() ) {
561  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
562  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
563  " but calculated 0x" << adler;
564  }
565  }
566 
567 
569 
571 }
572 
574 {
575  const boost::filesystem::path filePath(fileName);
576  if (!testModeNoBuilderUnit_) {
577  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
578  try {
579  //sometimes this fails but file gets deleted
580  boost::filesystem::remove(filePath);
581  }
582  catch (const boost::filesystem::filesystem_error& ex)
583  {
584  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
585  << ". Trying again.";
586  usleep(100000);
587  try {
588  boost::filesystem::remove(filePath);
589  }
590  catch (...) {/*file gets deleted first time but exception is still thrown*/}
591  }
592  catch (std::exception& ex)
593  {
594  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
595  << ". Trying again.";
596  usleep(100000);
597  try {
598  boost::filesystem::remove(filePath);
599  } catch (...) {/*file gets deleted first time but exception is still thrown*/}
600  }
601  } else {
602  renameToNextFree(fileName);
603  }
604 }
605 
606 
608 {
609  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
610  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
611 
612  if (useL1EventID_){
614  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
617  makeEvent(eventPrincipal, aux);
618  }
619  else if(tcds_pointer_==0){
622  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
625  makeEvent(eventPrincipal, aux);
626  }
627  else{
628  evf::evtn::TCDSRecord record((unsigned char *)(tcds_pointer_));
631  processGUID());
633  makeEvent(eventPrincipal, aux);
634  }
635 
636 
637 
638  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
639 
640  //FWCore/Sources DaqProvenanceHelper before 7_1_0_pre3
641  //eventPrincipal.put(daqProvenanceHelper_.constBranchDescription_, edp,
642  // daqProvenanceHelper_.dummyProvenance_);
643 
644  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
646 
647  eventsThisLumi_++;
648 
649  //this old file check runs no more often than every 10 events
650  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
651  //delete files that are not in processing
652  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
653  auto it = filesToDelete_.begin();
654  while (it!=filesToDelete_.end()) {
655  bool fileIsBeingProcessed = false;
656  for (unsigned int i=0;i<nStreams_;i++) {
657  if (it->first == streamFileTrackerPtr_->at(i)) {
658  fileIsBeingProcessed = true;
659  break;
660  }
661  }
662  if (!fileIsBeingProcessed) {
663  deleteFile(it->second->fileName_);
664  delete it->second;
665  it = filesToDelete_.erase(it);
666  }
667  else it++;
668  }
669 
670  }
672  chunkIsFree_=false;
673  return;
674 }
675 
677 {
679  timeval stv;
680  gettimeofday(&stv,0);
681  time = stv.tv_sec;
682  time = (time << 32) + stv.tv_usec;
683  edm::Timestamp tstamp(time);
684 
685  uint32_t eventSize = event_->eventSize();
686  char* event = (char*)event_->payload();
687  GTPEventID_=0;
688  tcds_pointer_ = 0;
689  while (eventSize > 0) {
690  assert(eventSize>=sizeof(fedt_t));
691  eventSize -= sizeof(fedt_t);
692  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
693  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
694  assert(eventSize>=fedSize - sizeof(fedt_t));
695  eventSize -= (fedSize - sizeof(fedt_t));
696  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
697  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
698  if(fedId>FEDNumbering::MAXFEDID)
699  {
700  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
701  }
702  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
703  tcds_pointer_ = (unsigned char *)(event + eventSize );
704  }
705  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
706  if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
707  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
708  else
709  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
710  //evf::evtn::evm_board_setformat(fedSize);
711  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
712  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
713  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
714  }
715  //take event ID from GTPE FED
716  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
717  if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
718  GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
719  }
720  }
721  FEDRawData& fedData = rawData.FEDData(fedId);
722  fedData.resize(fedSize);
723  memcpy(fedData.data(), event + eventSize, fedSize);
724  }
725  assert(eventSize == 0);
726 
727  return tstamp;
728 }
729 
731 {
733  try {
734  // assemble json destination path
736 
737  //TODO:should be ported to use fffnaming
738  std::ostringstream fileNameWithPID;
739  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
740  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
741  jsonDestPath /= fileNameWithPID.str();
742 
743  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
744  << jsonDestPath;
745 
747  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
748  else {
749  try {
750  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
751  }
752  catch (const boost::filesystem::filesystem_error& ex)
753  {
754  // Input dir gone?
755  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
756  // << " Maybe the file is not yet visible by FU. Trying again in one second";
757  sleep(1);
758  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
759  }
761 
762 
763  try {
764  //sometimes this fails but file gets deleted
765  boost::filesystem::remove(jsonSourcePath);
766  }
767  catch (const boost::filesystem::filesystem_error& ex)
768  {
769  // Input dir gone?
770  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
771  }
772  catch (std::exception& ex)
773  {
774  // Input dir gone?
775  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
776  }
777 
778  }
779 
780  boost::filesystem::ifstream ij(jsonDestPath);
781  Json::Value deserializeRoot;
783 
784  if (!reader.parse(ij, deserializeRoot))
785  throw std::runtime_error("Cannot deserialize input JSON file");
786 
787  //read BU JSON
789  DataPoint dp;
790  dp.deserialize(deserializeRoot);
791  bool success = false;
792  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
793  if (dpd_->getNames().at(i)=="NEvents")
794  if (i<dp.getData().size()) {
795  data = dp.getData()[i];
796  success=true;
797  }
798  }
799  if (!success) {
800  if (dp.getData().size())
801  data = dp.getData()[0];
802  else
803  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
804  " error reading number of events from BU JSON -: No input value " << data;
805  }
806  return boost::lexical_cast<int>(data);
807 
808  }
809  catch (const boost::filesystem::filesystem_error& ex)
810  {
811  // Input dir gone?
813  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
814  }
815  catch (std::runtime_error e)
816  {
817  // Another process grabbed the file and NFS did not register this
819  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
820  }
821 
822  catch( boost::bad_lexical_cast const& ) {
823  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
824  << "Input value is -: " << data;
825  }
826 
827  catch (std::exception e)
828  {
829  // BU run directory disappeared?
831  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
832  }
833 
834  return -1;
835 }
836 
838 {
841 
842  edm::LogInfo("FedRawDataInputSource") << "Instead of delete, RENAME -: " << fileName
843  << " to: " << destination.string();
844  boost::filesystem::rename(source,destination);
845  boost::filesystem::rename(source.replace_extension(".jsn"),destination.replace_extension(".jsn"));
846 }
847 
849 {}
850 
851 void FedRawDataInputSource::postForkReacquireResources(std::shared_ptr<edm::multicore::MessageReceiverForSource>)
852 {
853  InputSource::rewind();
857 }
858 
860 {}
861 
862 
864 {
865  bool stop=false;
866  unsigned int currentLumiSection = 0;
867  //threadInit_.exchange(true,std::memory_order_acquire);
868 
869  {
870  std::unique_lock<std::mutex> lk(startupLock_);
871  startupCv_.notify_one();
872  }
873 
874  while (!stop) {
875 
876  //wait for at least one free thread and chunk
877  int counter=0;
878  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty())
879  {
880  std::unique_lock<std::mutex> lkw(mWakeup_);
881  //sleep until woken up by condition or a timeout
882  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
883  counter++;
884  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
885  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
886  }
887  else {
888  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
889  }
890  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
891  }
892 
893  if (stop) break;
894 
895  //look for a new file
896  std::string nextFile;
897  uint32_t ls;
898  uint32_t fileSize;
899 
900  uint32_t monLS=1;
901  uint32_t lockCount=0;
902  uint64_t sumLockWaitTimeUs=0.;
903 
905 
907 
908  while (status == evf::EvFDaqDirector::noFile) {
909  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
910  stop=true;
911  break;
912  }
913 
914  uint64_t thisLockWaitTimeUs=0.;
915  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
916 
917  //monitoring of lock wait time
918  if (thisLockWaitTimeUs>0.)
919  sumLockWaitTimeUs+=thisLockWaitTimeUs;
920  lockCount++;
921  if (ls>monLS) {
922  monLS=ls;
923  if (lockCount)
924  if (fms_) fms_->reportLockWait(monLS,sumLockWaitTimeUs,lockCount);
925  lockCount=0;
926  sumLockWaitTimeUs=0;
927  }
928 
929  //check again for any remaining index/EoLS files after EoR file is seen
930  if ( status == evf::EvFDaqDirector::runEnded) {
931  usleep(100000);
932  //now all files should have appeared in ramdisk, check again if any raw files were left behind
933  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
934  }
935 
936  if ( status == evf::EvFDaqDirector::runEnded) {
938  stop=true;
939  break;
940  }
941 
942  //queue new lumisection
943  if( getLSFromFilename_ && ls > currentLumiSection) {
944  currentLumiSection = ls;
945  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
946  }
947 
948  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
949  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
951  stop=true;
952  break;
953  }
954 
955  int dbgcount=0;
956  if (status == evf::EvFDaqDirector::noFile) {
957  dbgcount++;
958  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
959  usleep(100000);
960  }
961  }
962  if ( status == evf::EvFDaqDirector::newFile ) {
963  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
964 
965 
966  boost::filesystem::path rawFilePath(nextFile);
967  std::string rawFile = rawFilePath.replace_extension(".raw").string();
968 
969  struct stat st;
970  stat(rawFile.c_str(),&st);
971  fileSize=st.st_size;
972 
973  int eventsInNewFile = grabNextJsonFile(nextFile);
974  if (fms_) fms_->stoppedLookingForFile(ls);
975  assert( eventsInNewFile>=0 );
976  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
977 
978  if (!singleBufferMode_) {
979  //calculate number of needed chunks
980  unsigned int neededChunks = fileSize/eventChunkSize_;
981  if (fileSize%eventChunkSize_) neededChunks++;
982 
983  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
984  fileQueue_.push(newInputFile);
985 
986  for (unsigned int i=0;i<neededChunks;i++) {
987 
988  //get thread
989  unsigned int newTid = 0xffffffff;
990  while (!workerPool_.try_pop(newTid)) {
991  usleep(100000);
992  }
993 
994  InputChunk * newChunk = nullptr;
995  while (!freeChunks_.try_pop(newChunk)) {
996  usleep(100000);
997  if (quit_threads_.load(std::memory_order_relaxed)) break;
998  }
999 
1000  if (newChunk == nullptr) {
1001  //return unused tid if we received shutdown (nullptr chunk)
1002  if (newTid!=0xffffffff) workerPool_.push(newTid);
1003  stop = true;
1004  break;
1005  }
1006 
1007  std::unique_lock<std::mutex> lk(mReader_);
1008 
1009  unsigned int toRead = eventChunkSize_;
1010  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1011  newChunk->reset(i*eventChunkSize_,toRead,i);
1012 
1013  workerJob_[newTid].first=newInputFile;
1014  workerJob_[newTid].second=newChunk;
1015 
1016  //wake up the worker thread
1017  cvReader_[newTid]->notify_one();
1018  }
1019  }
1020  else {
1021  if (!eventsInNewFile) {
1022  //still queue file for lumi update
1023  std::unique_lock<std::mutex> lkw(mWakeup_);
1024  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
1025  fileQueue_.push(newInputFile);
1026  cvWakeup_.notify_one();
1027  return;
1028  }
1029  //in single-buffer mode put single chunk in the file and let the main thread read the file
1030  InputChunk * newChunk;
1031  //should be available immediately
1032  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
1033 
1034  std::unique_lock<std::mutex> lkw(mWakeup_);
1035 
1036  unsigned int toRead = eventChunkSize_;
1037  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1038  newChunk->reset(0,toRead,0);
1039  newChunk->readComplete_=true;
1040 
1041  //push file and wakeup main thread
1042  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1043  newInputFile->chunks_[0]=newChunk;
1044  fileQueue_.push(newInputFile);
1045  cvWakeup_.notify_one();
1046  }
1047  }
1048  }
1049  //make sure threads finish reading
1050  unsigned numFinishedThreads = 0;
1051  while (numFinishedThreads < workerThreads_.size()) {
1052  unsigned int tid;
1053  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1054  std::unique_lock<std::mutex> lk(mReader_);
1055  thread_quit_signal[tid]=true;
1056  cvReader_[tid]->notify_one();
1057  numFinishedThreads++;
1058  }
1059  for (unsigned int i=0;i<workerThreads_.size();i++) {
1060  workerThreads_[i]->join();
1061  delete workerThreads_[i];
1062  }
1063 }
1064 
1066 {
1067  bool init = true;
1068  threadInit_.exchange(true,std::memory_order_acquire);
1069 
1070  while (1) {
1071 
1072  std::unique_lock<std::mutex> lk(mReader_);
1073  workerJob_[tid].first=nullptr;
1074  workerJob_[tid].first=nullptr;
1075 
1076  assert(!thread_quit_signal[tid]);//should never get it here
1077  workerPool_.push(tid);
1078 
1079  if (init) {
1080  std::unique_lock<std::mutex> lk(startupLock_);
1081  init = false;
1082  startupCv_.notify_one();
1083  }
1084  cvReader_[tid]->wait(lk);
1085 
1086  if (thread_quit_signal[tid]) return;
1087 
1088  InputFile * file;
1089  InputChunk * chunk;
1090 
1091  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1092 
1093  file = workerJob_[tid].first;
1094  chunk = workerJob_[tid].second;
1095 
1096  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1097  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1098 
1099 
1100  if (fileDescriptor>=0)
1101  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1102  else
1103  {
1104  edm::LogError("FedRawDataInputSource") <<
1105  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1106  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
1107  setExceptionState_=true;
1108  return;
1109 
1110  }
1111 
1112  unsigned int bufferLeft = 0;
1114  for (unsigned int i=0;i<readBlocks_;i++)
1115  {
1116  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1117  if ( last > 0 )
1118  bufferLeft+=last;
1119  if (last < eventChunkBlock_) {
1120  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1121  break;
1122  }
1123  }
1125  auto diff = end-start;
1126  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1127  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1128  close(fileDescriptor);
1129 
1130  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1132  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1133  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1134 
1135  }
1136 }
1137 
1139 {
1140  quit_threads_=true;
1141  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1142 
1143 }
1144 
1145 
1146 inline bool InputFile::advance(unsigned char* & dataPosition, const size_t size)
1147 {
1148  //wait for chunk
1149  while (!waitForChunk(currentChunk_)) {
1150  usleep(100000);
1152  }
1153 
1154  dataPosition = chunks_[currentChunk_]->buf_+ chunkPosition_;
1155  size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1156 
1157  if (currentLeft < size) {
1158 
1159  //we need next chunk
1160  while (!waitForChunk(currentChunk_+1)) {
1161  usleep(100000);
1163  }
1164  //copy everything to beginning of the first chunk
1165  dataPosition-=chunkPosition_;
1166  assert(dataPosition==chunks_[currentChunk_]->buf_);
1167  memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_+chunkPosition_, currentLeft);
1168  memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_+1]->buf_, size - currentLeft);
1169  //set pointers at the end of the old data position
1171  chunkPosition_=size-currentLeft;
1172  currentChunk_++;
1173  return true;
1174  }
1175  else {
1178  return false;
1179  }
1180 }
1181 
1182 inline void InputFile::moveToPreviousChunk(const size_t size, const size_t offset)
1183 {
1184  //this will fail in case of events that are too large
1186  assert(size - offset < chunks_[currentChunk_]->size_);
1187  memcpy(chunks_[currentChunk_-1]->buf_+offset,chunks_[currentChunk_]->buf_+chunkPosition_,size);
1190 }
1191 
1192 inline void InputFile::rewindChunk(const size_t size) {
1195 }
1196 
1197 //single-buffer mode file reading
1199 {
1200 
1201  if (fileDescriptor_<0) {
1202  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1203  bufferInputRead_ = 0;
1204  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1205  if (fileDescriptor_>=0)
1206  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1207  else
1208  {
1209  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1210  << file->fileName_ << " fd:" << fileDescriptor_;
1211  }
1212  }
1213 
1214  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1215  uint32_t existingSize = 0;
1216  for (unsigned int i=0;i<readBlocks_;i++)
1217  {
1218  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1220  existingSize+=last;
1221  }
1222  }
1223  else {
1224  const uint32_t chunksize = file->chunkPosition_;
1225  const uint32_t blockcount=chunksize/eventChunkBlock_;
1226  const uint32_t leftsize = chunksize%eventChunkBlock_;
1227  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1228  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1229 
1230  for (uint32_t i=0;i<blockcount;i++) {
1231  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1233  existingSize+=last;
1234  }
1235  if (leftsize) {
1236  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1238  existingSize+=last;
1239  }
1240  file->chunkPosition_=0;//data was moved to beginning of the chunk
1241  }
1242  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1243  if (fileDescriptor_!=-1)
1244  {
1245  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1246  close(fileDescriptor_);
1247  fileDescriptor_=-1;
1248  }
1249  }
1250 }
1251 
1252 // define this class as an input source
#define LogDebug(id)
static const char runNumber_[]
std::vector< std::string > & getData()
Definition: DataPoint.h:58
unsigned int lumi_
int i
Definition: DBlmapReader.cc:9
unsigned int getgpshigh(const unsigned char *)
uint32_t chunkPosition_
std::condition_variable cvWakeup_
tuple start
Check for commandline option errors.
Definition: dqm_diff.py:58
virtual void read(edm::EventPrincipal &eventPrincipal) override
bool gtpe_board_sense(const unsigned char *p)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
virtual void rewind_() override
std::vector< int > * getStreamFileTracker()
bool crc32c_hw_test()
Definition: crc32c.cc:354
unsigned int offset_
jsoncollector::DataPointDefinition * dpd_
tbb::concurrent_queue< unsigned int > workerPool_
JetCorrectorParameters::Record record
Definition: classes.h:7
unsigned int get(const unsigned char *, bool)
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
void rewindChunk(const size_t size)
const uint32 FRDHeaderVersionSize[6]
assert(m_qm.get())
std::vector< int > * streamFileTrackerPtr_
int init
Definition: HydjetWrapper.h:62
evf::EvFDaqDirector::FileStatus status_
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:590
std::atomic< bool > quit_threads_
volatile std::atomic< bool > shutdown_flag
std::vector< ReaderInfo > workerJob_
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
std::unique_ptr< std::thread > readSupervisorThread_
ProductProvenance const & dummyProvenance() const
Represents a JSON value.
Definition: value.h:111
std::string getEoLSFilePathOnBU(const unsigned int ls) const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
#define nullptr
std::vector< std::condition_variable * > cvReader_
FedRawDataInputSource * parent_
unsigned int sourceid
Definition: fed_header.h:32
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:212
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
U second(std::pair< T, U > const &p)
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
static Timestamp beginOfTime()
Definition: Timestamp.h:103
std::vector< bool > thread_quit_signal
uint32_t bufferPosition_
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
tuple path
else: Piece not in the list, fine.
void updateFileIndex(int const &fileIndex)
#define DEFINE_FWK_INPUT_SOURCE(type)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void resize(size_t newsize)
Definition: FEDRawData.cc:32
const edm::RunNumber_t runNumber_
bool advance(unsigned char *&dataPosition, const size_t size)
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:352
void moveToPreviousChunk(const size_t size, const size_t offset)
unsigned char * buf_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
struct fedt_struct fedt_t
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
int grabNextJsonFile(boost::filesystem::path const &)
def load
Definition: svgfig.py:546
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
#define end
Definition: vmac.h:37
virtual void preForkReleaseResources() override
bool waitForChunk(unsigned int chunkid)
const edm::DaqProvenanceHelper daqProvenanceHelper_
unsigned int fileIndex_
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
unsigned long long TimeValue_t
Definition: Timestamp.h:28
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:56
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned int uint32
Definition: MsgTools.h:13
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:379
const std::string fuOutputDir_
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
std::atomic< bool > readComplete_
virtual bool checkNextEvent() override
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:596
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned long long uint64_t
Definition: Time.h:15
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance)
unsigned int currentChunk_
tbb::concurrent_queue< InputFile * > fileQueue_
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:345
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
void setProcessHistoryID(ProcessHistoryID const &phid)
std::condition_variable startupCv_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:257
void stoppedLookingForFile(unsigned int lumi)
tbb::concurrent_vector< InputChunk * > chunks_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:360
unsigned int eventsize
Definition: fed_trailer.h:33
std::atomic< bool > threadInit_
std::pair< InputFile *, InputChunk * > ReaderInfo
std::string getBoLSFilePathOnFU(const unsigned int ls) const
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:348
void readWorker(unsigned int tid)
static std::atomic< unsigned int > counter
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
Unserialize a JSON document into a Value.
Definition: reader.h:16
ProcessHistoryRegistry & processHistoryRegistryForUpdate() const
Definition: InputSource.h:346
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
virtual void postForkReacquireResources(std::shared_ptr< edm::multicore::MessageReceiverForSource >) override
friend class InputFile
Open Root file and provide MEs ############.
size_(0)
Definition: OwnArray.h:181
unsigned int gtpe_get(const unsigned char *)
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::string getJumpFilePath() const
volatile std::atomic< bool > shutdown_flag false
tbb::concurrent_queue< InputChunk * > freeChunks_
evf::EvFDaqDirector::FileStatus getNextEvent()
tuple status
Definition: ntuplemaker.py:245
evf::EvFDaqDirector::FileStatus nextEvent()
void readNextChunkIntoBuffer(InputFile *file)
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:260
unsigned int nEvents_
void setFMS(evf::FastMonitoringService *fms)
static std::string const source
Definition: EdmProvDump.cc:42
unsigned int getgpslow(const unsigned char *)
tuple size
Write out results.
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
std::string getEoRFilePathOnFU() const
void renameToNextFree(std::string const &fileName) const