CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FedRawDataInputSource.cc
Go to the documentation of this file.
1 #include <fcntl.h>
2 #include <iomanip>
3 #include <iostream>
4 #include <memory>
5 #include <sstream>
6 #include <sys/types.h>
7 #include <sys/file.h>
8 #include <sys/time.h>
9 #include <unistd.h>
10 #include <vector>
11 #include <fstream>
12 #include <zlib.h>
13 #include <stdio.h>
14 #include <chrono>
15 
16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
18 
19 
22 
29 
33 
35 
39 
41 
45 
46 
47 //JSON file reader
49 
50 #include <boost/lexical_cast.hpp>
51 
52 using namespace jsoncollector;
53 
55  edm::InputSourceDescription const& desc) :
56  edm::RawInputSource(pset, desc),
57  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
58  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",16)*1048576),
59  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",eventChunkSize_/1048576)*1048576),
60  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",1)),
61  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
62  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
63  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
64  testModeNoBuilderUnit_(edm::Service<evf::EvFDaqDirector>()->getTestModeNoBuilderUnit()),
65  runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
66  fuOutputDir_(edm::Service<evf::EvFDaqDirector>()->baseRunDir()),
67  daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
68  eventID_(),
69  processHistoryID_(),
70  currentLumiSection_(0),
71  tcds_pointer_(0),
72  eventsThisLumi_(0),
73  dpd_(nullptr)
74 {
 // Constructor body. NOTE(review): this listing is missing the signature line and
 // several statements (the listing numbers jump, e.g. 79->81, 84->87, 92->97, 107->109),
 // so the comments below describe only the statements that are visible.
75  char thishost[256];
 // NOTE(review): only 255 bytes are requested and the buffer is not zero-initialised;
 // POSIX does not guarantee NUL termination when the host name is truncated.
76  gethostname(thishost, 255);
77  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
78  << std::endl << (eventChunkSize_/1048576)
79  << " MB on host " << thishost;
 // NOTE(review): the statement guarding this message (listing line 80) is not visible here.
81  edm::LogInfo("FedRawDataInputSource") << "Test mode is ON!";
82 
84  setNewRun();
87 
 // Load the data point definition labelled "data" from defPath_ into dpd_.
88  dpd_ = new DataPointDefinition();
89  std::string defLabel = "data";
90  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
91 
92  //make sure that chunk size is N * block size
 // NOTE(review): the code implementing the check announced above is missing from this listing.
97 
 // A configuration with zero buffers cannot read anything: fail construction.
98  if (!numBuffers_)
99  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
100  "no reading enabled with numBuffers parameter 0";
101 
104 
105  //get handles to DaqDirector and FastMonitoringService because they aren't accessible in the readSupervisor thread
106 
107  try {
109  } catch (...){
110  edm::LogWarning("FedRawDataInputSource") << "FastMonitoringService not found";
 // assert(0) aborts in builds with assertions enabled; with NDEBUG it is a no-op.
111  assert(0);//test
112  }
113 
114  try {
116  //set DaqDirector to delete files in preGlobalEndLumi callback
119  if (fms_) daqDirector_->setFMS(fms_);
120  } catch (...){
121  edm::LogWarning("FedRawDataInputSource") << "EvFDaqDirector not found";
122  assert(0);//test
123  }
124 
125  //should delete chunks when run stops
 // NOTE(review): the loop body (listing line 127) is missing from this listing.
126  for (unsigned int i=0;i<numBuffers_;i++) {
128  }
129 
130  quit_threads_ = false;
131 
 // Start the reader threads one at a time. Each iteration holds startupLock_, registers
 // the per-thread quit flag / job slot / condition variable, launches readWorker(i) and
 // then blocks on startupCv_ until it is notified.
 // NOTE(review): startupCv_.wait(lk) has no predicate, so a spurious wakeup would let the
 // loop continue before the worker has signalled.
132  for (unsigned int i=0;i<numConcurrentReads_;i++)
133  {
134  std::unique_lock<std::mutex> lk(startupLock_);
135  //issue a memory fence here and in threads (constructor was segfaulting without this)
136  thread_quit_signal.push_back(false);
137  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
138  cvReader_.push_back(new std::condition_variable);
139  threadInit_.store(false,std::memory_order_release);
140  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
141  startupCv_.wait(lk);
142  }
143 
144  runAuxiliary()->setProcessHistoryID(processHistoryID_);
145 }
146 
148 {
 // Teardown body (presumably the destructor; the signature line is missing from this
 // listing). Signals the threads to quit, deletes pending input files and joins threads.
149  quit_threads_=true;
150 
151  //delete any remaining open files
152  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
153  deleteFile(it->second->fileName_);
154  delete it->second;
155  }
 // NOTE(review): the `if (...) {` that opens this branch (listing line 156) is missing
 // from this listing; the `else` below pairs with it.
157  readSupervisorThread_->join();
158  }
159  else {
160  //join aux threads in case the supervisor thread was not started
 // For each worker: set its quit flag under mReader_, wake it, release the lock, then join.
161  for (unsigned int i=0;i<workerThreads_.size();i++) {
162  std::unique_lock<std::mutex> lk(mReader_);
163  thread_quit_signal[i]=true;
164  cvReader_[i]->notify_one();
165  lk.unlock();
166  workerThreads_[i]->join();
167  delete workerThreads_[i];
168  }
169  }
 // Release the per-worker condition variables allocated with new in the constructor.
170  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
 // The chunk clean-up below is disabled (commented out).
171  /*
172  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
173  InputChunk *ch;
174  while (!freeChunks_.try_pop(ch)) {}
175  delete ch;
176  }
177  */
178 }
179 
181 {
 // Body of the "is there another event" step: starts the supervisor thread, then
 // dispatches on the result of nextEvent(). Returns false once the run has ended and
 // true otherwise.
 // NOTE(review): the signature, the guard in front of the first block and all `case`
 // labels except `default` are missing from this listing (numbering gaps at 182, 192,
 // 221, 225, ...), so which status each branch handles is inferred from its log text.
183  {
184  //this thread opens new files and dispatches reading to worker readers
185  //threadInit_.store(false,std::memory_order_release);
 // Launch readSupervisor while holding startupLock_, then wait until it notifies startupCv_.
186  std::unique_lock<std::mutex> lk(startupLock_);
187  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
189  startupCv_.wait(lk);
190  }
191  switch (nextEvent() ) {
193 
194  //maybe create EoL file in working directory before ending run
195  struct stat buf;
196  if ( currentLumiSection_ > 0 ) {
197  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
198  if (eolFound) {
 // NOTE(review): the declaration of fuEoLS (listing line 199) is missing from this listing.
200  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
201  if ( !found ) {
 // NOTE(review): the open() result is passed to close() without being checked.
203  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
204  close(eol_fd);
206  }
207  }
208  }
209  //also create EoR file in FU data directory
210  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
211  if (!eorFound) {
212  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
213  close(eor_fd);
214  }
216  eventsThisLumi_=0;
218  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
219  return false;
220  }
222  //this is not reachable
223  return true;
224  }
226  //std::cout << "--------------NEW LUMI---------------" << std::endl;
227  return true;
228  }
 // Any other status: an event is available in event_.
229  default: {
230  if (!getLSFromFilename_) {
231  //get new lumi from file header
232  if (event_->lumi() > currentLumiSection_) {
234  eventsThisLumi_=0;
235  maybeOpenNewLumiSection( event_->lumi() );
236  }
237  }
 // Cache the run number and event number read from the event header.
238  eventRunNumber_=event_->run();
239  L1EventID_ = event_->event();
240 
241  setEventCached();
242 
243  return true;
244  }
245  }
246 }
247 
248 void FedRawDataInputSource::createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
249 {
250  //used for backpressure mechanisms and monitoring
251  const std::string fuBoLS = daqDirector_->getBoLSFilePathOnFU(lumiSection);
252  struct stat buf;
253  if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
254  int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
255  close(bol_fd);
256  }
257 }
258 
// Switch the source to lumisection `lumiSection` if it is not already the current one:
// writes the end-of-lumisection marker of the previous section (when there was one),
// makes sure a begin-of-lumisection marker exists for the new one, and installs a new
// LuminosityBlockAuxiliary stamped with the current wall-clock time.
// NOTE(review): several lines of this function are missing from this listing (numbering
// gaps at 261, 266, 270, 274, 281, 288, 291), including the first half of the outer
// condition and the definition of fuEoLS.
259 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection)
260 {
262  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
263 
264  if ( currentLumiSection_ > 0 ) {
265  const std::string fuEoLS =
267  struct stat buf;
268  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
269  if ( !found ) {
 // NOTE(review): the open() result is passed to close() without being checked.
271  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
272  close(eol_fd);
 // The EoLS file was just created here, so the BoLS file is created unconditionally.
273  createBoLSFile(lumiSection,false);
275  }
276  }
277  else createBoLSFile(lumiSection,true);//needed for initial lumisection
278 
279  currentLumiSection_ = lumiSection;
280 
282 
 // Timestamp value is seconds * 1000000 + microseconds from gettimeofday().
283  timeval tv;
284  gettimeofday(&tv, 0);
285  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
286 
287  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
289  runAuxiliary()->run(),
290  lumiSection, lsopentime,
292 
293  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
294  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
295 
296  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
297  }
298 }
299 
301 {
 // Calls getNextEvent() repeatedly while it reports noFile, giving up early when the
 // global edm::shutdown_flag is set; returns the last status obtained.
 // NOTE(review): the signature and the declaration of `status` (listing line 302) are
 // missing from this listing.
303  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
304  {
305  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
306  }
307  return status;
308 }
309 
311 {
 // Advances to the next event: takes a new InputFile from fileQueue_ when none is
 // current, handles status-only queue entries (run end, abort, new lumi), then locates
 // the next event header/payload inside the file's chunk(s) and points event_ at it.
 // Throws cms::Exception on truncated files, oversized events and checksum mismatches.
 // NOTE(review): this listing is missing the signature and many statements (numbering
 // gaps throughout, e.g. 314, 318-320, 329, 345-346, 385, 387-388, 418, 463, 505, 548,
 // 550); several `if` headers and return statements are therefore not visible.
 // headerSize is indexed by the detected FRD format version; index 0 means "not yet detected".
312  const size_t headerSize[4] = {0,2*sizeof(uint32),(4 + 1024) * sizeof(uint32),7*sizeof(uint32)}; //size per version of FRDEventHeader
313 
315  if (!currentFile_)
316  {
317  if (!streamFileTrackerPtr_) {
321  }
322 
324  if (!fileQueue_.try_pop(currentFile_))
325  {
326  //sleep until wakeup (only in single-buffer mode) or timeout
327  std::unique_lock<std::mutex> lkw(mWakeup_);
328  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
330  }
331  status = currentFile_->status_;
 // Status-only entries carry no data: the InputFile object is deleted after handling.
332  if ( status == evf::EvFDaqDirector::runEnded)
333  {
334  delete currentFile_;
335  currentFile_=nullptr;
336  return status;
337  }
338  else if ( status == evf::EvFDaqDirector::runAbort)
339  {
340  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
341  }
342  else if (status == evf::EvFDaqDirector::newLumi)
343  {
344  if (getLSFromFilename_) {
347  eventsThisLumi_=0;
349  }
350  }
351  else {//let this be picked up from next event
353  }
354 
355  delete currentFile_;
356  currentFile_=nullptr;
357  return status;
358  }
359  else if (status == evf::EvFDaqDirector::newFile) {
362  }
363  else
364  assert(0);
365  }
366 
367  //file is empty
368  if (!currentFile_->fileSize_) {
369  //try to open new lumi
371  if (getLSFromFilename_)
374  eventsThisLumi_=0;
376  }
377  //immediately delete empty file
379  delete currentFile_;
380  currentFile_=nullptr;
382  }
383 
384  //file is finished
386  //release last chunk (it is never released elsewhere)
389  {
390  throw cms::Exception("FedRawDataInputSource::getNextEvent")
391  << "Fully processed " << currentFile_->nProcessed_
392  << " from the file " << currentFile_->fileName_
393  << " but according to BU JSON there should be "
394  << currentFile_->nEvents_ << " events";
395  }
396  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
397  if (singleBufferMode_) {
398  std::unique_lock<std::mutex> lkw(mWakeup_);
399  cvWakeup_.notify_one();
400  }
403  //put the file in pending delete list;
 // filesToDelete_ is guarded by fileDeleteLock_.
404  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
405  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
406  }
407  else {
408  //in single-thread and stream jobs, events are already processed
410  delete currentFile_;
411  }
412  currentFile_=nullptr;
414  }
415 
416 
417  //file is too short
419  {
420  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
421  "Premature end of input file while reading event header";
422  }
 // Single-buffer mode: the whole file is served through chunks_[0].
423  if (singleBufferMode_) {
424 
425  //should already be there
427  usleep(10000);
429  }
430 
431  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
432 
433  //conditions when read amount is not sufficient for the header to fit
434  if (!bufferInputRead_ || bufferInputRead_ < headerSize[detectedFRDversion_]
435  || eventChunkSize_ - currentFile_->chunkPosition_ < headerSize[detectedFRDversion_])
436  {
438 
 // First event of the run: take the format version from the first 32-bit word.
439  if (detectedFRDversion_==0) {
440  detectedFRDversion_=*((uint32*)dataPosition);
441  assert(detectedFRDversion_>=1 && detectedFRDversion_<=3);
442  }
443 
444  //recalculate chunk position
445  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
446  if ( bufferInputRead_ < headerSize[detectedFRDversion_])
447  {
448  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
449  "Premature end of input file while reading event header";
450  }
451  }
452 
453  event_.reset( new FRDEventMsgView(dataPosition) );
454  if (event_->size()>eventChunkSize_) {
455  throw cms::Exception("FedRawDataInputSource::getNextEvent")
456  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
457  << " run:" << event_->run() << " of size:" << event_->size()
458  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
459  }
460 
 // NOTE(review): unsigned subtraction; this wraps if event_->size() is smaller than the
 // header size - confirm that cannot happen for a corrupt header.
461  const uint32_t msgSize = event_->size()-headerSize[detectedFRDversion_];
462 
464  {
465  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
466  "Premature end of input file while reading event data";
467  }
468  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
470  //recalculate chunk position
471  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
472  event_.reset( new FRDEventMsgView(dataPosition) );
473  }
474  currentFile_->bufferPosition_ += event_->size();
475  currentFile_->chunkPosition_ += event_->size();
476  //last chunk is released when this function is invoked next time
477 
478  }
479  //multibuffer mode:
480  else
481  {
482  //wait for the current chunk to become added to the vector
484  usleep(10000);
486  }
487 
488  //check if header is at the boundary of two chunks
489  chunkIsFree_ = false;
490  unsigned char *dataPosition;
491 
492  //read header, copy it to a single chunk if necessary
 // advance() returns true when the requested bytes crossed into the next chunk.
493  bool chunkEnd = currentFile_->advance(dataPosition,headerSize[detectedFRDversion_]);
494 
495  event_.reset( new FRDEventMsgView(dataPosition) );
496  if (event_->size()>eventChunkSize_) {
497  throw cms::Exception("FedRawDataInputSource::getNextEvent")
498  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
499  << " run:" << event_->run() << " of size:" << event_->size()
500  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
501  }
502 
 // NOTE(review): same unsigned subtraction as in the single-buffer branch above.
503  const uint32_t msgSize = event_->size()-headerSize[detectedFRDversion_];
504 
506  {
507  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
508  "Premature end of input file while reading event data";
509  }
510 
511  if (chunkEnd) {
512  //header was at the chunk boundary, we will have to move payload as well
513  currentFile_->moveToPreviousChunk(msgSize,headerSize[detectedFRDversion_]);
514  chunkIsFree_ = true;
515  }
516  else {
517  //header was contiguous, but check if payload fits the chunk
518  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
519  //rewind to header start position
520  currentFile_->rewindChunk(headerSize[detectedFRDversion_]);
521  //copy event to a chunk start and move pointers
522  chunkEnd = currentFile_->advance(dataPosition,headerSize[detectedFRDversion_]+msgSize);
523  assert(chunkEnd);
524  chunkIsFree_=true;
525  //header is moved
526  event_.reset( new FRDEventMsgView(dataPosition) );
527  }
528  else {
529  //everything is in a single chunk, only move pointers forward
530  chunkEnd = currentFile_->advance(dataPosition,msgSize);
531  assert(!chunkEnd);
532  chunkIsFree_=false;
533  }
534  }
535  }//end multibuffer mode
536 
 // Optional payload integrity check; only performed for event format version >= 3.
537  if ( verifyAdler32_ && event_->version() >= 3 )
538  {
539  uint32_t adler = adler32(0L,Z_NULL,0);
540  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
541 
542  if ( adler != event_->adler32() ) {
543  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
544  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
545  " but calculated 0x" << adler;
546  }
547  }
549 
551 }
552 
554 {
 // Removes the input file `fileName`, or renames it via renameToNextFree() when
 // testModeNoBuilderUnit_ is set. A failed removal is logged and retried once after
 // 100 ms; a failure of the retry is swallowed on purpose (see the inline remarks).
 // NOTE(review): the signature line is missing from this listing.
555  const boost::filesystem::path filePath(fileName);
556  if (!testModeNoBuilderUnit_) {
557  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
558  try {
559  //sometimes this fails but file gets deleted
560  boost::filesystem::remove(filePath);
561  }
562  catch (const boost::filesystem::filesystem_error& ex)
563  {
564  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
565  << ". Trying again.";
566  usleep(100000);
567  try {
568  boost::filesystem::remove(filePath);
569  }
570  catch (...) {/*file gets deleted first time but exception is still thrown*/}
571  }
 // Same retry as above for any other standard exception; only the log text differs.
572  catch (std::exception& ex)
573  {
574  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
575  << ". Trying again.";
576  usleep(100000);
577  try {
578  boost::filesystem::remove(filePath);
579  } catch (...) {/*file gets deleted first time but exception is still thrown*/}
580  }
581  } else {
582  renameToNextFree(fileName);
583  }
584 }
585 
586 
588 {
 // Builds the FEDRawDataCollection for the current event, creates the event in
 // `eventPrincipal`, stores the collection in it, and periodically deletes input files
 // that no stream is using any more.
 // NOTE(review): the signature and parts of every branch below are missing from this
 // listing (numbering gaps at 587, 593, 595-596, 600-601, 603-604, 609-610, 612, 625,
 // 651), so how eventID_ / aux are built in each branch is not visible.
589  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
590  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
591 
 // Three ways of building the event auxiliary: forced L1 event id, no TCDS record found
 // (tcds_pointer_ is null), or from the TCDS record.
592  if (useL1EventID_){
594  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
597  makeEvent(eventPrincipal, aux);
598  }
599  else if(tcds_pointer_==0){
602  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
605  makeEvent(eventPrincipal, aux);
606  }
607  else{
608  evf::evtn::TCDSRecord record((unsigned char *)(tcds_pointer_));
611  processGUID());
613  makeEvent(eventPrincipal, aux);
614  }
615 
616 
617 
618  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
619 
620  //FWCore/Sources DaqProvenanceHelper before 7_1_0_pre3
621  //eventPrincipal.put(daqProvenanceHelper_.constBranchDescription_, edp,
622  // daqProvenanceHelper_.dummyProvenance_);
623 
624  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
626 
627  eventsThisLumi_++;
628 
629  //this old file check runs once every checkEvery_ processed events of the current file
 // NOTE(review): the value of checkEvery_ is not visible in this listing.
630  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
631  //delete files that are not in processing
632  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
633  auto it = filesToDelete_.begin();
634  while (it!=filesToDelete_.end()) {
 // A file is still in use if any stream's tracker entry equals its index.
635  bool fileIsBeingProcessed = false;
636  for (unsigned int i=0;i<nStreams_;i++) {
637  if (it->first == streamFileTrackerPtr_->at(i)) {
638  fileIsBeingProcessed = true;
639  break;
640  }
641  }
642  if (!fileIsBeingProcessed) {
643  deleteFile(it->second->fileName_);
644  delete it->second;
645  it = filesToDelete_.erase(it);
646  }
647  else it++;
648  }
649 
650  }
652  chunkIsFree_=false;
653  return;
654 }
655 
657 {
 // Splits the current event payload into per-FED fragments and copies each into
 // `rawData`. The payload is walked backwards: each fragment's trailer (at its end)
 // gives the fragment size, from which the header position is computed.
 // Returns the event timestamp: wall-clock time by default, replaced by the GPS time
 // when a fragment with id FEDNumbering::MINTriggerGTPFEDID is present.
 // Side effects: sets GTPEventID_ and tcds_pointer_.
 // NOTE(review): the signature and the declaration of `time` (listing line 658) are
 // missing from this listing.
659  timeval stv;
660  gettimeofday(&stv,0);
 // Pack seconds into the upper 32 bits and microseconds into the lower bits.
661  time = stv.tv_sec;
662  time = (time << 32) + stv.tv_usec;
663  edm::Timestamp tstamp(time);
664 
665  uint32_t eventSize = event_->eventSize();
666  char* event = (char*)event_->payload();
667  GTPEventID_=0;
668  tcds_pointer_ = 0;
 // NOTE(review): eventSize is unsigned and fedSize comes from the data; an inconsistent
 // trailer makes the subtractions below wrap around instead of terminating the loop.
669  while (eventSize > 0) {
670  eventSize -= sizeof(fedt_t);
671  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
672  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
 // Step back over the rest of the fragment so eventSize is the offset of its header.
673  eventSize -= (fedSize - sizeof(fedh_t));
674  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
675  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
676  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
677  tcds_pointer_ = (unsigned char *)(event + eventSize );
678  }
679  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
680  if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
681  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
682  else
683  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
684  //evf::evtn::evm_board_setformat(fedSize);
685  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
686  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
687  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
688  }
689  //take event ID from GTPE FED
690  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
691  if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
692  GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
693  }
694  }
 // Copy the whole fragment (header through trailer) into the collection slot of this FED id.
695  FEDRawData& fedData = rawData.FEDData(fedId);
696  fedData.resize(fedSize);
697  memcpy(fedData.data(), event + eventSize, fedSize);
698  }
 // NOTE(review): the loop only exits when eventSize is 0, so this assert cannot fail.
699  assert(eventSize == 0);
700 
701  return tstamp;
702 }
703 
705 {
 // Copies the JSON file that accompanies a raw file to a process-specific name
 // ("<stem>_pid<5-digit pid>.jsn"), removes the source, parses the copy and returns the
 // event count it contains as an int. Returns -1 when any of the exceptions handled at
 // the bottom is caught.
 // NOTE(review): the signature and several declarations/conditions are missing from this
 // listing (numbering gaps at 704, 706, 709, 720, 734, 756, 762, 786, 792, 804), e.g. the
 // declarations of `data`, `jsonDestPath` and `reader` and the condition selecting
 // between the two copy paths.
707  try {
708  // assemble json destination path
710 
711  //TODO:should be ported to use fffnaming
712  std::ostringstream fileNameWithPID;
713  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
714  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
715  jsonDestPath /= fileNameWithPID.str();
716 
717  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
718  << jsonDestPath;
719 
721  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
722  else {
723  try {
724  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
725  }
726  catch (const boost::filesystem::filesystem_error& ex)
727  {
728  // Input dir gone?
729  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
730  // << " Maybe the file is not yet visible by FU. Trying again in one second";
 // One retry after a second; a second failure propagates to the outer handlers.
731  sleep(1);
732  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
733  }
735 
736 
 // Removing the source is best effort: failures are logged and otherwise ignored.
737  try {
738  //sometimes this fails but file gets deleted
739  boost::filesystem::remove(jsonSourcePath);
740  }
741  catch (const boost::filesystem::filesystem_error& ex)
742  {
743  // Input dir gone?
744  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
745  }
746  catch (std::exception& ex)
747  {
748  // Input dir gone?
749  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
750  }
751 
752  }
753 
754  boost::filesystem::ifstream ij(jsonDestPath);
755  Json::Value deserializeRoot;
757 
758  if (!reader.parse(ij, deserializeRoot))
759  throw std::runtime_error("Cannot deserialize input JSON file");
760 
761  //read BU JSON
763  DataPoint dp;
764  dp.deserialize(deserializeRoot);
 // Prefer the value at the position of the "NEvents" name in the definition; otherwise
 // fall back to the first data value.
765  bool success = false;
766  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
767  if (dpd_->getNames().at(i)=="NEvents")
768  if (i<dp.getData().size()) {
769  data = dp.getData()[i];
770  success=true;
771  }
772  }
773  if (!success) {
774  if (dp.getData().size())
775  data = dp.getData()[0];
776  else
 // NOTE(review): `data` has not been assigned on this path, so the value appended to the
 // message is whatever its declaration (not visible) initialised it to.
777  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
778  " error reading number of events from BU JSON -: No input value " << data;
779  }
780  return boost::lexical_cast<int>(data);
781 
782  }
783  catch (const boost::filesystem::filesystem_error& ex)
784  {
785  // Input dir gone?
787  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
788  }
 // NOTE(review): caught by value - this copies the exception and slices derived types;
 // catching by const reference would preserve the dynamic type.
789  catch (std::runtime_error e)
790  {
791  // Another process grabbed the file and NFS did not register this
793  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
794  }
795 
796  catch( boost::bad_lexical_cast const& ) {
797  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
798  << "Input value is -: " << data;
799  }
800 
 // NOTE(review): also caught by value; after slicing, what() is std::exception's own.
801  catch (std::exception e)
802  {
803  // BU run directory disappeared?
805  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
806  }
807 
808  return -1;
809 }
810 
812 {
 // Renames the given file, and the file with the same name but extension ".jsn", to a
 // destination path instead of deleting them.
 // NOTE(review): the signature and the definitions of `source` and `destination`
 // (listing lines 813-814) are missing from this listing.
815 
816  edm::LogInfo("FedRawDataInputSource") << "Instead of delete, RENAME -: " << fileName
817  << " to: " << destination.string();
818  boost::filesystem::rename(source,destination);
 // replace_extension() is applied to both path objects to address the .jsn companion.
819  boost::filesystem::rename(source.replace_extension(".jsn"),destination.replace_extension(".jsn"));
820 }
821 
823 {}
824 
// Post-fork hook: calls InputSource::rewind(). The message-receiver argument is unnamed
// and unused in the visible code.
// NOTE(review): listing lines 828-830 are missing from this listing, so any further
// statements in this function are not visible.
825 void FedRawDataInputSource::postForkReacquireResources(std::shared_ptr<edm::multicore::MessageReceiverForSource>)
826 {
827  InputSource::rewind();
831 }
832 
834 {}
835 
836 
838 {
 // Supervisor thread body: waits for a free worker and a free chunk, asks the DAQ
 // director for the next file, queues lumisection / file entries on fileQueue_ and
 // hands one chunk-sized read job per chunk to the worker threads. On exit it tells
 // every worker to quit and joins them.
 // NOTE(review): the signature and a few statements are missing from this listing
 // (numbering gaps at 837, 874, 876, 894, 907), including the declaration of `status`.
839  bool stop=false;
840  unsigned int currentLumiSection = 0;
841  //threadInit_.exchange(true,std::memory_order_acquire);
842 
 // Tell the thread that launched us (it waits on startupCv_) that we are running.
843  {
844  std::unique_lock<std::mutex> lk(startupLock_);
845  startupCv_.notify_one();
846  }
847 
848  while (!stop) {
849 
850  //wait for at least one free thread and chunk
851  int counter=0;
852  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty())
853  {
854  std::unique_lock<std::mutex> lkw(mWakeup_);
855  //sleep until woken up by condition or a timeout
856  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
857  counter++;
858  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
859  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
860  }
861  else {
 // NOTE(review): as parenthesised, the negation applies only to the first operand, so
 // this is not the negation of the loop condition above - confirm the intent.
862  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
863  }
864  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
865  }
866 
867  if (stop) break;
868 
869  //look for a new file
870  std::string nextFile;
871  uint32_t ls;
872  uint32_t fileSize;
873 
875 
877 
 // Poll the director until it reports something other than noFile.
878  while (status == evf::EvFDaqDirector::noFile) {
879  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
880  stop=true;
881  break;
882  }
883 
884  status = daqDirector_->updateFuLock(ls,nextFile,fileSize);
885 
886  //check again for any remaining index/EoLS files after EoR file is seen
887  if ( status == evf::EvFDaqDirector::runEnded) {
888  usleep(100000);
889  //now all files should have appeared in ramdisk, check again if any raw files were left behind
890  status = daqDirector_->updateFuLock(ls,nextFile,fileSize);
891  }
892 
893  if ( status == evf::EvFDaqDirector::runEnded) {
895  stop=true;
896  break;
897  }
898 
899  //queue new lumisection
900  if( getLSFromFilename_ && ls > currentLumiSection) {
901  currentLumiSection = ls;
902  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
903  }
904 
905  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
906  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
908  stop=true;
909  break;
910  }
911 
 // NOTE(review): dbgcount is re-initialised to 0 on every loop iteration, so it is
 // always 1 at the modulo test and the debug message below is never emitted.
912  int dbgcount=0;
913  if (status == evf::EvFDaqDirector::noFile) {
914  dbgcount++;
915  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
916  usleep(100000);
917  }
918  }
919  if ( status == evf::EvFDaqDirector::newFile ) {
920  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
921 
922 
923  boost::filesystem::path rawFilePath(nextFile);
924  std::string rawFile = rawFilePath.replace_extension(".raw").string();
925 
 // The size reported by the director is overwritten with the size from stat().
 // NOTE(review): the stat() result is not checked; st is uninitialised if it fails.
926  struct stat st;
927  stat(rawFile.c_str(),&st);
928  fileSize=st.st_size;
929 
930  int eventsInNewFile = grabNextJsonFile(nextFile);
931  if (fms_) fms_->stoppedLookingForFile(ls);
 // grabNextJsonFile() returns -1 on error, which trips the first assert.
932  assert( eventsInNewFile>=0 );
933  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
934 
935  if (!singleBufferMode_) {
936  //calculate number of needed chunks
937  unsigned int neededChunks = fileSize/eventChunkSize_;
938  if (fileSize%eventChunkSize_) neededChunks++;
939 
940  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
941  fileQueue_.push(newInputFile);
942 
943  for (unsigned int i=0;i<neededChunks;i++) {
944 
945  //get thread
 // NOTE(review): unlike the chunk wait below, this wait does not check quit_threads_.
946  unsigned int newTid = 0xffffffff;
947  while (!workerPool_.try_pop(newTid)) {
948  usleep(100000);
949  }
950 
951  InputChunk * newChunk = nullptr;
952  while (!freeChunks_.try_pop(newChunk)) {
953  usleep(100000);
954  if (quit_threads_.load(std::memory_order_relaxed)) break;
955  }
956 
957  if (newChunk == nullptr) {
958  //return unused tid if we received shutdown (nullptr chunk)
959  if (newTid!=0xffffffff) workerPool_.push(newTid);
960  stop = true;
961  break;
962  }
963 
 // The job slot is written and the worker woken while holding mReader_.
964  std::unique_lock<std::mutex> lk(mReader_);
965 
 // Every chunk reads eventChunkSize_ bytes except a shorter final one.
966  unsigned int toRead = eventChunkSize_;
967  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
968  newChunk->reset(i*eventChunkSize_,toRead,i);
969 
970  workerJob_[newTid].first=newInputFile;
971  workerJob_[newTid].second=newChunk;
972 
973  //wake up the worker thread
974  cvReader_[newTid]->notify_one();
975  }
976  }
977  else {
978  if (!eventsInNewFile) {
979  //still queue file for lumi update
980  std::unique_lock<std::mutex> lkw(mWakeup_);
981  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
982  fileQueue_.push(newInputFile);
983  cvWakeup_.notify_one();
 // NOTE(review): this returns from the thread function, skipping the worker shutdown
 // and join code after the main loop - confirm this is intended (a `continue` would
 // keep the supervisor running).
984  return;
985  }
986  //in single-buffer mode put single chunk in the file and let the main thread read the file
987  InputChunk * newChunk;
988  //should be available immediately
989  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
990 
991  std::unique_lock<std::mutex> lkw(mWakeup_);
992 
993  unsigned int toRead = eventChunkSize_;
994  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
995  newChunk->reset(0,toRead,0);
 // Marked complete here; no worker thread is given a job for this chunk in this branch.
996  newChunk->readComplete_=true;
997 
998  //push file and wakeup main thread
999  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1000  newInputFile->chunks_[0]=newChunk;
1001  fileQueue_.push(newInputFile);
1002  cvWakeup_.notify_one();
1003  }
1004  }
1005  }
1006  //make sure threads finish reading
 // A worker re-enters workerPool_ when it is idle; pop each one, set its quit flag and
 // wake it, until every worker thread has been signalled.
1007  unsigned numFinishedThreads = 0;
1008  while (numFinishedThreads < workerThreads_.size()) {
1009  unsigned int tid;
1010  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1011  std::unique_lock<std::mutex> lk(mReader_);
1012  thread_quit_signal[tid]=true;
1013  cvReader_[tid]->notify_one();
1014  numFinishedThreads++;
1015  }
1016  for (unsigned int i=0;i<workerThreads_.size();i++) {
1017  workerThreads_[i]->join();
1018  delete workerThreads_[i];
1019  }
1020 }
1021 
1023 {
 // Worker thread body for thread index `tid`: repeatedly registers itself as idle in
 // workerPool_, sleeps on cvReader_[tid] until it is given a job (file + chunk) or told
 // to quit, then reads the chunk's byte range from the file into the chunk buffer in
 // blocks of eventChunkBlock_ bytes and publishes the chunk in the file's chunk vector.
 // NOTE(review): the signature and listing lines 1070, 1081 and 1088 (including the
 // definitions of `start` and `end`) are missing from this listing.
1024  bool init = true;
1025  threadInit_.exchange(true,std::memory_order_acquire);
1026 
1027  while (1) {
1028 
1029  std::unique_lock<std::mutex> lk(mReader_);
 // NOTE(review): `.first` is reset twice; the second statement was presumably meant to
 // reset `.second`, which is left pointing at the previous chunk.
1030  workerJob_[tid].first=nullptr;
1031  workerJob_[tid].first=nullptr;
1032 
1033  assert(!thread_quit_signal[tid]);//should never get it here
1034  workerPool_.push(tid);
1035 
 // First pass only: signal the constructor, which waits on startupCv_ for each worker.
1036  if (init) {
 // NOTE(review): this `lk` shadows the mReader_ lock declared above.
1037  std::unique_lock<std::mutex> lk(startupLock_);
1038  init = false;
1039  startupCv_.notify_one();
1040  }
 // NOTE(review): wait without a predicate; a spurious wakeup with no job assigned
 // would trip the assert below.
1041  cvReader_[tid]->wait(lk);
1042 
1043  if (thread_quit_signal[tid]) return;
1044 
1045  InputFile * file;
1046  InputChunk * chunk;
1047 
1048  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1049 
1050  file = workerJob_[tid].first;
1051  chunk = workerJob_[tid].second;
1052 
 // NOTE(review): lseek() is called before the descriptor is checked, and its result
 // `pos` is only logged, never tested for failure.
1053  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1054  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1055 
1056 
1057  if (fileDescriptor>=0)
1058  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1059  else
1060  {
1061  edm::LogError("FedRawDataInputSource") <<
1062  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1063  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
 // Flag the failure and end this worker thread.
1064  setExceptionState_=true;
1065  return;
1066 
1067  }
1068 
 // bufferLeft counts the bytes read into the chunk buffer so far.
1069  unsigned int bufferLeft = 0;
1071  for (unsigned int i=0;i<readBlocks_;i++)
1072  {
1073  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1074  if ( last > 0 )
1075  bufferLeft+=last;
 // A short read ends the loop; the assert checks the total matches the expected size.
 // NOTE(review): `last` is signed and may be -1 on error; the type of eventChunkBlock_
 // is not visible here, so confirm this comparison behaves as intended.
1076  if (last < eventChunkBlock_) {
1077  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1078  break;
1079  }
1080  }
1082  auto diff = end-start;
1083  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1084  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1085  close(fileDescriptor);
1086 
 // The first 32-bit word of the first chunk of a file is taken as the format version.
1087  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1089  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1090  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1091 
1092  }
1093 }
1094 
1096 {
 // NOTE(review): the signature line (listing number 1095) is absent from this copy. The exception
 // category used below suggests this is FedRawDataInputSource::threadError() -- confirm.
 // Raise the quit flag first so it is already set when the exception propagates
 // (presumably the reader threads poll it -- verify against the thread loops).
1097  quit_threads_=true;
 // Unconditional throw: this function never returns normally to its caller.
1098  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1099 
1100 }
1101 
1102 
// Makes the next `size` bytes of this file addressable through `dataPosition`.
//
// Blocks, polling waitForChunk() every 100 ms, until the current chunk is ready. If at least
// `size` bytes remain in the current chunk past chunkPosition_, dataPosition points into that
// chunk and false is returned. Otherwise the unread tail of the current chunk is moved to the
// front of its buffer, the missing bytes are appended from the start of the next chunk,
// dataPosition points at the start of the buffer, and true is returned.
//
// NOTE(review): the embedded listing numbers skip 1108, 1119, 1127 and 1133-1134, so statements
// appear to be missing from this copy (for instance nothing visible advances chunkPosition_ on
// the `return false` path) -- confirm against the original file before relying on this text.
1103 inline bool InputFile::advance(unsigned char* & dataPosition, const size_t size)
1104 {
1105  //wait for chunk
1106  while (!waitForChunk(currentChunk_)) {
 // usleep takes microseconds: poll every 100 ms
1107  usleep(100000);
1109  }
1110 
1111  dataPosition = chunks_[currentChunk_]->buf_+ chunkPosition_;
 // bytes of the current chunk that lie at or after chunkPosition_
1112  size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1113 
 // the requested span crosses into the next chunk
1114  if (currentLeft < size) {
1115 
1116  //we need next chunk
1117  while (!waitForChunk(currentChunk_+1)) {
1118  usleep(100000);
1120  }
1121  //copy everything to beginning of the first chunk
 // rewind the caller's pointer to the start of the buffer, where the data is assembled below
1122  dataPosition-=chunkPosition_;
1123  assert(dataPosition==chunks_[currentChunk_]->buf_);
 // memmove because source and destination lie in the same buffer
1124  memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_+chunkPosition_, currentLeft);
 // NOTE(review): assumes buf_ can hold `size` bytes and that the next chunk already contains
 // size-currentLeft bytes; neither is checked here.
1125  memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_+1]->buf_, size - currentLeft);
1126  //set pointers at the end of the old data position
 // number of bytes of the next chunk already consumed by the copy above
1128  chunkPosition_=size-currentLeft;
 // the next chunk becomes the current one
1129  currentChunk_++;
1130  return true;
1131  }
1132  else {
 // NOTE(review): listing numbers 1133-1134 are missing immediately before this return.
1135  return false;
1136  }
1137 }
1138 
// Copies `size` bytes, starting at chunkPosition_ in the current chunk, into the previous
// chunk's buffer at byte offset `offset`.
//
// NOTE(review): listing numbers 1142 and 1145-1146 are absent from this copy, so statements
// before the assert and after the memcpy are likely missing -- confirm against the original.
// NOTE(review): `size - offset` is unsigned (size_t) arithmetic and wraps if offset > size;
// indexing chunks_[currentChunk_-1] also requires currentChunk_ > 0. Neither is checked here.
1139 inline void InputFile::moveToPreviousChunk(const size_t size, const size_t offset)
1140 {
1141  //this will fail in case of events that are too large
1143  assert(size - offset < chunks_[currentChunk_]->size_);
1144  memcpy(chunks_[currentChunk_-1]->buf_+offset,chunks_[currentChunk_]->buf_+chunkPosition_,size);
1147 }
1148 
// NOTE(review): the body of this function (listing numbers 1150-1151) is absent from this
// copy, so its behaviour cannot be documented from here -- restore it from the original file.
1149 inline void InputFile::rewindChunk(const size_t size) {
1152 }
1153 
1154 //single-buffer mode file reading
1156 {
 // NOTE(review): the signature line (listing number 1155) is absent from this copy. The
 // exception category below suggests this is FedRawDataInputSource::readNextChunkIntoBuffer,
 // taking `file` (dereferenced as a pointer throughout) -- confirm.
 //
 // Refills file->chunks_[0] from the input file:
 //  - opens the file on first use (fileDescriptor_ < 0) and throws cms::Exception on failure;
 //  - if chunkPosition_ is 0, reads up to readBlocks_ blocks of eventChunkBlock_ bytes from
 //    the start of the buffer;
 //  - otherwise moves the bytes from chunkPosition_ to the end of the buffer to its front and
 //    reads chunkPosition_ more bytes behind them, then resets chunkPosition_ to 0;
 //  - closes the file once bufferInputRead_ equals file->fileSize_.
 //
 // NOTE(review): listing numbers 1176, 1189 and 1194 (the line after each ::read) are missing.
 // In the visible code bufferInputRead_ is never incremented, and the ::read result is added
 // to existingSize without checking for -1 or a short read -- confirm against the original.
1157 
1158  if (fileDescriptor_<0) {
1159  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
 // a newly opened file starts with nothing read
1160  bufferInputRead_ = 0;
1161  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1162  if (fileDescriptor_>=0)
1163  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1164  else
1165  {
1166  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1167  << file->fileName_ << " fd:" << fileDescriptor_;
1168  }
1169  }
1170 
1171  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
 // nothing to preserve: fill the buffer from offset 0, one block per read
1172  uint32_t existingSize = 0;
1173  for (unsigned int i=0;i<readBlocks_;i++)
1174  {
1175  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1177  existingSize+=last;
1178  }
1179  }
1180  else {
 // chunkPosition_ bytes at the front have been consumed; that is the amount to read in again,
 // split into whole blocks plus one remainder read
1181  const uint32_t chunksize = file->chunkPosition_;
1182  const uint32_t blockcount=chunksize/eventChunkBlock_;
1183  const uint32_t leftsize = chunksize%eventChunkBlock_;
 // unconsumed bytes between chunkPosition_ and the end of the buffer
1184  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
 // memmove because source and destination are in the same buffer
1185  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1186 
1187  for (uint32_t i=0;i<blockcount;i++) {
1188  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1190  existingSize+=last;
1191  }
1192  if (leftsize) {
1193  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1195  existingSize+=last;
1196  }
1197  file->chunkPosition_=0;//data was moved to beginning of the chunk
1198  }
1199  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1200  if (fileDescriptor_!=-1)
1201  {
1202  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1203  close(fileDescriptor_);
 // mark as closed so the next call reopens
1204  fileDescriptor_=-1;
1205  }
1206  }
1207 }
1208 
1209 // define this class as an input source
#define LogDebug(id)
static const char runNumber_[]
std::vector< std::string > & getData()
Definition: DataPoint.h:58
unsigned int lumi_
int i
Definition: DBlmapReader.cc:9
unsigned int getgpshigh(const unsigned char *)
uint32_t chunkPosition_
std::condition_variable cvWakeup_
tuple start
Check for commandline option errors.
Definition: dqm_diff.py:58
virtual void read(edm::EventPrincipal &eventPrincipal) override
bool gtpe_board_sense(const unsigned char *p)
struct fedh_struct fedh_t
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
virtual void rewind_() override
std::vector< int > * getStreamFileTracker()
unsigned int offset_
jsoncollector::DataPointDefinition * dpd_
tbb::concurrent_queue< unsigned int > workerPool_
JetCorrectorParameters::Record record
Definition: classes.h:7
unsigned int get(const unsigned char *, bool)
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
void rewindChunk(const size_t size)
string destination
assert(m_qm.get())
std::vector< int > * streamFileTrackerPtr_
int init
Definition: HydjetWrapper.h:62
evf::EvFDaqDirector::FileStatus status_
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:600
std::atomic< bool > quit_threads_
volatile std::atomic< bool > shutdown_flag
std::vector< ReaderInfo > workerJob_
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
std::unique_ptr< std::thread > readSupervisorThread_
ProductProvenance const & dummyProvenance() const
Represents a JSON value.
Definition: value.h:111
std::string getEoLSFilePathOnBU(const unsigned int ls) const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
#define nullptr
std::vector< std::condition_variable * > cvReader_
FedRawDataInputSource * parent_
unsigned int sourceid
Definition: fed_header.h:32
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:218
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize)
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
U second(std::pair< T, U > const &p)
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
static Timestamp beginOfTime()
Definition: Timestamp.h:103
std::vector< bool > thread_quit_signal
uint32_t bufferPosition_
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
tuple path
else: Piece not in the list, fine.
void updateFileIndex(int const &fileIndex)
#define DEFINE_FWK_INPUT_SOURCE(type)
void resize(size_t newsize)
Definition: FEDRawData.cc:32
const edm::RunNumber_t runNumber_
bool advance(unsigned char *&dataPosition, const size_t size)
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:358
void moveToPreviousChunk(const size_t size, const size_t offset)
unsigned char * buf_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
struct fedt_struct fedt_t
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
int grabNextJsonFile(boost::filesystem::path const &)
def load
Definition: svgfig.py:546
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
#define end
Definition: vmac.h:37
virtual void preForkReleaseResources() override
bool waitForChunk(unsigned int chunkid)
const edm::DaqProvenanceHelper daqProvenanceHelper_
unsigned int fileIndex_
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
unsigned long long TimeValue_t
Definition: Timestamp.h:28
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:56
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned int uint32
Definition: MsgTools.h:13
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:385
const std::string fuOutputDir_
std::atomic< bool > readComplete_
virtual bool checkNextEvent() override
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:606
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned long long uint64_t
Definition: Time.h:15
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance)
unsigned int currentChunk_
tbb::concurrent_queue< InputFile * > fileQueue_
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:351
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
void setProcessHistoryID(ProcessHistoryID const &phid)
std::condition_variable startupCv_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:263
void stoppedLookingForFile(unsigned int lumi)
tbb::concurrent_vector< InputChunk * > chunks_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:366
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Non-const accessor for process history registry.
Definition: InputSource.h:175
unsigned int eventsize
Definition: fed_trailer.h:33
std::atomic< bool > threadInit_
std::pair< InputFile *, InputChunk * > ReaderInfo
std::string getBoLSFilePathOnFU(const unsigned int ls) const
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:354
void readWorker(unsigned int tid)
static std::atomic< unsigned int > counter
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
Unserialize a JSON document into a Value.
Definition: reader.h:16
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
virtual void postForkReacquireResources(std::shared_ptr< edm::multicore::MessageReceiverForSource >) override
friend class InputFile
Open Root file and provide MEs ############.
size_(0)
Definition: OwnArray.h:181
unsigned int gtpe_get(const unsigned char *)
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::string getJumpFilePath() const
volatile std::atomic< bool > shutdown_flag false
tbb::concurrent_queue< InputChunk * > freeChunks_
evf::EvFDaqDirector::FileStatus getNextEvent()
tuple status
Definition: ntuplemaker.py:245
evf::EvFDaqDirector::FileStatus nextEvent()
void readNextChunkIntoBuffer(InputFile *file)
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:266
unsigned int nEvents_
void setFMS(evf::FastMonitoringService *fms)
static std::string const source
Definition: EdmProvDump.cc:42
unsigned int getgpslow(const unsigned char *)
tuple size
Write out results.
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
std::string getEoRFilePathOnFU() const
void renameToNextFree(std::string const &fileName) const