CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FedRawDataInputSource.cc
Go to the documentation of this file.
1 #include <fcntl.h>
2 #include <iomanip>
3 #include <iostream>
4 #include <sstream>
5 #include <sys/types.h>
6 #include <sys/file.h>
7 #include <sys/time.h>
8 #include <unistd.h>
9 #include <vector>
10 #include <fstream>
11 #include <zlib.h>
12 #include <stdio.h>
13 #include <chrono>
14 
15 #include <boost/algorithm/string.hpp>
16 #include <boost/filesystem/fstream.hpp>
17 
21 
24 
31 
35 
37 
41 
42 //JSON file reader
44 
45 #include <boost/lexical_cast.hpp>
46 
47 using namespace jsoncollector;
48 
50  edm::InputSourceDescription const& desc) :
51  edm::RawInputSource(pset, desc),
52  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
53  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",16)*1048576),
54  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",eventChunkSize_/1048576)*1048576),
55  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",1)),
56  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
57  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
58  testModeNoBuilderUnit_(edm::Service<evf::EvFDaqDirector>()->getTestModeNoBuilderUnit()),
59  runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
60  fuOutputDir_(edm::Service<evf::EvFDaqDirector>()->baseRunDir()),
61  daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
62  eventID_(),
63  processHistoryID_(),
64  currentLumiSection_(0),
65  eventsThisLumi_(0),
66  dpd_(nullptr)
67 {
68  char thishost[256];
69  gethostname(thishost, 255);
70  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
71  << std::endl << (eventChunkSize_/1048576)
72  << " MB on host " << thishost;
74  edm::LogInfo("FedRawDataInputSource") << "Test mode is ON!";
75 
77  setNewRun();
80 
81  dpd_ = new DataPointDefinition();
82  std::string defLabel = "data";
83  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
84 
85  //make sure that chunk size is N * block size
90 
91  if (!numBuffers_)
92  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
93  "no reading enabled with numBuffers parameter 0";
94 
97 
98  //het handles to DaqDirector and FastMonitoringService because it isn't acessible in readSupervisor thread
99 
100  try {
102  } catch (...){
103  edm::LogWarning("FedRawDataInputSource") << "FastMonitoringService not found";
104  assert(0);//test
105  }
106 
107  try {
109  //set DaqDirector to delete files in preGlobalEndLumi callback
112  if (fms_) daqDirector_->setFMS(fms_);
113  } catch (...){
114  edm::LogWarning("FedRawDataInputSource") << "EvFDaqDirector not found";
115  assert(0);//test
116  }
117 
118  //should delete chunks when run stops
119  for (unsigned int i=0;i<numBuffers_;i++) {
121  }
122 
123  quit_threads_ = false;
124 
125  for (unsigned int i=0;i<numConcurrentReads_;i++)
126  {
127  std::unique_lock<std::mutex> lk(startupLock_);
128  //issue a memory fence here and in threads (constructor was segfaulting without this)
129  thread_quit_signal.push_back(false);
130  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
131  cvReader_.push_back(new std::condition_variable);
132  threadInit_.store(false,std::memory_order_release);
133  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
134  startupCv_.wait(lk);
135  }
136 
137  runAuxiliary()->setProcessHistoryID(processHistoryID_);
138 }
139 
141 {
142  quit_threads_=true;
143 
144  //delete any remaining open files
145  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
146  deleteFile(it->second->fileName_);
147  delete it->second;
148  }
150  readSupervisorThread_->join();
151  }
152  else {
153  //join aux threads in case the supervisor thread was not started
154  for (unsigned int i=0;i<workerThreads_.size();i++) {
155  std::unique_lock<std::mutex> lk(mReader_);
156  thread_quit_signal[i]=true;
157  cvReader_[i]->notify_one();
158  lk.unlock();
159  workerThreads_[i]->join();
160  delete workerThreads_[i];
161  }
162  }
163  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
164  /*
165  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
166  InputChunk *ch;
167  while (!freeChunks_.try_pop(ch)) {}
168  delete ch;
169  }
170  */
171 }
172 
174 {
176  {
177  //this thread opens new files and dispatches reading to worker readers
178  //threadInit_.store(false,std::memory_order_release);
179  std::unique_lock<std::mutex> lk(startupLock_);
180  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
182  startupCv_.wait(lk);
183  }
184  switch (nextEvent() ) {
186 
187  //maybe create EoL file in working directory before ending run
188  struct stat buf;
189  if ( currentLumiSection_ > 0 ) {
190  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
191  if (eolFound) {
193  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
194  if ( !found ) {
196  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
197  close(eol_fd);
199  }
200  }
201  }
202  //also create EoR file in FU data directory
203  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
204  if (!eorFound) {
205  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
206  close(eor_fd);
207  }
209  eventsThisLumi_=0;
211  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
212  return false;
213  }
215  //this is not reachable
216  return true;
217  }
219  //std::cout << "--------------NEW LUMI---------------" << std::endl;
220  return true;
221  }
222  default: {
223  if (!getLSFromFilename_) {
224  //get new lumi from file header
225  if (event_->lumi() > currentLumiSection_) {
227  eventsThisLumi_=0;
228  maybeOpenNewLumiSection( event_->lumi() );
229  }
230  }
232 
233  setEventCached();
234 
235  return true;
236  }
237  }
238 }
239 
240 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection)
241 {
243  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
244 
245  if ( currentLumiSection_ > 0 ) {
246  const std::string fuEoLS =
248  struct stat buf;
249  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
250  if ( !found ) {
252  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
253  close(eol_fd);
255  }
256  }
257 
258  currentLumiSection_ = lumiSection;
259 
261 
262  timeval tv;
263  gettimeofday(&tv, 0);
264  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
265 
266  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
268  runAuxiliary()->run(),
269  lumiSection, lsopentime,
271 
272  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
273  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
274 
275  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
276  }
277 }
278 
280 {
282  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
283  {
284  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
285  }
286  return status;
287 }
288 
290 {
291  const size_t headerSize[4] = {0,2*sizeof(uint32),(4 + 1024) * sizeof(uint32),7*sizeof(uint32)}; //size per version of FRDEventHeader
292 
294  if (!currentFile_)
295  {
296  if (!streamFileTrackerPtr_) {
300  }
301 
303  if (!fileQueue_.try_pop(currentFile_))
304  {
305  //sleep until wakeup (only in single-buffer mode) or timeout
306  std::unique_lock<std::mutex> lkw(mWakeup_);
307  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
309  }
310  status = currentFile_->status_;
311  if ( status == evf::EvFDaqDirector::runEnded)
312  {
313  delete currentFile_;
314  currentFile_=nullptr;
315  return status;
316  }
317 
318  else if (status == evf::EvFDaqDirector::newLumi)
319  {
320  if (getLSFromFilename_) {
323  eventsThisLumi_=0;
325  }
326  }
327  else {//let this be picked up from next event
329  }
330 
331  delete currentFile_;
332  currentFile_=nullptr;
333  return status;
334  }
335  else if (status == evf::EvFDaqDirector::newFile) {
338  }
339  else
340  assert(0);
341  }
342 
343  //file is empty
344  if (!currentFile_->fileSize_) {
345  //try to open new lumi
346  assert(currentFile_->nChunks_==0);
347  if (getLSFromFilename_)
350  eventsThisLumi_=0;
352  }
353  //immediately delete empty file
355  delete currentFile_;
356  currentFile_=nullptr;
358  }
359 
360  //file is finished
362  //release last chunk (it is never released elsewhere)
365  {
366  throw cms::Exception("RuntimeError")
367  << "Fully processed " << currentFile_->nProcessed_
368  << " from the file " << currentFile_->fileName_
369  << " but according to BU JSON there should be "
370  << currentFile_->nEvents_ << " events";
371  }
372  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
373  if (singleBufferMode_) {
374  std::unique_lock<std::mutex> lkw(mWakeup_);
375  cvWakeup_.notify_one();
376  }
379  //put the file in pending delete list;
380  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
381  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
382  }
383  else {
384  //in single-thread and stream jobs, events are already processed
386  delete currentFile_;
387  }
388  currentFile_=nullptr;
390  }
391 
392 
393  //file is too short
395  {
396  throw cms::Exception("FedRawDataInputSource::cacheNextEvent") <<
397  "Premature end of input file while reading event header";
398  }
399  if (singleBufferMode_) {
400 
401  //should already be there
403  usleep(10000);
405  }
406 
407  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
408 
409 
410  if (!bufferInputRead_ || bufferInputRead_ < headerSize[detectedFRDversion_])
411  {
413 
414  if (detectedFRDversion_==0) {
415  detectedFRDversion_=*((uint32*)dataPosition);
416  assert(detectedFRDversion_>=1 && detectedFRDversion_<=3);
417  }
418 
419  //recalculate chunk position
420  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
421  if ( bufferInputRead_ < headerSize[detectedFRDversion_])
422  {
423  throw cms::Exception("FedRawDataInputSource::cacheNextEvent") <<
424  "Premature end of input file while reading event header";
425  }
426  }
427 
428  event_.reset( new FRDEventMsgView(dataPosition) );
429  if (event_->size()>eventChunkSize_) {
430  throw cms::Exception("FedRawDataInputSource::nextEvent")
431  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
432  << " run:" << event_->run() << " of size:" << event_->size()
433  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
434  }
435 
436  const uint32_t msgSize = event_->size()-headerSize[detectedFRDversion_];
437 
439  {
440  throw cms::Exception("FedRawDataInputSource::nextEvent") <<
441  "Premature end of input file while reading event data";
442  }
443  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
445  //recalculate chunk position
446  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
447  event_.reset( new FRDEventMsgView(dataPosition) );
448  }
449  currentFile_->bufferPosition_ += event_->size();
450  currentFile_->chunkPosition_ += event_->size();
451  //last chunk is released when this function is invoked next time
452 
453  }
454  //multibuffer mode:
455  else
456  {
457  //wait for the current chunk to become added to the vector
459  usleep(10000);
461  }
462 
463  //check if header is at the boundary of two chunks
464  chunkIsFree_ = false;
465  unsigned char *dataPosition;
466 
467  //read header, copy it to a single chunk if necessary
468  bool chunkEnd = currentFile_->advance(dataPosition,headerSize[detectedFRDversion_]);
469 
470  event_.reset( new FRDEventMsgView(dataPosition) );
471  if (event_->size()>eventChunkSize_) {
472  throw cms::Exception("FedRawDataInputSource::nextEvent")
473  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
474  << " run:" << event_->run() << " of size:" << event_->size()
475  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
476  }
477 
478  const uint32_t msgSize = event_->size()-headerSize[detectedFRDversion_];
479 
481  {
482  throw cms::Exception("FedRawDataInputSource::nextEvent") <<
483  "Premature end of input file while reading event data";
484  }
485 
486  if (chunkEnd) {
487  //header was at the chunk boundary, we will have to move payload as well
488  currentFile_->moveToPreviousChunk(msgSize,headerSize[detectedFRDversion_]);
489  chunkIsFree_ = true;
490  }
491  else {
492  //header was contiguous, but check if payload fits the chunk
493  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
494  //rewind to header start position
495  currentFile_->rewindChunk(headerSize[detectedFRDversion_]);
496  //copy event to a chunk start and move pointers
497  chunkEnd = currentFile_->advance(dataPosition,headerSize[detectedFRDversion_]+msgSize);
498  assert(chunkEnd);
499  chunkIsFree_=true;
500  //header is moved
501  event_.reset( new FRDEventMsgView(dataPosition) );
502  }
503  else {
504  //everything is in a single chunk, only move pointers forward
505  chunkEnd = currentFile_->advance(dataPosition,msgSize);
506  assert(!chunkEnd);
507  chunkIsFree_=false;
508  }
509  }
510  }//end multibuffer mode
511 
512  if ( verifyAdler32_ && event_->version() >= 3 )
513  {
514  uint32_t adler = adler32(0L,Z_NULL,0);
515  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
516 
517  if ( adler != event_->adler32() ) {
518  throw cms::Exception("FedRawDataInputSource::nextEvent") <<
519  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
520  " but calculated 0x" << adler;
521  }
522  }
524 
526 }
527 
529 {
530  const boost::filesystem::path filePath(fileName);
531  if (!testModeNoBuilderUnit_) {
532  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
533  try {
534  //sometimes this fails but file gets deleted
535  boost::filesystem::remove(filePath);
536  }
537  catch (const boost::filesystem::filesystem_error& ex)
538  {
539  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
540  << ". Trying again.";
541  usleep(100000);
542  try {
543  boost::filesystem::remove(filePath);
544  }
545  catch (...) {/*file gets deleted first time but exception is still thrown*/}
546  }
547  catch (std::exception& ex)
548  {
549  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
550  << ". Trying again.";
551  usleep(100000);
552  try {
553  boost::filesystem::remove(filePath);
554  } catch (...) {/*file gets deleted first time but exception is still thrown*/}
555  }
556  } else {
557  renameToNextFree(fileName);
558  }
559 }
560 
562 {
563  std::auto_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
564  edm::Timestamp tstamp = fillFEDRawDataCollection(rawData);
565 
566  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
569  makeEvent(eventPrincipal, aux);
570 
573 
574  //FWCore/Sources DaqProvenanceHelper before 7_1_0_pre3
575  //eventPrincipal.put(daqProvenanceHelper_.constBranchDescription_, edp,
576  // daqProvenanceHelper_.dummyProvenance_);
577 
578  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), edp,
580 
581  eventsThisLumi_++;
582 
583  //this old file check runs no more often than every 10 events
584  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
585  //delete files that are not in processing
586  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
587  auto it = filesToDelete_.begin();
588  while (it!=filesToDelete_.end()) {
589  bool fileIsBeingProcessed = false;
590  for (unsigned int i=0;i<nStreams_;i++) {
591  if (it->first == streamFileTrackerPtr_->at(i)) {
592  fileIsBeingProcessed = true;
593  break;
594  }
595  }
596  if (!fileIsBeingProcessed) {
597  deleteFile(it->second->fileName_);
598  delete it->second;
599  it = filesToDelete_.erase(it);
600  }
601  else it++;
602  }
603 
604  }
606  chunkIsFree_=false;
607  return;
608 }
609 
610 edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection(std::auto_ptr<FEDRawDataCollection>& rawData) const
611 {
612  edm::Timestamp tstamp;
613  uint32_t eventSize = event_->eventSize();
614  char* event = (char*)event_->payload();
615 
616  while (eventSize > 0) {
617  eventSize -= sizeof(fedt_t);
618  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
619  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
620  eventSize -= (fedSize - sizeof(fedh_t));
621  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
622  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
623  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
625  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
626  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
627  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
628  }
629  FEDRawData& fedData = rawData->FEDData(fedId);
630  fedData.resize(fedSize);
631  memcpy(fedData.data(), event + eventSize, fedSize);
632  }
633  assert(eventSize == 0);
634 
635  return tstamp;
636 }
637 
639 {
641  try {
642  // assemble json destination path
644 
645  //TODO:should be ported to use fffnaming
646  std::ostringstream fileNameWithPID;
647  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
648  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
649  jsonDestPath /= fileNameWithPID.str();
650 
651  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
652  << jsonDestPath;
653 
655  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
656  else {
657  try {
658  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
659  }
660  catch (const boost::filesystem::filesystem_error& ex)
661  {
662  // Input dir gone?
663  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
664  // << " Maybe the file is not yet visible by FU. Trying again in one second";
665  sleep(1);
666  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
667  }
669 
670 
671  try {
672  //sometimes this fails but file gets deleted
673  boost::filesystem::remove(jsonSourcePath);
674  }
675  catch (const boost::filesystem::filesystem_error& ex)
676  {
677  // Input dir gone?
678  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
679  }
680  catch (std::exception& ex)
681  {
682  // Input dir gone?
683  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
684  }
685 
686  }
687 
688  boost::filesystem::ifstream ij(jsonDestPath);
689  Json::Value deserializeRoot;
691 
692  if (!reader.parse(ij, deserializeRoot))
693  throw std::runtime_error("Cannot deserialize input JSON file");
694 
695  //read BU JSON
697  DataPoint dp;
698  dp.deserialize(deserializeRoot);
699  bool success = false;
700  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
701  if (dpd_->getNames().at(i)=="NEvents")
702  if (i<dp.getData().size()) {
703  data = dp.getData()[i];
704  success=true;
705  }
706  }
707  if (!success) {
708  if (dp.getData().size())
709  data = dp.getData()[0];
710  else
711  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
712  " error reading number of events from BU JSON -: No input value " << data;
713  }
714  return boost::lexical_cast<int>(data);
715 
716  }
717  catch (const boost::filesystem::filesystem_error& ex)
718  {
719  // Input dir gone?
721  edm::LogError("FedRawDataInputSource") << "grabNextFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
722  << " - Maybe the BU run dir disappeared? Ending process with code 0...";
723  _exit(-1);
724  }
725  catch (std::runtime_error e)
726  {
727  // Another process grabbed the file and NFS did not register this
729  edm::LogError("FedRawDataInputSource") << "grabNextFile - runtime Exception -: " << e.what();
730  }
731 
732  catch( boost::bad_lexical_cast const& ) {
733  edm::LogError("FedRawDataInputSource") << "grabNextFile - error parsing number of events from BU JSON. "
734  << "Input value is -: " << data;
735  }
736 
737  catch (std::exception e)
738  {
739  // BU run directory disappeared?
741  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
742  }
743 
744  return -1;
745 }
746 
748 {
751 
752  edm::LogInfo("FedRawDataInputSource") << "Instead of delete, RENAME -: " << fileName
753  << " to: " << destination.string();
754  boost::filesystem::rename(source,destination);
755  boost::filesystem::rename(source.replace_extension(".jsn"),destination.replace_extension(".jsn"));
756 }
757 
759 {}
760 
761 void FedRawDataInputSource::postForkReacquireResources(boost::shared_ptr<edm::multicore::MessageReceiverForSource>)
762 {
763  InputSource::rewind();
767 }
768 
770 {}
771 
772 
774 {
775  bool stop=false;
776  unsigned int currentLumiSection = 0;
777  //threadInit_.exchange(true,std::memory_order_acquire);
778 
779  {
780  std::unique_lock<std::mutex> lk(startupLock_);
781  startupCv_.notify_one();
782  }
783 
784  while (!stop) {
785 
786  //wait for at least one free thread and chunk
787  int counter=0;
788  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty())
789  {
790  std::unique_lock<std::mutex> lkw(mWakeup_);
791  //sleep until woken up by condition or a timeout
792  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
793  counter++;
794  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
795  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
796  }
797  else {
798  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
799  }
800  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
801  }
802 
803  if (stop) break;
804 
805  //look for a new file
806  std::string nextFile;
807  uint32_t ls;
808  uint32_t fileSize;
809 
811 
813 
814  while (status == evf::EvFDaqDirector::noFile) {
815  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
816  stop=true;
817  break;
818  }
819  else
820  status = daqDirector_->updateFuLock(ls,nextFile,fileSize);
821 
822  if ( status == evf::EvFDaqDirector::runEnded) {
824  stop=true;
825  break;
826  }
827 
828  //queue new lumisection
829  if( getLSFromFilename_ && ls > currentLumiSection) {
830  currentLumiSection = ls;
831  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
832  }
833 
834  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
835  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
836  _exit(-1);
837  }
838 
839  int dbgcount=0;
840  if (status == evf::EvFDaqDirector::noFile) {
841  dbgcount++;
842  //if (!(dbgcount%20)) edm::LogInfo("FedRawDataInputSource") << "No file for me... sleep and try again...";
843  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
844  usleep(100000);
845  }
846  }
847  if ( status == evf::EvFDaqDirector::newFile ) {
848  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
849 
850 
851  boost::filesystem::path rawFilePath(nextFile);
852  std::string rawFile = rawFilePath.replace_extension(".raw").string();
853 
854  struct stat st;
855  stat(rawFile.c_str(),&st);
856  fileSize=st.st_size;
857 
858  int eventsInNewFile = grabNextJsonFile(nextFile);
859  if (fms_) fms_->stoppedLookingForFile(ls);
860  assert( eventsInNewFile>=0 );
861  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
862 
863  if (!singleBufferMode_) {
864  //calculate number of needed chunks
865  unsigned int neededChunks = fileSize/eventChunkSize_;
866  if (fileSize%eventChunkSize_) neededChunks++;
867 
868  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
869  fileQueue_.push(newInputFile);
870 
871  for (unsigned int i=0;i<neededChunks;i++) {
872 
873  //get thread
874  unsigned int newTid = 0xffffffff;
875  while (!workerPool_.try_pop(newTid)) {
876  usleep(100000);
877  }
878 
879  InputChunk * newChunk = nullptr;
880  while (!freeChunks_.try_pop(newChunk)) {
881  usleep(100000);
882  if (quit_threads_.load(std::memory_order_relaxed)) break;
883  }
884 
885  if (newChunk == nullptr) {
886  //return unused tid if we received shutdown (nullptr chunk)
887  if (newTid!=0xffffffff) workerPool_.push(newTid);
888  stop = true;
889  break;
890  }
891 
892  std::unique_lock<std::mutex> lk(mReader_);
893 
894  unsigned int toRead = eventChunkSize_;
895  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
896  newChunk->reset(i*eventChunkSize_,toRead,i);
897 
898  workerJob_[newTid].first=newInputFile;
899  workerJob_[newTid].second=newChunk;
900 
901  //wake up the worker thread
902  cvReader_[newTid]->notify_one();
903  }
904  }
905  else {
906  if (!eventsInNewFile) {
907  //still queue file for lumi update
908  std::unique_lock<std::mutex> lkw(mWakeup_);
909  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
910  fileQueue_.push(newInputFile);
911  cvWakeup_.notify_one();
912  return;
913  }
914  //in single-buffer mode put single chunk in the file and let the main thread read the file
915  InputChunk * newChunk;
916  //should be available immediately
917  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
918 
919  std::unique_lock<std::mutex> lkw(mWakeup_);
920 
921  unsigned int toRead = eventChunkSize_;
922  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
923  newChunk->reset(0,toRead,0);
924  newChunk->readComplete_=true;
925 
926  //push file and wakeup main thread
927  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
928  newInputFile->chunks_[0]=newChunk;
929  fileQueue_.push(newInputFile);
930  cvWakeup_.notify_one();
931  }
932  }
933  }
934  //make sure threads finish reading
935  unsigned numFinishedThreads = 0;
936  while (numFinishedThreads < workerThreads_.size()) {
937  unsigned int tid;
938  while (!workerPool_.try_pop(tid)) {usleep(10000);}
939  std::unique_lock<std::mutex> lk(mReader_);
940  thread_quit_signal[tid]=true;
941  cvReader_[tid]->notify_one();
942  numFinishedThreads++;
943  }
944  for (unsigned int i=0;i<workerThreads_.size();i++) {
945  workerThreads_[i]->join();
946  delete workerThreads_[i];
947  }
948 }
949 
950 void FedRawDataInputSource::readWorker(unsigned int tid)
951 {
952  bool init = true;
953  threadInit_.exchange(true,std::memory_order_acquire);
954 
955  while (1) {
956 
957  std::unique_lock<std::mutex> lk(mReader_);
958  workerJob_[tid].first=nullptr;
959  workerJob_[tid].first=nullptr;
960 
961  assert(!thread_quit_signal[tid]);//should never get it here
962  workerPool_.push(tid);
963 
964  if (init) {
965  std::unique_lock<std::mutex> lk(startupLock_);
966  init = false;
967  startupCv_.notify_one();
968  }
969  cvReader_[tid]->wait(lk);
970 
971  if (thread_quit_signal[tid]) return;
972 
973  InputFile * file;
974  InputChunk * chunk;
975 
976  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
977 
978  file = workerJob_[tid].first;
979  chunk = workerJob_[tid].second;
980 
981  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
982  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
983 
984 
985  if (fileDescriptor>=0)
986  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
987  else
988  {
989  edm::LogError("FedRawDataInputSource") <<
990  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
991  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
992  setExceptionState_=true;
993  return;
994 
995  }
996 
997  unsigned int bufferLeft = 0;
999  for (unsigned int i=0;i<readBlocks_;i++)
1000  {
1001  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1002  if ( last > 0 )
1003  bufferLeft+=last;
1004  if (last < eventChunkBlock_) {
1005  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1006  break;
1007  }
1008  }
1010  auto diff = end-start;
1011  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1012  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1013  close(fileDescriptor);
1014 
1015  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1016  assert(detectedFRDversion_<=3);
1017  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1018  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1019 
1020  }
1021 }
1022 
1024 {
1025  quit_threads_=true;
1026  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1027 
1028 }
1029 
1030 
1031 inline bool InputFile::advance(unsigned char* & dataPosition, const size_t size)
1032 {
1033  //wait for chunk
1034  while (!waitForChunk(currentChunk_)) {
1035  usleep(100000);
1037  }
1038 
1039  dataPosition = chunks_[currentChunk_]->buf_+ chunkPosition_;
1040  size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1041 
1042  if (currentLeft < size) {
1043 
1044  //we need next chunk
1045  while (!waitForChunk(currentChunk_+1)) {
1046  usleep(100000);
1048  }
1049  //copy everything to beginning of the first chunk
1050  dataPosition-=chunkPosition_;
1051  assert(dataPosition==chunks_[currentChunk_]->buf_);
1052  memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_+chunkPosition_, currentLeft);
1053  memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_+1]->buf_, size - currentLeft);
1054  //set pointers at the end of the old data position
1056  chunkPosition_=size-currentLeft;
1057  currentChunk_++;
1058  return true;
1059  }
1060  else {
1063  return false;
1064  }
1065 }
1066 
1067 inline void InputFile::moveToPreviousChunk(const size_t size, const size_t offset)
1068 {
1069  //this will fail in case of events that are too large
1070  assert(size < chunks_[currentChunk_]->size_ - chunkPosition_);
1071  assert(size - offset < chunks_[currentChunk_]->size_);
1072  memcpy(chunks_[currentChunk_-1]->buf_+offset,chunks_[currentChunk_]->buf_+chunkPosition_,size);
1075 }
1076 
1077 inline void InputFile::rewindChunk(const size_t size) {
1080 }
1081 
1082 //single-buffer mode file reading
1084 {
1085 
1086  if (fileDescriptor_<0) {
1087  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1088  bufferInputRead_ = 0;
1089  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1090  if (fileDescriptor_>=0)
1091  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1092  else
1093  {
1094  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1095  << file->fileName_ << " fd:" << fileDescriptor_;
1096  }
1097  }
1098 
1099  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1100  uint32_t existingSize = 0;
1101  for (unsigned int i=0;i<readBlocks_;i++)
1102  {
1103  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1105  existingSize+=last;
1106  }
1107  }
1108  else {
1109  const uint32_t chunksize = file->chunkPosition_;
1110  const uint32_t blockcount=chunksize/eventChunkBlock_;
1111  const uint32_t leftsize = chunksize%eventChunkBlock_;
1112  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1113  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1114 
1115  for (uint32_t i=0;i<blockcount;i++) {
1116  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1118  existingSize+=last;
1119  }
1120  if (leftsize) {
1121  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1123  existingSize+=last;
1124  }
1125  file->chunkPosition_=0;//data was moved to beginning of the chunk
1126  }
1127  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1128  if (fileDescriptor_!=-1)
1129  {
1130  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1131  close(fileDescriptor_);
1132  fileDescriptor_=-1;
1133  }
1134  }
1135 }
1136 
1137 // define this class as an input source
1139 
#define LogDebug(id)
static const char runNumber_[]
std::vector< std::string > & getData()
Definition: DataPoint.h:58
unsigned int lumi_
int i
Definition: DBlmapReader.cc:9
unsigned int getgpshigh(const unsigned char *)
uint32_t chunkPosition_
std::condition_variable cvWakeup_
tuple start
Check for commandline option errors.
Definition: dqm_diff.py:58
virtual void read(edm::EventPrincipal &eventPrincipal) override
struct fedh_struct fedh_t
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
virtual void rewind_() override
std::vector< int > * getStreamFileTracker()
unsigned int offset_
jsoncollector::DataPointDefinition * dpd_
tbb::concurrent_queue< unsigned int > workerPool_
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void rewindChunk(const size_t size)
std::vector< int > * streamFileTrackerPtr_
#define nullptr
int init
Definition: HydjetWrapper.h:62
evf::EvFDaqDirector::FileStatus status_
boost::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:262
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:595
std::atomic< bool > quit_threads_
volatile std::atomic< bool > shutdown_flag
std::vector< ReaderInfo > workerJob_
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
std::unique_ptr< std::thread > readSupervisorThread_
ProductProvenance const & dummyProvenance() const
Represents a JSON value.
Definition: value.h:111
std::string getEoLSFilePathOnBU(const unsigned int ls) const
std::vector< std::condition_variable * > cvReader_
FedRawDataInputSource * parent_
unsigned int sourceid
Definition: fed_header.h:32
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:214
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize)
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
U second(std::pair< T, U > const &p)
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
static Timestamp beginOfTime()
Definition: Timestamp.h:103
std::vector< bool > thread_quit_signal
void evm_board_setformat(size_t size)
uint32_t bufferPosition_
tuple path
else: Piece not in the list, fine.
void updateFileIndex(int const &fileIndex)
#define DEFINE_FWK_INPUT_SOURCE(type)
void resize(size_t newsize)
Definition: FEDRawData.cc:32
const edm::RunNumber_t runNumber_
bool advance(unsigned char *&dataPosition, const size_t size)
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:354
void moveToPreviousChunk(const size_t size, const size_t offset)
unsigned char * buf_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
struct fedt_struct fedt_t
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
int grabNextJsonFile(boost::filesystem::path const &)
def load
Definition: svgfig.py:546
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
#define end
Definition: vmac.h:37
virtual void preForkReleaseResources() override
bool waitForChunk(unsigned int chunkid)
const edm::DaqProvenanceHelper daqProvenanceHelper_
unsigned int fileIndex_
unsigned int offset(bool)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
bool first
Definition: L1TdeRCT.cc:75
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:56
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
unsigned int uint32
Definition: MsgTools.h:13
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:381
const std::string fuOutputDir_
std::atomic< bool > readComplete_
virtual bool checkNextEvent() override
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:601
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned long long uint64_t
Definition: Time.h:15
unsigned int currentChunk_
auto dp
Definition: deltaR.h:24
tbb::concurrent_queue< InputFile * > fileQueue_
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:347
std::unique_ptr< FRDEventMsgView > event_
virtual void postForkReacquireResources(boost::shared_ptr< edm::multicore::MessageReceiverForSource >) override
std::string fileName_
edm::Timestamp fillFEDRawDataCollection(std::auto_ptr< FEDRawDataCollection > &) const
void setProcessHistoryID(ProcessHistoryID const &phid)
std::condition_variable startupCv_
void stoppedLookingForFile(unsigned int lumi)
tbb::concurrent_vector< InputChunk * > chunks_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:362
boost::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:259
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Non-const accessor for process history registry.
Definition: InputSource.h:174
unsigned int eventsize
Definition: fed_trailer.h:33
std::atomic< bool > threadInit_
std::pair< InputFile *, InputChunk * > ReaderInfo
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:350
void readWorker(unsigned int tid)
static std::atomic< unsigned int > counter
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
Unserialize a JSON document into a Value.
Definition: reader.h:16
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
friend class InputFile
Open Root file and provide MEs ############.
size_(0)
Definition: OwnArray.h:181
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::string getJumpFilePath() const
tbb::concurrent_queue< InputChunk * > freeChunks_
evf::EvFDaqDirector::FileStatus getNextEvent()
tuple status
Definition: ntuplemaker.py:245
evf::EvFDaqDirector::FileStatus nextEvent()
void readNextChunkIntoBuffer(InputFile *file)
unsigned int nEvents_
void setFMS(evf::FastMonitoringService *fms)
static std::string const source
Definition: EdmProvDump.cc:43
unsigned int getgpslow(const unsigned char *)
tuple size
Write out results.
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
std::string getEoRFilePathOnFU() const
void put(BranchDescription const &bd, WrapperOwningHolder const &edp, ProductProvenance const &productProvenance)
void renameToNextFree(std::string const &fileName) const